IBM Cloud Docs
Nachrichten verarbeiten

Nachrichten verarbeiten

Ein Consumer ist eine Anwendung, die Datenströme von Nachrichten von Kafka-Topics verarbeitet. Ein Consumer kann mindestens ein Topic oder eine Partition abonnieren. Diese Informationen konzentrieren sich auf die Java™ -Programmierschnittstelle, die Teil des Apache Kafka -Projekts ist. Die Konzepte gelten auch für andere Sprachen, sie heißen nur etwas anders.

Wenn ein Consumer sich mit Kafka verbindet, wird eine erste Bootstrap-Verbindung hergestellt. Diese Verbindung kann mit einem der Server im Cluster hergestellt werden. Der Consumer fordert Informationen zu Partition und Leadership zum Topic an, den er verarbeiten möchte. Anschließend stellt der Konsument eine weitere Verbindung zum Partitionsleader her und kann Nachrichten verarbeiten. Diese Aktionen werden automatisch intern ausgeführt, wenn Ihr Consumer eine Verbindung zum Kafka-Cluster herstellt.

Ein Consumer ist in der Regel eine Anwendung mit langer Laufzeit. Ein Consumer fordert Nachrichten von Kafka an, indem er regelmäßig Consumer.poll(...) aufruft. Der Consumer ruft poll() auf, empfängt Nachrichten im Stapelbetrieb, verarbeitet sie sofort und ruft wieder poll() auf.

Wenn ein Consumer eine Nachricht verarbeitet, wird die Nachricht nicht aus dem Topic entfernt. Stattdessen können Consumer aus verschiedenen Möglichkeiten wählen, um Kafka mitzuteilen, welche Nachrichten verarbeitet wurden. Dieser Prozess wird als "Offset-Commit" bezeichnet.

In den Programmierschnittstellen wird eine Nachricht als Datensatz bezeichnet. Die Java-Klasse "org.apache.kafka.clients.consumer.ConsumerRecord" wird beispielsweise verwendet, um eine Nachricht für die Consumer-API darzustellen. Die Begriffe Datensatz und Nachricht können austauschbar verwendet werden, aber im Wesentlichen wird ein Datensatz verwendet, um eine Nachricht darzustellen.

Es kann hilfreich sein, diese Informationen zusammen mit Nachrichten erzeugen in Event Streamszu lesen.

Consumer-Eigenschaften konfigurieren

Es gibt viele Konfigurationseinstellungen für den Verbraucher, die Aspekte seines Verhaltens steuern. Die folgenden Einstellungen sind einige der wichtigsten.

Consumer-Eigenschaften konfigurieren
Name Beschreibung Gültige Werte Standard
key.deserializer Die Klasse, die zum Deserialisieren von Schlüsseln verwendet wird. Java-Klasse, die die Deserializer-Schnittstelle implementiert, z. B. org.apache.kafka.common.serialization.StringDeserializer. Kein Standardwert - Sie müssen einen Wert angeben.
value.deserializer Die Klasse, die zum Deserialisieren von Werten verwendet wird. Java-Klasse, die die Deserializer-Schnittstelle implementiert, z. B. org.apache.kafka.common.serialization.StringDeserializer. Kein Standardwert - Sie müssen einen Wert angeben.
group.id Eine ID für die Consumergruppe, zu der der Consumer gehört. Zeichenfolge Kein Standardwert
auto.offset.reset Das Verhalten, wenn der Consumer keinen ursprünglichen Offset hat oder der aktuelle Offset im Cluster nicht mehr verfügbar ist. latest, earliest, none Neueste
enable.auto.commit Legt fest, ob der Offset des Consumers automatisch im Hintergrund per Commit festgeschrieben wird. true, false Ja
auto.commit.interval.ms Die Anzahl der Millisekunden zwischen periodischen Commits von Offsets. 0,... 5000 (5 Sekunden)
max.poll.records Die maximale Anzahl von Datensätzen, die bei einem Aufruf von poll() zurückgegeben werden. 1,... 500
session.timeout.ms Die Anzahl der Millisekunden, innerhalb derer ein Consumer-Heartbeat empfangen werden muss, um die Mitgliedschaft eines Consumers in einer Consumergruppe zu behalten. 6000-300000 10000 (10 Sekunden)
max.poll.interval.ms Das maximale Zeitintervall zwischen Umfragen, bevor der Consumer die Gruppe verlässt. 1,... 300000 (5 Minuten)

Es sind viele weitere Konfigurationseinstellungen verfügbar, aber lesen Sie die Dokumentation zuApache Kafka, bevor Sie mit dem Experimentieren beginnen.

