Utilización de la API de Kafka

Kafka proporciona un conjunto enriquecido de API y clientes en una gran variedad de lenguajes. Las API incluyen la API principal, la API de Streams y la API de Connect.

  • API central de Kafka (API de consumidor, productor y de administrador) Se utiliza para enviar y recibir mensajes directamente desde uno de los temas de Kafka, o varios temas. El cliente de administración de Kafka proporciona una interfaz sencilla a través de la API de Kafka para gestionar los recursos de Kafka. Puede crear, suprimir y gestionar temas. También puede utilizar el cliente de administración para gestionar grupos de clientes y configuraciones.
  • API de flujos Una API de procesamiento de flujos de alto nivel para consumir, transformar y producir fácilmente eventos entre temas.
  • API de conexión Un marco que permite integraciones reutilizables o estándar para transmitir eventos hacia y desde sistemas externos, como bases de datos.

En la tabla siguiente se resume lo que puede utilizar con Event Streams:

Kafka asistencia al cliente en los planes Standard, Enterprise y Lite.
Plan de empresa Plan Estándar Plan Lite
Versión de Kafka en el clúster Kafka 3.8 Kafka 3.8 Kafka 3.8
Versión de cliente de Kafka mínima recomendada Kafka 2.6.0 o más tarde Kafka 2.6.0 o más tarde Kafka 2.6.0 o más tarde
Versiones de cliente admitidas Consulte Resumen de soporte para todos los clientes recomendados
Soporte a Kafka Connect No
Soporte a Kafka Streams No
Soporte a ksqlDB No No
Requisitos de autenticación El cliente debe soportar la autenticación mediante el mecanismo SASL Plain y utilizar la extensión Server Name Indication (SNI) del protocolo TLSv1.2. El cliente debe soportar la autenticación mediante el mecanismo SASL Plain y utilizar la extensión Server Name Indication (SNI) del protocolo TLSv1.2. El cliente debe soportar la autenticación mediante el mecanismo SASL Plain y utilizar la extensión Server Name Indication (SNI) del protocolo TLSv1.2.

Elección de un cliente Kafka para utilizarlo con Event Streams

El cliente oficial de la API de Kafka está escrito en Java y, como tal, contiene las funciones y correcciones de errores más recientes. Para más información sobre esta API, consulte Kafka Producer API 3.8 y Kafka Consumer API 3.8.

Para otros idiomas, ejecute uno de los clientes siguientes, todos los cuales se prueban con Event Streams.

Soporte al resumen de todos los clientes recomendados

Resumen de asistencia al cliente
Cliente Idioma Versión recomendada Versión mínima soportada [1] Enlace al ejemplo
Cliente Apache Kafka oficial:
Cliente Apache Kafka Java 3.8.1 o más tarde 2.5.0 Ejemplo de consola Java

Ejemplo de Liberty

Clientes de terceros:
confluent-kafka-javascript Node.js Más reciente 1.0.0
confluent-kafka-python Python Más reciente 1.4.0 Ejemplo de Kafka Python
confluent-kafka-go Go Más reciente 1.4.0
librdkafka C o C++ Más reciente 1.4.0
node-rdkafka Node.js Más reciente 2.8.0 Ejemplo deNode.js
sarama Go Más reciente 1.40.0 Ejemplos de Sarama

Conexión de su cliente a Event Streams

Para obtener información sobre cómo configurar el cliente Java para que se conecte a Event Streams, consulte Configuración de su cliente.

Configuración del cliente de API Kafka

Para establecer una conexión, los clientes deben estar configurados para utilizar SASL PLAIN o SASL OAUTHBEARER sobre TLSv1.2 como mínimo y para requerir un nombre de usuario, y una lista de los servidores de arranque. TLSv1.2 garantiza el cifrado de las conexiones y valida la autenticidad de los intermediarios (para evitar los ataques "man-in-the-middle"). SASL aplica la autenticación en todas las conexiones.

