IBM Cloud Docs
Consumindo mensagens

Consumindo mensagens

Um consumidor é um aplicativo que consome fluxos de mensagens dos tópicos do Kafka. Um consumidor pode assinar um ou mais tópicos ou partições. Essas informações concentram-se na interface de programação Java™ que faz parte do projeto Apache Kafka. Os conceitos se aplicam a outros idiomas também, mas os nomes são, às vezes, um pouco diferentes.

Quando um consumidor se conecta ao Kafka, ele faz uma conexão de autoinicialização inicial. Essa conexão pode ser qualquer um dos servidores no cluster. O consumidor solicita as informações de partição e de liderança sobre o tópico do qual deseja consumir. Em seguida, o consumidor estabelece outra conexão com o líder da partição e pode consumir mensagens. Essas ações acontecem automaticamente internamente quando o consumidor se conecta ao cluster do Kafka.

Um consumidor é normalmente um aplicativo de longa execução. Um consumidor solicita mensagens de Kafka chamando Consumer.poll(...) regularmente. O consumidor chama poll(), recebe um lote de mensagens, processa-os imediatamente e depois chama poll() novamente.

Quando um consumidor processa uma mensagem, a mensagem não é removida de seu tópico. Em vez disso, os consumidores podem escolher dentre várias formas de notificar o Kafka quais mensagens foram processadas. Esse processo é conhecido como confirmação do deslocamento.

Nas interfaces de programação, uma mensagem é chamada de registro. Por exemplo, a classe Java org.apache.kafka.clients.consumer.ConsumerRecord é usada para representar uma mensagem para a API do consumidor. Os termos registro e mensagem podem ser usados de forma intercambiável, mas essencialmente um registro é usado para representar uma mensagem.

Você pode achar útil ler essas informações juntamente com as mensagens de produção em Event Streams.

Configurando propriedades do consumidor

Existem muitas definições de configuração para o consumidor que controlam aspectos de seu comportamento. As configurações a seguir são algumas das mais importantes.

Configurando propriedades do consumidor
Nome Descrição Valores válidos Padrão
key.deserializer A classe usada para desserializar chaves. Classe Java que implementa a interface Desserializadora, como org.apache.kafka.common.serialization.StringDeserializer. Nenhum padrão - deve-se especificar um valor
value.deserializer A classe usada para desserializar valores. Classe Java que implementa a interface Desserializadora, como org.apache.kafka.common.serialization.StringDeserializer. Sem padrão - você deve especificar um valor.
group.id Um identificador para o grupo de consumidores ao qual o consumidor pertence. Sequência Nenhum padrão
auto.offset.reset O comportamento quando o consumidor não possui um deslocamento inicial ou quando o deslocamento atual não está mais disponível no cluster. Mais recente, mais antigo, nenhum Mais recente
enable.auto.commit Determina se o deslocamento do consumidor deve ser confirmado automaticamente em segundo plano. verdadeiro, falso Sim
auto.commit.interval.ms O número de milissegundos entre as confirmações periódicas de deslocamentos. 0,... 5000 (5 segundos)
max.poll.records O número máximo de registros retornados em uma chamada para poll(). 1,... 500
session.timeout.ms O número de milissegundos em que uma pulsação do consumidor deve ser recebida para manter a associação de um consumidor de um grupo de consumidores. 6000-300000 10000 (10 segundos)
max.poll.interval.ms O intervalo de tempo máximo entre as pesquisas antes que o consumidor saia do grupo. 1,... 300000 (5 minutos)

Muitas outras definições de configuração estão disponíveis, mas leia a Apache Kafka antes de começar a experimentar com elas

Grupos de consumidores

Um grupo de consumidores é aquele que coopera para consumir mensagens de um ou mais tópicos. Todos os consumidores em um grupo usam o mesmo valor para a configuração de group.id. Caso você precise de mais de um consumidor para lidar com sua carga, é possível executar múltiplos consumidores no mesmo grupo de consumidores. Mesmo que você precise apenas de um consumidor, é comum também especificar um valor para group.id.

Cada grupo de consumidores possui um servidor no cluster que é chamado de coordenador responsável por atribuir partições aos consumidores no grupo. Essa responsabilidade é propagada entre os servidores no cluster para balancear a carga. A designação de partições para os consumidores pode mudar a cada rebalanceamento de grupo.