Konsumentengruppen

Eine Konsumentengruppe ist eine Gruppe von Konsumenten, die kooperiert, um Nachrichten aus einem oder mehreren Themen zu konsumieren. Die Consumer in einer Gruppe verwenden alle denselben Wert für die group.id-Konfiguration. Wenn Sie mehr als einen Consumer zum Verarbeiten Ihrer Workload brauchen, können Sie mehrere Consumer in derselben Consumergruppe ausführen. Selbst wenn Sie nur einen Konsumenten benötigen, ist es üblich, auch einen Wert für group.idanzugeben.

Jede Konsumentengruppe verfügt über einen Server im Cluster, der als Koordinator bezeichnet wird, der den Konsumenten in der Gruppe Partitionen zuordnet. Diese Verantwortung wird auf die Server im Cluster verteilt, um die Last gleichmäßig zu verteilen. Die Zuordnung von Partitionen zu Consumern kann sich bei jedem neuen Ausgleich in der Gruppe ändern.

Wenn ein Consumer einer Consumergruppe beitritt, erkennt er den Koordinator für diese Gruppe. Der Konsument teilt dem Koordinator mit, dass er der Gruppe beitreten möchte, und der Koordinator startet eine Neuverteilung der Partitionen in der Gruppe, um das neue Member aufzunehmen.

Die Nachrichten aus einer einzelnen Partition werden durch jeweils nur einen Consumer in jeder Gruppe verarbeitet. Dieser Mechanismus stellt sicher, dass die Nachrichten in jeder Partition in der richtigen Reihenfolge verarbeitet werden. Das folgende Diagramm zeigt ein Beispiel, in dem ein Topic drei Partitionen und eine Consumergruppe, die dieses Topic konsumiert, zwei Consumer enthält. Einem Konsumenten in der Gruppe werden zwei Partitionen und dem anderen Konsumenten eine Partition zugeordnet.

Diagramm für Konsumentengruppen.
Consumer group example

Wenn eine der folgenden Änderungen in einer Konsumentengruppe auftritt, wird die Gruppe neu verteilt, indem die Zuordnung von Partitionen zu den Gruppenmitgliedern verschoben wird, um die Änderung aufzunehmen:

  • Ein Konsument tritt der Gruppe bei.
  • Ein Konsument verlässt die Gruppe.
  • Ein Konsument wird vom Koordinator als nicht mehr live betrachtet.
  • Einem vorhandenen Topic werden neue Partitionen hinzugefügt.

Wenn Sie eine Verbrauchergruppe haben, die neu ausbalanciert wird, beachten Sie, dass jeder Verbraucher, der die Gruppe verlassen hat, seine Commits zurückweist, bis er der Gruppe wieder beitritt. In diesem Fall muss der Consumer der Gruppe erneut beitreten. Möglicherweise wird ihm dann eine andere Partition zugewiesen.

Consumer-Aktivität

Kafka erkennt fehlgeschlagene Consumer automatisch, sodass aktiven Consumern Partitionen erneut zugeordnet werden können. Dabei werden zwei Mechanismen verwendet: Polling und Austausch von Überwachungssignalen.

Wenn der Stapel von Nachrichten, die von Consumer.poll(...) zurückgegeben werden, groß ist oder die Verarbeitung zeitaufwendig ist, kann die Verzögerung bis zum poll() erneuten Aufruf erheblich oder unvorhersehbar sein. In einigen Fällen muss ein langes maximales Abfrageintervall konfiguriert werden, damit die Consumer nicht aus ihren Gruppen entfernt werden, wenn die Nachrichtenverarbeitung längere Zeit dauert. Wenn dieser Mechanismus der einzige verfügbare Mechanismus ist, ist die Zeit, die zum Erkennen eines fehlgeschlagenen Konsumenten benötigt wird, ebenfalls lang.

Damit die Aktivität der Consumer einfacher gehandhabt werden kann, wurde in Kafka 0.10.1 das Heartbeating im Hintergrund hinzugefügt. Der Gruppenkoordinator erwartet von den Gruppenmitgliedern, dass sie regelmäßig Heartbeats senden, um anzuzeigen, dass sie weiterhin aktiv sind. Ein Überwachungssignalhintergrundthread wird im Konsumenten ausgeführt und sendet regelmäßige Überwachungssignale an den Koordinator. Wenn der Koordinator kein Überwachungssignal von einem Gruppenmitglied in _Sitzungszeitlimit_empfängt, entfernt er das Mitglied aus der Gruppe und startet eine Neuverteilung der Gruppe. Das Sitzungs-Timeout kann viel kürzer sein als das maximale Polling-Intervall, so dass die Zeit, die benötigt wird, um einen ausgefallenen Verbraucher zu erkennen, kurz sein kann, auch wenn die Nachrichtenverarbeitung lange dauert.