Para recuperar el nombre de usuario, la contraseña y la lista de servidores de arranque, se necesita un objeto de credenciales de servicio o una clave de servicio para la instancia de servicio. Para obtener más información sobre la creación de estos objetos, consulte Conexión a Event Streams.

Utilización de SASL PLAIN

Utilice las siguientes series y propiedades.

  • Utilice la serie bootstrap_endpoints como lista de servidores de rutina de carga y pase esta serie de pares de host y puerto al cliente Kafka.
  • Utilice las propiedades user y api_key como nombre de usuario y contraseña.

Para un cliente Java, el siguiente ejemplo muestra el conjunto mínimo de propiedades, donde ${USERNAME}, ${PASSWORD}, y ${BOOTSTRAP_ENDPOINTS} deben sustituirse por los valores que recuperó anteriormente.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

Si utiliza un cliente Kafka anterior a la versión 0.10.2.1, no se admite la propiedad sasl.jaas.config y, en su lugar, debe proporcionar la configuración del cliente en un archivo de configuración JAAS.

Uso de SASL OAUTHBEARER con clientes de Java v3.4- 4.0

Antes de configurar el mecanismo SASL para el cliente Java, existen dos requisitos previos:

  • La versión de cliente mínima compatible con Kafka Java es 3.4 ( 3.6 o superior es preferible).
  • Es necesario descargar un paquete jar adicional de Maven Central y ponerlo a disposición en el classpath.

Si se utiliza Maven en el sistema de compilación, añada la siguiente información al archivo pom.xml en la sección de dependencias:

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.4.0</version>
</dependency>

Si se utiliza Gradle en el sistema de compilación, añada la siguiente información al archivo build.gradle en la sección de dependencias:

implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0

IBM Cloud® Identity and Access Management El Servicio de Identidad soporta múltiples formas de generar un token de portador, dos de las cuales son soportadas por esta librería cliente oauth:

  • Clave de API
  • Perfil de confianza y token de recurso informático

Uso de SASL OAUTHBEARER con clave API

Utilice las siguientes series y propiedades.

  • Utilice la serie BOOTSTRAP_ENDPOINTS como lista de servidores de rutina de carga y pase esta serie de pares de host y puerto al cliente Kafka.
  • El IAMOAuthBearerLoginCallbackHandler lo proporciona el paquete jar com.ibm.cloud.eventstreams:oauth-client:+.
  • El IBM Cloud® Identity and Access Management 's token endpoint https://iam.cloud.ibm.com/identity/token está configurado para generar un token de la clave de la API mediante el uso de tipo de concesión especificado en jaas config. Se realiza en el lado del cliente, por lo que la clave API nunca se envía al lado del servidor y esto proporciona una mayor seguridad que una clave API de larga duración.
  • El punto final de clave de Cloud Identity and Access Management https://iam.cloud.ibm.com/identity/keys está configurado para validar la señal.
  • grant_type en sasl.jaas.config es urn:ibm:params:oauth:grant-type:apikey
  • apikey en sasl.jaas.config es la clave API utilizada para generar el token de portador en el lado del cliente. Puede ser de un ID de usuario o de servicio.

Para un cliente Java, el siguiente ejemplo muestra el conjunto mínimo de propiedades, donde usted reemplaza ${BOOTSTRAP_ENDPOINTS}, y ${APIKEY} con los valores que recuperó anteriormente.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.cloud.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";

Uso de SASL OAUTHBEARER con perfil de confianza y token de recurso informático

Todas las propiedades son las mismas que las descritas para la clave API, salvo que sasl.jaas.config es diferente.

  • grant_type en sasl.jaas.config es urn:ibm:params:oauth:grant-type:cr-token.
  • profile_id en sasl.jaas.config es una ubicación de archivo que almacena el ID del perfil de confianza. Este archivo puede montarse en un pod Kubernetes que ejecute código de cliente Kafka como volumen de sólo lectura y ponerse a disposición del código de cliente Kafka.
  • cr_token en sasl.jaas.config es una ubicación de archivo que almacena el token de cuenta de servicio de un pod Kubernetes que ejecuta código de cliente Kafka. Para más información, consulte Qué es un token de cuenta de servicio.

