Täglich grüßt das Murmeltier: Fehlerbehandlung für Kafka Consumer mit Retries

Murphy’s Law sagt: “Anything that can go wrong will go wrong” - wenn auf etwas Verlass ist, dann auf den Fehlerteufel. Deshalb schauen wir uns an, wie wir in den Kafka-Consumern die Event-Verarbeitung mit Retries robuster bauen können. Wir benutzen im Projekt Kafka mit Kotlin und spring-kafka, die grundlegenden Konzepte lassen sich aber auch auf andere Systeme übertragen.

Ausfall und Ausreißer

In der Praxis begegnen uns zwei ganz unterschiedliche Fehlerbilder: Der Ausfall und der Ausreißer. Beim Ausfall ist eine Komponente - z.B. eine Datenbank - vorübergehend gestört; der Fehler betrifft alle Events, und er repariert sich nach einer gewissen Zeit.

Beim Ausreißer gibt es ein einzelnes fachliches Event, das Ärger macht; die anderen Events sind nicht betroffen, zum Beispiel fehlt ein notwendiges Feld. Dieses Problem löst sich typischerweise nicht von alleine.

In der Praxis ist es für die Software nicht einfach zu entscheiden, welches der Fehlerbilder vorliegt, da sowohl Ausfälle als auch Ausreißer mit ganz unterschiedlichen Fehlerbildern auftreten können.

Fehlerbehandlung mit Retries

Eine einfache Fehlerbehandlungsstrategie ist es, Ereignisse im Fehlerfall so lange zu wiederholen, bis die Verarbeitung funktioniert. Das ist vor allem bei Problemen hilfreich, die sich von alleine reparieren; fixen sie sich nicht von alleine, lösen wir nach einer gewissen Zeit einen Alarm aus.

Dicker Murmler

Blocking Retries

Ein nützliches Feature von Kafka ist, dass innerhalb einer Partition die Reihenfolge der Events gewahrt bleibt: Events kommen in der Reihenfolge an, in der sie geschehen sind. Nehmen wir an, unsere Events sind Wetterberichte für Orte, und zu jedem Ort interessiert uns immer nur der aktuelle Wetterbericht: das ist dann jeweils das letzte ankommende Event.

Ein blockierendes Retry erhält diese Reihenfolge: Erst wenn das Retry erfolgreich war, wird das nächste Event verarbeitet, alle anderen Events in der Partition warten so lange.

Das ist prima für das Fehlerbild “Ausfall”, von dem alle Events betroffen sind.

Nach Baeldung können wir blockierende Retries mit spring-kafka recht einfach einrichten, hier in Kotlin, indem wir einen DefaultErrorHandler benutzen:

typealias CityName = String

@KafkaListener(
  topics = ["weather-forecast"],
  clientIdPrefix = "my-listener",
  groupId = "my-listener-group",
  containerFactory = "forecastListenerFactory",
)
fun consume(consumerRecord: ConsumerRecord<CityName, WeatherForecast>) {
  try {
    val forecast = consumerRecord.value()
    weatherForecastService.save(forecast)
  } catch (e: Exception) {
    metrics.retryCounter.increment()
    throw e
  }
}

private val retryIntervalMs: Long = 10000L // alle 10 Sekunden
private val maxAttempts: Long = 90L // 90 * 10 Sekunden = 15 Minuten

@Bean
fun errorHandler(): DefaultErrorHandler {
  val backOff: BackOff = FixedBackOff(retryIntervalMs, maxAttempts)
  return DefaultErrorHandler({ consumerRecord, e ->
    logger.error("Skipping event, retries exhausted: $consumerRecord", exception)
    metrics.skippedCounter.increment()
  }, backOff)
}

@Bean
fun forecastListenerFactory(): ConcurrentKafkaListenerContainerFactory<CityName, WeatherForecast> {
  val factory = KafkaContainerListenerFactory(kafkaProperties).create<WeatherForecast>()
  factory.setCommonErrorHandler(errorHandler())

  // acknowledge every event separately: see Baeldung for discussion
  factory.containerProperties.ackMode = ContainerProperties.AckMode.RECORD

  factory.afterPropertiesSet()
  return factory
}

