Wieder grüßt das Murmeltier: 2. Teil der Fehlerbehandlung für Kafka Consumer mit Retries

Dies ist eine Fortsetzung des Blog-Posts Fehlerbehandlung für Kafka Consumer mit Retries, mit seitdem gewonnenen Erkenntnissen.

Phil Connors: Well, it's Groundhog Day... again...

Dicker Murmler

Hilfsklasse Processed-Messages-Filter

Im ersten Teil haben wir uns Non-blocking Retries angesehen, wodurch die Reihenfolge der Kafka-Events durcheinanderkommen kann: Deshalb haben wir die Uhrzeiten der Events verglichen, um veraltete Events zu verwerfen. Konkret in dem Beispiel war es die Uhrzeit des letzten gespeicherten Wetterberichtes: Kommen ältere Wetterberichte durch ein Retry herein, können wir sie ignorieren.

Hier hat sich in unseren Projekten inzwischen eine generische Hilfsklasse ProcessedMessagesFilter etabliert. Wir speichern die Uhrzeit des letzten verarbeiteten Events in der Datenbank:

@Entity
data class ProcessedEvent(
    val eventType: String,           // "weatherForecast"
    val key: String,                 // "Hamburg", Key des Kafka-Events
    val timestamp: OffsetDateTime,   // Uhrzeit des letzten Updates
)

Damit können wir generisch prüfen, ob bereits neuere Events verarbeitet wurden. So sieht die Benutzung aus:

processedMessagesFilter.processIfNotOutdated(
    eventType = "weatherForecast",
    key = weatherForecast.city,
    timestamp = weatherForecast.time) {

    // wird nur ausgeführt, falls das event nicht veraltet ist
    repository.save(weatherForecast)
}

Vorteil der generischen Lösung ist eine Vereinfachung des Codes, weil wir nicht mehr wissen müssen, wo in den fachlichen Objekten Timestamps gespeichert werden. Und vor allem: Wenn am Ende ihres Lebenszyklus die fachlichen Objekte gelöscht werden, merken wir uns davon unabhängig weiterhin, dass diese Events bereits verarbeitet wurden. Sonst könnte es sein, dass durch ein Retry ein bereits gelöschtes Objekt wieder angelegt wird.

Die Implementierung des ProcessedMessagesFilters ist straight-forward:

@Service
class ProcessedMessagesFilter(
    val processedMessagesRepository: ProcessedMessagesRepository,
) {
    fun <R> processIfNotOutdated(
        eventType: String,
        key: String,
        timestamp: OffsetDateTime,
        process: () -> R) {

        if (!isOutdated(eventType, key, timestamp)) {
            process()
            processedMessagesRepository.save(ProcessedEvent(eventType, key, timestamp))
        }
    }

    private fun isOutdated(
        eventType: String,
        key: String,
        timestamp: OffsetDateTime): Boolean {

        return processedMessagesRepository.findBy(eventType, key)
        .map { lastProcessed ->
            // note: If the timestamp are identical, we want to process,
            // allowing to re-process a topic if needed
            (timestamp.isBefore(lastProcessed.timestamp))
        }.orElse(
            // unknown = new key, we want to process it
            false,
        )
    }
}

Zusätzlich sollte man noch einen Aufräumjob implementieren, der nach längerer Zeit (mehreren Monaten) die ProcessedEvents aufräumt.

Dicker Murmler

Fiese Fallen

Zwei neue fiese Fallen aus der Praxis!

Fiese Falle: Exponentielle Retry-Vermehrung

Durch einen Copy-und-Paste Fehler hatten zwei Consumer in verschiedenen Consumer-Groups dasselbe Retry-Topic konfiguriert. Was jetzt passierte: Eine Nachricht landete im Retry-Topic, beide Consumer konsumierten sie, liefen beide auf einen Fehler, und schrieben jeweils eine Nachricht in das Retry-Topic. Aus 1 Nachricht wurden also 2, 4, 8, 16…, bis der Kafka in die Knie ging.

Fiese Falle: Große Callstacks

Unsere RetryTopics wurden sehr groß, fast 80 GB. Die Analyse zeigte, dass die Retry-Events in einem Kafka-Header den gesamten Callstack des fehlgeschlagenen Versuchs enthielten. Der Callstack-Retry-Header hatte fast 15000 Bytes, im Vergleich zu dem kleinen Event mit mit 350 Bytes plus sonstige Retry-Header mit 1300 Bytes. Das Event wurde also fast um den Faktor 50 größer, und zusätzlich 50x retried. Damit wurden aus 30 MB Originalevents * 50 * 50 = 75 GB Retry-Daten. Auch diese Header lassen sich abschalten:

@EnableKafka
@Configuration
@EnableScheduling
class RetryTopicsConfig : RetryTopicConfigurationSupport() {
  override fun configureDeadLetterPublishingContainerFactory(): Consumer<DeadLetterPublishingRecovererFactory> =
    Consumer<DeadLetterPublishingRecovererFactory> { recovererFactory ->
      recovererFactory.setRetainAllRetryHeaderValues(false)
      recovererFactory.setDeadLetterPublishingRecovererCustomizer { dlpr ->
        dlpr.excludeHeader(
          HeadersToAdd.EX_STACKTRACE,
          HeadersToAdd.EXCEPTION,
          HeadersToAdd.EX_CAUSE,
          HeadersToAdd.EX_MSG,
        )
      }
    }
}

Fazit

Mit dem generischen ProcessedEventsService und dem Support durch Spring sind Retries schnell eingebaut. Die beschriebenen Fallen sollte man aber kennen, weil man mit Retries versehentlich sehr viele Events produzieren kann.

Wir wünschen euch weiterhin viel Erfolg mit der Fehlerbehandlung mit Retries!

Phil: Do you know what today is?
Rita: No, what?
Phil: Today is tomorrow. It happened.

Kommentare