Por ejemplo:

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";

Encontrará más información sobre Cómo crear un perfil de confianza.

El código fuente del cliente oauth hace referencia al SDK de Event Streams Java.

El código cliente de muestra hace referencia a la muestra Event Streams.

Uso de SASL OAUTHBEARER con clientes Java v4.1 y posteriores

Cuando se utiliza un cliente Kafka Java en v4.1 o superior, el cliente necesita utilizar una versión más reciente del cliente Event Streams oauth, que se basa en Kafka 's default callback handler and an appropriate Token Retriever.

Si se utiliza Maven en el sistema de compilación, añada la siguiente información al archivo pom.xml en la sección de dependencias:

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>2.0.0</version>
</dependency>

Si se utiliza Gradle en el sistema de compilación, añada la siguiente información al archivo build.gradle en la sección de dependencias:

implementation com.ibm.cloud.eventstreams:oauth-client:2.0.+

El Servicio de Identidad IBM Cloud® Identity and Access Management soporta múltiples formas de generar un token de portador, dos de las cuales son soportadas por esta librería cliente oauth:

  • Clave de API
  • Perfil de confianza y token de recurso informático

Uso de SASL OAUTHBEARER con clave API

Utilice las siguientes cadenas y propiedades además de la propiedad obligatoria bootstrap.servers y cualquier configuración específica de productor, consumidor y administrador.

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    grant_type="urn:ibm:params:oauth:grant-type:apikey" \
    apikey="${YOUR_IBM_CLOUD_API_KEY}";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys

Uso de SASL OAUTHBEARER con un perfil de confianza y un token de recurso informático en entornos de contenedores

Para obtener más información, consulte Generación de un token IAM para un recurso informático.

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    grant_type="urn:ibm:params:oauth:grant-type:cr-token" \
    cr_token="/path/to/cr-token-file" \
    profile_id="/path/to/profile-id-file";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys

Propiedad del sistema org.apache.kafka.sasl.oauthbearer.allowed.urls

Desde Kafka 4.0, el cliente requiere una propiedad del sistema para establecer las URLs permitidas del token SASL OAUTHBEARER y los endpoints jwks.

Para más información, consulte las propiedades del sistema.

Cuando se utilizan los scripts de shell de cliente CLI proporcionados por la distribución Apache Kafka, la propiedad del sistema también puede establecerse utilizando la variable de entorno KAFKA_OPTS.

export KAFKA_OPTS="-Dorg.apache.kafka.sasl.oauthbearer.allowed.urls=https://private.iam.cloud.ibm.com/identity/keys,https://private.iam.cloud.ibm.com/identity/token,https://api.metadata.cloud.ibm.com/identity/v1/iam_tokens"

Utilización de SASL OAUTHBEARER con clientes que no sean Java

Para otras libarias de cliente Kafka, consulte su documentación sobre cómo implementar el soporte de OAUTHBEARER. Por ejemplo:

  • sarama: es necesaria una implementación de la interfaz AccessTokenProvider.
  • librdkafka: es necesaria una implementación de la devolución de llamada oauthbearer_token_refresh_cb.

Para obtener información sobre cómo generar una señal de IAM de IBM Cloud utilizando una clave de API, consulte el documento de IBM Cloud® Identity and Access Management.


  1. La primera versión validada en pruebas continuas. Normalmente, se trata de la versión inicial disponible en los últimos 12 meses, o más reciente si se sabe que existen problemas importantes. Si no puede ejecutar ninguno de los clientes de la lista, puede utilizar otros clientes de terceros que cumplan los siguientes requisitos mínimos (por ejemplo, librdkafka ). 1. Da soporte a Kafka 1.40o posterior. 2. Puede conectarse y autenticarse utilizando SASL PLAIN con TLSv1.2. 3. Soporta las extensiones SNI para TLS donde el nombre de host del servidor se incluye en el handshake TLS. 4. Da soporte a la criptografía de curva elíptica. En todos los casos, utilice la versión más reciente del cliente. ↩︎