Nicht praktisch ist diese Lösung für das Fehlerbild “Ausreißer”: Ein fehlerhaftes Event blockiert alle anderen Events. In unserem Beispiel würde ein Ausreißer alle 10 Sekunden wiederholt werden, bis nach 90 Versuchen = insgesamt 15 Minuten die Verarbeitung dieses Events aufgegeben wird, d.h. für 15 Minuten steht in der Partition alles still. Kürzer wollen wir den Zeitraum nicht setzen, weil sonst ein Ausfall von 15 Minuten nicht überbrückt wird.

Zusätzlich ist es nicht gesagt, dass es bei einem Ausreißer bleibt, ein solcher kann mehrfach oder an verschiedenen Stellen auftreten, und jedes Mal steht die Event-Verarbeitung.

Schauen wir uns deshalb die Non-Blocking Retries an, die dieses Problem nicht haben.

Non-Blocking Retries

Damit die fehlerhaften Events wiederholt werden können, müssen sie zwischenspeichert werden, während die restliche Event-Verarbeitung weiterläuft. Wie bei Kafka üblich, werden die fehlerhaften Events von spring-kafka in ein dafür angelegtes Topic gespeichert, das Retry-Topic.

Während das Ursprungs-Topic weather-forecast mehrere Konsumenten haben kann, ergibt ist das Retry-Topic weather-forecast-retries Sinn pro Konsument: Die Verarbeitung muss ja nicht bei allen Konsumenten bei den gleichen Events fehlschlagen, es wollen nicht alle Konsumenten dieselben Events wiederholen (deshalb würde man in der Praxis an das Topic auch den Konsumentennamen anhängen, etwa weather-forecast-retries-my-consumer).

Für ein Non-Blocking Retry konfigurieren wir ein RetryableTopic an unserem KafkaListener. Wir benötigen jetzt auch eine KafkaProducerFactory, um in das Retry-Topic zu schreiben. Ebenfalls nach Baeldung:

typealias CityName = String

@RetryableTopic(
  backoff = Backoff(delayExpression = "300000"), // alle 5 Minuten
  attempts = "900", // 900 Versuche = 75 Stunden wiederholen
  kafkaTemplate = "retryProducerTemplate",
  retryTopicSuffix = "-retries", // retry-topic heisst weather-forecast-retries
  dltStrategy = DltStrategy.NO_DLT, // events werden nach #attempts Versuchen verworfen
  fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
  autoCreateTopics = "false",
)
@KafkaListener(
  topics = ["weather-forecast"],
  clientIdPrefix = "my-listener",
  groupId = "my-listener-group",
  containerFactory = "forecastListenerFactory",
)
fun consume(consumerRecord: ConsumerRecord<CityName, WeatherForecast>) {
  try {
    val forecast = consumerRecord.value()
    weatherForecastService.save(forecast)
  } catch (e: Exception) {
    metrics.retryCounter.increment()
    throw e
  }
}

@Bean
fun forecastListenerFactory(): ConcurrentKafkaListenerContainerFactory<CityName, WeatherForecast> {
  return KafkaContainerListenerFactory(kafkaProperties).create<WeatherForecast>()
}

@Bean
fun retryProducerTemplate(): KafkaTemplate<CityName, WeatherForecast> {
  return KafkaTemplate(
    DefaultKafkaProducerFactory(
      kafkaProperties.buildProducerProperties(),
      StringSerializer(),
      KafkaJsonSerdeFactory().create<WeatherForecast>().serializer(),
    ),
  )
}

Festes Retry-Intervall

Das fixedDelay verwenden wir aus pragmatischen Gründen: Grundsätzlich hätten wir gerne ein sich verlangsamendes Retry, das ein Event etwa nach 5 Minuten noch einmal probiert, dann nach 10, 20, 40 Minuten… Das führte aber dazu, das für jede Periode 5/10/20/40/… ein eigenes Retry-Topic angelegt werden müsste: Auch die Events eines Retry-Topics werden in Erzeugungsreihenfolge abgearbeitet, und nicht durcheinander. Da wir unser Kafka nicht mit diversen Retry-Topics (pro Konsument!) überfrachten wollen, benutzen wir ein Retry mit einer konstanten Periode (alle 5 Minuten), dann reicht ein Retry-Topic pro Konsument.