Sie können das maximale Abfrageintervall mit der Eigenschaft max.poll.interval.ms und das Sitzungszeitlimit mit der Eigenschaft session.timeout.ms konfigurieren. Sie müssen diese Einstellungen nur verwenden, wenn die Verarbeitung eines Nachrichtenstapels länger als 5 Minuten dauert.

Offsets verwalten

Für jede Konsumentengruppe verwaltet Kafka den festgeschriebenen Offset für jede verarbeitete Partition. Wenn ein Consumer eine Nachricht verarbeitet, wird die Nachricht nicht von der Partition entfernt. Stattdessen wird nur der aktuelle Offset aktualisiert, indem ein Prozess verwendet wird, der als Festschreiben des Offsets bezeichnet wird.

Event Streams speichert Consumer-Offset-Informationen für einen Zeitraum von 7 Tagen.

Was ist, wenn kein festgeschriebener Offset vorhanden ist?

Wenn ein Konsument gestartet und einer Partition zugeordnet wird, die konsumiert werden soll, beginnt er mit dem festgeschriebenen Offset seiner Gruppe. Gibt es noch keinen verbindlichen Offset, kann der Verbraucher wählen, ob er mit der frühesten oder der spätesten verfügbaren Nachricht beginnen möchte, basierend auf der Einstellung der Eigenschaft auto.offset.reset wie folgt:

  • latest (Standardeinstellung): Ihr Konsument empfängt und konsumiert nur Nachrichten, die nach der Subskription eingehen. Ihr Konsument hat keine Kenntnis von Nachrichten, die vor der Subskription gesendet wurden, und erwartet daher nicht, dass alle Nachrichten von einem Topic konsumiert werden.
  • earliest: Ihr Konsument verarbeitet alle Nachrichten von Anfang an.

Wenn ein Konsument nach der Verarbeitung einer Nachricht, aber vor dem Festschreiben ihres Offsets fehlschlägt, spiegeln die festgeschriebenen Offsetinformationen nicht die Verarbeitung der Nachricht wider. Dies bedeutet, dass die Nachricht erneut vom nächsten Konsumenten in dieser Gruppe verarbeitet wird, dem die Partition zugeordnet werden soll.

Wenn per Commit festgeschriebene Offsets in Kafka gespeichert und die Consumer neu gestartet werden, fahren die Consumer dort fort, wo sie zuletzt gestoppt wurden. Wenn ein verbindlicher Versatz vorhanden ist, wird die Eigenschaft auto.offset.reset nicht verwendet.

Offsets automatisch per Commit festschreiben

Die einfachste Methode zum Festschreiben von Offsets besteht darin, dass der Kafka -Konsument dies automatisch vollzieht. Das ist einfach, aber es bietet weniger Kontrolle als eine manuelle Eingabe. Ein Consumer schreibt standardmäßig alle 5 Sekunden Offsets per Commit fest. Der Standard-Commit wird alle 5 Sekunden durchgeführt. Dies ist vom Fortschritt der Nachrichtenverarbeitung durch den Consumer unabhängig. Wenn der Verbraucher poll() aufruft, führt dies außerdem dazu, dass der letzte Versatz, der beim vorherigen Aufruf von poll() zurückgegeben wurde, übertragen wird (da davon ausgegangen wird, dass die vorherigen Nachrichten alle verarbeitet wurden).

Wenn der Commit-Offset die Verarbeitung der Nachrichten überholt und ein Consumer-Fehler vorliegt, ist es möglich, dass einige Nachrichten nicht verarbeitet werden. Der Grund hierfür ist, dass die Verarbeitung beim festgeschriebenen Offset neu gestartet wird, der einen späteren Zeitpunkt hat als die letzte Nachricht, die vor dem Ausfall verarbeitet wurde. Aus diesem Grund ist die Zuverlässigkeit wichtiger als eine einfache Handhabung. Daher wird empfohlen, Offsets manuell per Commit festzuschreiben.

Offsets manuell per Commit festschreiben

Wenn enable.auto.commit auf false festgelegt wird, schreibt der Consumer die Offsets manuell per Commit fest. Dies kann synchron oder asynchron sein. Ein übliches Muster ist das Festschreiben des Offsets der zuletzt verarbeiteten Nachricht auf Basis eines periodischen Zeitgebers. Das bedeutet, dass jede Nachricht mindestens einmal bearbeitet, aber der festgeschriebene Offset die Verarbeitung der Nachrichten nicht übernimmt. Die Häufigkeit des periodischen Zeitgebers steuert die Anzahl der Nachrichten, die nach einem Consumer-Ausfall erneut verarbeitet werden können. Die Nachrichten werden aus dem zuletzt gespeicherten per Commit festgeschriebenen Offset abgerufen, wenn die Anwendung neu gestartet oder die Gruppe neu verteilt wird.