Quando um consumidor se associa a um grupo de consumidores, ele descobre o coordenador do grupo. Em seguida, o consumidor informar ao coordenador que deseja se associar ao grupo e o coordenador inicia um rebalanceamento das partições em todo o grupo para incluir o novo membro.

As mensagens de uma única partição são processadas por somente um consumidor em cada grupo. Esse mecanismo assegura que as mensagens em cada partição sejam processadas em ordem. Consulte o diagrama a seguir para obter um exemplo em que um tópico contém três partições e um grupo de consumidores, que está consumindo esse tópico, contém dois consumidores Um consumidor no grupo recebe duas partições e o outro consumidor recebe uma partição.

Diagrama de grupos de consumidores.
Consumer group example

Quando uma das mudanças a seguir ocorre em um grupo de consumidores, o grupo rebalanceia deslocando a atribuição de partições aos membros do grupo para acomodar a mudança:

  • Um consumidor se associa ao grupo.
  • Um consumidor sai do grupo.
  • Um consumidor não mais é considerado em tempo real pelo coordenador.
  • Novas partições são incluídas em um tópico existente.

Se você tiver um grupo de consumidores que é reequilibrado, saiba que qualquer consumidor que tenha saído do grupo terá seus commits rejeitados até que ele volte ao grupo. Nesse caso, o consumidor precisa unir-se novamente ao grupo, em que ele pode ser designado a uma partição diferente daquela que ele estava consumindo anteriormente.

Atividade do consumidor

O Kafka detecta automaticamente os consumidores com falha para que ele possa designar novamente partições para consumidores ativos. Ele usa dois mecanismos: pesquisa e pulsação.

Se o lote de mensagens retornadas por Consumer.poll(...) for grande ou o processamento for demorado, o atraso antes de poll() ser chamado novamente pode ser significativo ou imprevisível. Em alguns casos, é necessário configurar um intervalo de pesquisa longo máximo para que os consumidores não sejam removidos de seus grupos só porque o processamento de mensagens está demorando um pouco. Se esse mecanismo for o único disponível, o tempo que leva para detectar um consumidor fracassado também é longo.

Para tornar a atividade do consumidor mais fácil de lidar, uma pulsação em segundo plano foi incluída no Kafka 0.10.1. O coordenador do grupo espera que os membros do grupo enviem pulsações regulares para indicar que eles permanecem ativos. Um encadeamento de pulsação de plano de fundo é executado no consumidor e envia pulsações regulares para o coordenador. Se o coordenador não receber uma pulsação de um membro do grupo dentro do tempo limite de sessão, o coordenador removerá o membro do grupo e iniciará um rebalanceamento do grupo. O tempo limite da sessão pode ser muito menor do que o intervalo máximo de sondagem, de modo que o tempo necessário para detectar um consumidor com falha pode ser curto, mesmo que o processamento da mensagem demore muito.

É possível configurar o intervalo máximo de pesquisa usando a propriedade max.poll.interval.ms e o tempo limite de sessão usando a propriedade session.timeout.ms. Não é necessário usar essas configurações a menos que demore mais de 5 minutos para processar um lote de mensagens.

Gerenciando deslocamentos

Para cada grupo de consumidores, o Kafka mantém o deslocamento comprometido para cada partição que é consumida. Quando um consumidor processa uma mensagem, ele não a remove da partição. Em vez disso, ele apenas atualiza seu deslocamento atual usando um processo que é chamado confirmando o deslocamento.

O Event Streams retém as informações de deslocamento confirmadas por 7 dias.

E se nenhum deslocamento confirmado existente existir?

Quando um consumidor inicia e é designada uma partição para consumir, ele começa no deslocamento confirmado de seu grupo. Se não houver nenhuma compensação confirmada, o consumidor poderá escolher se quer começar com a mensagem disponível mais antiga ou mais recente com base na na configuração da propriedade auto.offset.reset, como segue:

  • latest (o padrão): seu consumidor recebe e consome apenas mensagens que chegam depois que você se inscreve. Seu consumidor não tem conhecimento de mensagens que foram enviadas antes de se inscrever, portanto, não espere que todas as mensagens sejam consumidas de um tópico.
  • earliest: Seu consumidor consome todas as mensagens desde o início.

Se um consumidor falhar após o processamento de uma mensagem mas antes de confirmar o seu deslocamento, as informações de deslocamento confirmadas não refletirão o processamento da mensagem. Isso significa que a mensagem é processada novamente pelo próximo consumidor no grupo a ser designada a partição.