Die Events, die nach attempts Anzahl an Versuchen nicht erfolgreich sind, werden bei uns einfach verworfen. Wir könnten sie auch in einem sogenannten Dead-Letter-Topic zur manuellen Behandlung ablegen - das wäre dann ein zusätzliches Topic.

Reihenfolge der Events

Durch non-blocking Retries kann die Reihenfolge der Events durcheinander kommen, weil Retry-Versuche parallel zu der Verarbeitung neuer Events geschehen. Wir schauen deshalb beim Speichern der Wetterberichte auf die Uhrzeiten: Da uns immer nur der aktuellste Wetterbericht interessiert, merken wir uns die Uhrzeit des neuesten Wetterberichts, und ignorieren veraltete Wetterberichte einfach.

@Component
class WeatherForecastService(var repository: WeatherForecastRepository) {
  fun save(weatherForecast: WeatherForecast) {
    val existingForecast = repository.findByCity(weatherForecast.city).orElse(null)

    if (existingForecast == null) {
      repository.save(weatherForecast)
    } else if (existingForecast.time >= weatherForecast.time) {
      // skip: Update is older than existing value
    } else {
      repository.save(weatherForecast)
    }
  }
}

Hierbei können natürlich grundsätzlich Race-Conditions auftreten, wenn sich zwischen findByCity und save etwas in der DB ändert, uns hat dabei das Optimistic Locking von Spring Data JDBC geholfen.

Fiese Falle: autoCreate

Schließlich noch zwei fiese Fallen aus der Praxis: Per default steht im RetryableTopic der Wert autoCreateTopics=true, d.h. das Retry-Topic würde automatisch angelegt werden. Dazu lassen sich am RetryableTopic auch replicationFactor und numPartitions konfigurieren. Das hatte bei uns allerdings den Effekt, dass nicht nur das Retry-Topic weather-forecast-retries so konfiguriert wurde, sondern auch beim Ursprungs-Topic weather-forecast der replicationFactor und numPartitions umkonfiguriert wurden! So macht man sich beliebt bei den Kolleg:innen, die dieses Topic betreiben, insbesondere da die Zahl der Partitionen nicht einfach wieder verringert werden kann. Das autoCreate ist also mit Vorsicht zu verwenden, eventuell ist es mit diesem Bug gefixed.

Fiese Falle: Anwachsende Header

Nach dem Release unserer Software lagen in unserem Retry Topic eine handvoll fehlgeschlagene Events, die jeweils öfters wiederholt wurden. Wir wunderten uns, dass unser Retry-Topic im Verhältnis zur Anzahl der enthaltenen Nachrichten sehr groß wurde (mehrere GB), wobei die Events eigentlich winzig waren. Nach genauen Hinschauen fiel uns auf, dass die Event-Header mit jeder Wiederholung anwuchsen, bis sie für jedes einzelne Event größer als 1 MB waren. Hintergrund ist ein “eigenwilliges Verhalten”, das bei jedem Retry einige Header dupliziert. Dies lässt sich mit der folgenden Config abschalten:

@EnableKafka
@Configuration
@EnableScheduling
class RetryTopicsConfig : RetryTopicConfigurationSupport() {
  override fun configureDeadLetterPublishingContainerFactory(): Consumer<DeadLetterPublishingRecovererFactory> {
    return Consumer<DeadLetterPublishingRecovererFactory> { recovererFactory ->
      recovererFactory.setRetainAllRetryHeaderValues(false)
    }
  }
}

Fazit

Grundsätzlich ist es sinnvoll, sich über die Fehlerbehandlung Gedanken zu machen: Sollen Fehler verworfen werden, oder wiederholt? Auch hier nicht behandelte Muster haben ihre Anwendungsgebiete, etwa ein Neustart der Applikation oder ein CircuitBreaker.

Es ist komfortabel, dass sich eine komplexe Retry-Logik in spring-kafka mit wenigen Annotationen einrichten lässt. Auf dem Weg dahin gibt es, wie gesehen, einige Details zu beachten.

Die Lösungen lassen sich nicht 1:1 auf andere Frameworks übertragen wie Kafka Streams (siehe hier) oder reactor.KafkaReceiver, weil diese Frameworks wieder eigene Fehlerbehandlungs-Strategien nahelegen. Die grundsätzlichen Überlegungen zu Non-Blocking-Retries und Event-Reihenfolgen bleiben aber valide.

Dicker Murmler

Kommentare