Das per Commit festgeschriebene Offset ist das Offset der Nachrichten, von dem die Verarbeitung fortgesetzt wird. Dies ist in der Regel das Offset der zuletzt verarbeiteten Nachrichten plus eins.

Consumer-Verzögerung

Die Consumer-Verzögerung für eine Partition ist die Differenz zwischen dem Offset der zuletzt veröffentlichten Nachricht und dem festgeschriebenen Offset des Consumers. Mit anderen Worten, es ist die Differenz zwischen der Anzahl der Datensätze, die erzeugt wurden, und der Anzahl der verarbeiteten Datensätze. Schwankungen in den Erstellungs- und Verarbeitungsraten sind zwar üblich, aber über einen längeren Zeitraum sollte die Verarbeitungsrate nicht langsamer sein als die Erstellungsrate.

Wenn Sie feststellen, dass ein Konsument Nachrichten erfolgreich verarbeitet, aber gelegentlich über eine Nachrichtengruppe zu springen scheint, kann dies ein Zeichen dafür sein, dass der Konsument nicht in der Lage ist, Schritt zu halten. Bei Topics, die keine Protokollkomprimierung verwenden, wird die Menge des Protokollspeicherbereichs verwaltet, indem alte Protokollsegmente regelmäßig gelöscht werden. Wenn ein Verbraucher so weit zurückgefallen ist, dass er Nachrichten in einem gelöschten Protokollsegment konsumiert, springt er plötzlich zum Anfang des nächsten Protokollsegments vor. Wenn es wichtig ist, dass der Consumer alle Nachrichten verarbeitet, führt dieses Verhalten zu einem Nachrichtenverlust aus Sicht des Consumers.

Sie können das Tool kafka-consumer-groups verwenden, um die Konsumenten-Verzögerung anzuzeigen. Zu diesem Zweck können Sie auch die Consumer-API und die Consumer-Metriken nutzen.

Die Geschwindigkeit der Nachrichtenverarbeitung steuern

Wenn Sie Probleme mit der Nachrichtenverarbeitung haben, die durch Nachrichtenüberflutung verursacht werden, können Sie eine Verbraucheroption einstellen, um die Geschwindigkeit des Nachrichtenverbrauchs zu steuern. Verwenden Sie fetch.max.bytes und max.poll.records, um zu steuern, wie viele Daten ein Aufruf von poll() zurückgeben kann.

Consumer-Neuverteilung handhaben

Wenn Verbraucher zu einer Gruppe hinzugefügt oder aus ihr entfernt werden, findet eine Neuverteilung der Gruppe statt, und die Verbraucher können keine Nachrichten konsumieren. Dies führt dazu, dass alle Verbraucher einer Verbrauchergruppe für einen kurzen Zeitraum nicht erreichbar sind.

Wenn Sie mit dem "on partitions revoked"-Callback benachrichtigt werden, verwenden Sie einen ConsumerRebalanceListener, um Offsets manuell zu bestätigen (wenn Sie nicht auto-commit verwenden) und die weitere Verarbeitung anzuhalten, bis Sie mit dem "on partition assigned"-Callback über den erfolgreichen Rebalance informiert werden.

Code-Snippets

Diese Code-Snippets sind auf einer hohen Ebene, um die beteiligten Konzepte zu veranschaulichen. Vollständige Beispiele finden Sie in den Event Streams -Beispielen in GitHub-.

Um einen Konsumenten mit Event Streamszu verbinden, müssen Sie Serviceberechtigungsnachweise erstellen. Informationen zum Abrufen dieser Berechtigungsnachweise finden Sie unter Verbindung zu Event Streams.

Im Konsumentencode müssen Sie zunächst die Gruppe der Konfigurationseigenschaften erstellen. Alle Verbindungen zu Event Streams sind durch TLS und Benutzer-Passwort-Authentifizierung gesichert, so dass Sie mindestens diese Eigenschaften benötigen. Ersetzen Sie BOOTSTRAP_ENDPOINTS, USER und PASSWORD durch die Werte Ihrer eigenen Dienstanmeldedaten:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

Um Nachrichten zu konsumieren, müssen Sie auch Deserialisierer für die Schlüssel und Werte angeben, wie im folgenden Beispiel.

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Diese Deserializer müssen mit den von den Produzenten verwendeten Serializern übereinstimmen.