Quando os deslocamentos confirmados são salvos no Kafka e os consumidores são reiniciados, os consumidores continuam do ponto em que eles pararam na última vez. Quando existe um deslocamento confirmado, a propriedade auto.offset.reset não é usada.

Confirmando deslocamentos automaticamente

A maneira mais fácil de confirmar os deslocamentos é solicitar ao consumidor do Kafka que faça isso automaticamente. É simples, mas oferece menos controle do que a confirmação manual. Por padrão, um consumidor confirma automaticamente os deslocamentos a cada 5 segundos. Essa confirmação padrão acontece a cada cinco segundos, independentemente do progresso que o consumidor está fazendo no processamento das mensagens. Além disso, quando o consumidor chama poll(), isso também faz com que o último deslocamento retornado da chamada anterior para poll() seja confirmado (porque se presume que as mensagens anteriores foram todas processadas).

Se a compensação confirmada ultrapassar o processamento das mensagens e houver uma falha do consumidor, é possível que algumas mensagens não sejam processadas. Isso ocorre porque o processamento é reiniciado no deslocamento confirmado, que é posterior à última mensagem a ser processada antes da falha. Por essa razão, se a confiabilidade é mais importante do que simplicidade, geralmente é melhor confirmar os deslocamentos manualmente.

Confirmando os deslocamentos manualmente

Se enable.auto.commit está configurado como false, o consumidor confirma seus deslocamentos manualmente. Ele pode fazer isso de forma síncrona ou assíncrona. Um padrão comum é confirmar o deslocamento da mensagem mais recente processada com base em um cronômetro periódico. Esse padrão significa que cada mensagem é processada pelo menos uma vez, mas o deslocamento confirmado nunca ultrapassa o progresso das mensagens que estão sendo ativamente processadas. A frequência do cronômetro periódico controla o número de mensagens que podem ser reprocessadas após uma falha do consumidor. As mensagens são recuperadas novamente no último deslocamento confirmado salvo quando o aplicativo reinicia ou quando o grupo é rebalanceado.

O deslocamento confirmado é o deslocamento das mensagens nas quais o processamento é continuado. Isso é geralmente o deslocamento da mensagem processada mais recentemente mais uma.

Atraso do consumidor

O atraso do consumidor para uma partição é a diferença entre o deslocamento da mensagem publicada mais recentemente e o deslocamento confirmado do consumidor. Ou seja, é a diferença entre o número de registros que foram produzidos e o número que foram consumidos. Embora seja comum ter variações naturais nas taxas de produção e de consumo, a taxa de consumo não deve ser menor que a taxa de produção durante um período estendido.

Se você observar que um consumidor está processando mensagens com sucesso mas, ocasionalmente, aparece ignorando um grupo de mensagens, pode ser um sinal de que o consumidor não é capaz de acompanhar. Para tópicos que não usam a compactação de log, a quantia de espaço de log é gerenciada excluindo periodicamente segmentos de log antigos. Se um consumidor ficar tão para trás que estiver consumindo mensagens em um segmento de registro que foi excluído, ele subitamente avançará para o início do próximo segmento de registro. Se for importante que o consumidor processe todas as mensagens, esse comportamento indicará perda de mensagem do ponto de vista deste consumidor.

Você pode usar a ferramenta kafka-consumer-groups para ver o atraso do consumidor. Também é possível utilizar a API e as métricas do consumidor para o mesmo propósito.

Controlando a velocidade de consumo de mensagem

Se você tiver problemas com o tratamento de mensagens causado por inundação de mensagens, poderá definir uma opção de consumidor para controlar a velocidade do consumo de mensagens. Use fetch.max.bytes e max.poll.records para controlar quantos dados uma chamada para poll() pode retornar.

Manipulando o rebalanceamento do consumidor

Quando os consumidores são adicionados ou removidos de um grupo, ocorre um reequilíbrio do grupo, e os consumidores não podem consumir mensagens. Isso faz com que todos os consumidores em um grupo de consumidores fiquem indisponíveis por um curto período.

Se você for notificado com o retorno de chamada "on partitions revoked", use um ConsumerRebalanceListener para confirmar manualmente os deslocamentos (se não estiver usando o auto-commit) e para pausar o processamento adicional até que seja notificado do rebalanceamento bem-sucedido usando o retorno de chamada "on partition assigned".

Fragmentos de código

Esses fragmentos de código estão em um alto nível para ilustrar os conceitos envolvidos. Para obter exemplos completos, consulte as amostras do Event Streams em GitHub.

Para conectar um consumidor ao Event Streams, é necessário criar credenciais de serviço. Para obter informações sobre como obter essas credenciais, consulte Conectando ao Event Streams.

No código do consumidor, primeiro é necessário construir o conjunto de propriedades de configuração Todas as conexões com Event Streams são protegidas pelo uso de TLS e autenticação de senha de usuário, portanto, você precisa de pelo menos essas propriedades. Substitua BOOTSTRAP_ENDPOINTS, USER e PASSWORD por aqueles de suas próprias credenciais de serviço:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

Para consumir mensagens, você também precisa especificar desserializadores para as chaves e os valores, como no exemplo a seguir.

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Esses desserializadores devem corresponder aos serializadores usados pelos produtores..

Em seguida, use um KafkaConsumer para consumir mensagens, em que cada mensagem é representada por um ConsumerRecord. A maneira mais comum de consumir mensagens é colocar o consumidor em um grupo de consumidores configurando o ID do grupo e, em seguida, chamando subscribe() para obter uma lista de tópicos. São designadas ao consumidor algumas partições para consumir, no entanto, se houver mais consumidores no grupo do que partições no tópico, é possível que nenhuma partição seja designada ao consumidor. Em seguida, chame poll() em um loop, recebendo um lote de mensagens para processar, em que cada mensagem é representada por um ConsumerRecord.

props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));  // T1 is the topic name
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Esse loop de consumo é executado para sempre, mas pode ser interrompido por outro thread chamando Consumer.wakeup() para obter um desligamento organizado.

Para confirmar deslocamentos manualmente, primeiramente é necessário definir a configuração do enable.auto.commit como false. Em seguida, use Consumer.commmitSync() ou Consumer.commitAsync() para atualizar o deslocamento confirmado do consumidor periodicamente. Para simplificar, este exemplo processa os registros para cada partição e confirma o último deslocamento separadamente.

props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partRecords = records.records(tp);
      long lastOffset = 0;
      for (ConsumerRecord<String, String> record : partRecords) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        lastOffset = record.offset();
      }
      // having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
  }
}
finally {
  consumer.close();
}

Manipulação de exceção

Qualquer aplicativo robusto que usa o cliente Kafka precisa manipular exceções para determinadas situações esperadas. Em alguns casos, as exceções não são lançadas diretamente, porque alguns métodos são assíncronos e entregam seus resultados usando um Future ou um retorno de chamada. Verifique o código de exemplo no GitHub que mostra exemplos completos.

Manipule a seguinte lista de exceções em seu código:

org.apache.kafka.common.errors.WakeupException

Lançado por Consumer.poll(...) como um resultado de Consumer.wakeup() sendo chamado. Essa é a maneira padrão de interromper o loop de sondagem do consumidor. As saídas de loop de pesquisa e Consumer.close() é chamado para desconectar-se de forma limpa.

org.apache.kafka.common.errors.NotLeaderForPartitionException

Lançado como resultado de Producer.send(...) quando a liderança de uma partição muda. O cliente atualiza automaticamente seus metadados para localizar informações líderes atualizadas. Repita a operação que faz sucesso com os metadados atualizados.

org.apache.kafka.common.errors.CommitFailedException

Lançada como resultado de Consumer.commitSync(...) quando ocorre um erro irrecuperável. Em alguns casos, não é possível repetir a operação porque a atribuição da partição foi alterada e o consumidor não pode mais confirmar seus offsets. Como a Consumer.commitSync(...) pode ser parcialmente bem-sucedida quando usada com múltiplas partições em uma única chamada, a recuperação de erro pode ser simplificada usando uma chamada Consumer.commitSync(...) separada para cada partição.

org.apache.kafka.common.errors.TimeoutException

Lançada por Producer.send(...), Consumer.listTopics(), se os metadados não podem ser recuperados. A exceção também é vista no retorno de chamada de envio (ou no futuro retornado) quando o reconhecimento solicitado não volta dentro de request.timeout.ms. O cliente pode tentar novamente a operação, mas o efeito de uma operação repetida depende da operação específica. Por exemplo, se o envio de uma mensagem for repetido, a mensagem possivelmente será duplicada.