Verwenden Sie dann einen KafkaConsumer, um Nachrichten zu verarbeiten, wobei jede Nachricht durch einen Consumer-Datensatz (ConsumerRecord) dargestellt wird. Die gängigste Art, Nachrichten zu verarbeiten, ist es, den Consumer in eine Consumergruppe zu platzieren, indem Sie die Gruppen-ID festlegen und dann subscribe() für eine Liste von Topics aufrufen. Dem Konsumenten werden einige zu konsumierende Partitionen zugeordnet. Wenn jedoch mehr Konsumenten in der Gruppe als Partitionen im Topic vorhanden sind, werden dem Konsumenten möglicherweise keine Partitionen zugeordnet. Rufen Sie dann poll() in einer Schleife auf und empfangen Sie einen Stapel zu verarbeitender Nachrichten, wobei jede Nachricht durch einen Consumer-Datensatz (ConsumerRecord) dargestellt wird.

props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));  // T1 is the topic name
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Diese Consumer-Schleife läuft ewig, kann aber von einem anderen Thread aus durch den Aufruf von Consumer.wakeup() unterbrochen werden, um ein sauberes Herunterfahren zu erreichen.

Um Offsets per Commit manuell festzuschreiben, ist es zunächst notwendig, die Konfiguration enable.auto.commit auf false festzulegen. Verwenden Sie dann entweder Consumer.commmitSync() oder Consumer.commitAsync(), um den festgeschriebenen Offset laufend zu aktualisieren. Aus Gründen der Einfachheit verarbeitet dieses Beispiel die Datensätze für jede Partition und schreibt separat den letzten Offset fest.

props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partRecords = records.records(tp);
      long lastOffset = 0;
      for (ConsumerRecord<String, String> record : partRecords) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        lastOffset = record.offset();
      }
      // having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
  }
}
finally {
  consumer.close();
}

Ausnahmebehandlung

Jede stabile Anwendung, die den Kafka-Client verwendet, muss Ausnahmen für bestimmte erwartete Situationen behandeln können. In einigen Fällen werden die Ausnahmen nicht direkt ausgelöst, weil einige Methoden asynchron sind und ihre Ergebnisse mithilfe eines Future oder eines Callbacks liefern. Sehen Sie sich Beispielcode in GitHub an, der vollständige Beispiele zeigt.

Bearbeiten Sie die folgende Liste von Ausnahmebedingungen in Ihrem Code:

org.apache.kafka.common.errors.WakeupException

Wird von Consumer.poll(...) ausgelöst, wenn Consumer.wakeup() aufgerufen wird. Dies ist die Standardmethode, um die Abfrageschleife des Verbrauchers zu unterbrechen. Die Abfrageschleife wird beendet und Consumer.close() wird aufgerufen, um die Verbindung ordnungsgemäß zu trennen.

org.apache.kafka.common.errors.NotLeaderForPartitionException

Ausgelöst durch Producer.send(...) , wenn sich die Führung für eine Partition ändert. Der Client aktualisiert seine Metadaten automatisch, um die aktuellen Leaderinformationen zu suchen. Wiederholen Sie die Operation, die mit den aktualisierten Metadaten erfolgreich ist.

org.apache.kafka.common.errors.CommitFailedException

Wird als Ergebnis von Consumer.commitSync(...) ausgelöst, wenn ein nicht behebbarer Fehler auftritt. In manchen Fällen ist es nicht möglich, den Vorgang zu wiederholen, weil sich die Partitionszuordnung geändert hat und der Verbraucher seine Offsets nicht mehr festlegen kann. Da Consumer.commitSync(...) bei Verwendung mit mehreren Partitionen in einem einzigen Aufruf teilweise erfolgreich sein kann, kann die Fehlerbehebung vereinfacht werden, indem für jede Partition ein separater Consumer.commitSync(...) -Aufruf verwendet wird.

org.apache.kafka.common.errors.TimeoutException

Wird von Producer.send(...), Consumer.listTopics() ausgelöst, wenn die Metadaten nicht abgerufen werden können Die Ausnahmebedingung wird auch im gesendeten Callback (oder im zurückgegebenen Future) angezeigt, wenn die angeforderte Bestätigung nicht innerhalb von request.timeout.ms zurückgegeben wird. Der Client kann die Operation wiederholen, aber die Auswirkung der wiederholten Operation hängt von der spezifischen Operation ab. Wird zum Beispiel das Senden einer Nachricht wiederholt, wird die Nachricht möglicherweise dupliziert.