Using the Kafka API

Kafka provides a rich set of APIs and clients across a broad range of languages. The APIs include the core API, the Streams API, and the Connect API.

  • Kafka's core API (Consumer, Producer, and Admin API)
    Use it to send messages to, and receive messages from, one or more Kafka topics directly. The Kafka Admin client provides a simple interface through the Kafka API for managing Kafka resources: you can create, delete, and manage topics, and you can also manage consumer groups and configurations.
  • Streams API
    A higher-level stream processing API to easily consume, transform, and produce events between topics.
  • Connect API
    A framework that allows reusable or standard integrations to stream events into and out of external systems, such as databases.

The following table summarizes what you can use with Event Streams:

Kafka client support in Standard, Enterprise, and Lite plans

| | Enterprise plan | Standard plan | Lite plan |
|---|---|---|---|
| Kafka version on cluster | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
| Minimum recommended Kafka client version | Kafka 2.6.0, or later | Kafka 2.6.0, or later | Kafka 2.6.0, or later |
| Kafka Connect supported | Yes | Yes | No |
| Kafka Streams supported | Yes | Yes | No |
| ksqlDB supported | Yes | No | No |

Supported client versions (all plans): See Support summary for all recommended clients.

Authentication requirements (all plans): Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol.

Choosing a Kafka client to use with Event Streams

The official client for the Kafka API is written in Java, and as such contains the latest features and bug fixes. For more information about this API, see Kafka Producer API 3.6 and Kafka Consumer API 3.6.

For other languages, use one of the following clients, all of which are tested with Event Streams.

Support summary for all recommended clients

Client support summary

| Client | Language | Recommended version | Minimum version supported [1] | Link to sample |
|---|---|---|---|---|
| Official Apache Kafka client: | | | | |
| Apache Kafka client | Java | 3.6.2, or later | 2.5.0 | Java console sample, Liberty sample |
| Third-party clients: | | | | |
| confluent-kafka-javascript | Node.js | Latest | 1.0.0 | |
| confluent-kafka-python | Python | Latest | 1.4.0 | Kafka Python sample |
| confluent-kafka-go | Go | Latest | 1.4.0 | |
| librdkafka | C or C++ | Latest | 1.4.0 | |
| node-rdkafka | Node.js | Latest | 2.8.0 | Node.js sample |
| sarama | Go | Latest | 1.40.0 | Sarama examples |

Connecting your client to Event Streams

For information about how to configure your Java client to connect to Event Streams, see Configuring your client.

Configuring your Kafka API client

To establish a connection, clients must be configured to use SASL PLAIN or SASL OAUTHBEARER over TLSv1.2 at a minimum, and must be given a username, a password, and a list of the bootstrap servers. TLSv1.2 ensures that connections are encrypted and validates the authenticity of the brokers (to prevent man-in-the-middle attacks). SASL enforces authentication on all connections.

To retrieve the username, password, and list of bootstrap servers, you need a service credentials object or a service key for the service instance. For more information about creating these objects, see Connecting to Event Streams.
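Before configuring a client, it can help to confirm that the local Java runtime offers what the TLS requirement above calls for. The following is a minimal sketch that uses only the standard `javax.net.ssl` API; the class name `TlsSupportCheck` is hypothetical, and matching `_ECDHE_` in cipher suite names is a heuristic assumption for detecting elliptic curve support, not an official check.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

// Hypothetical helper: reports whether this JVM supports TLSv1.2 and
// offers at least one elliptic-curve (ECDHE) cipher suite.
class TlsSupportCheck {

    /** Returns true if any entry in values equals wanted. */
    static boolean contains(String[] values, String wanted) {
        return Arrays.asList(values).contains(wanted);
    }

    /** Heuristic: a suite name containing "_ECDHE_" uses elliptic-curve key exchange. */
    static boolean hasEllipticCurveSuite(String[] cipherSuites) {
        return Arrays.stream(cipherSuites).anyMatch(s -> s.contains("_ECDHE_"));
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Ask the default SSLContext what the runtime supports (no network access).
        SSLParameters supported = SSLContext.getDefault().getSupportedSSLParameters();
        System.out.println("TLSv1.2 supported: "
            + contains(supported.getProtocols(), "TLSv1.2"));
        System.out.println("ECDHE cipher suites available: "
            + hasEllipticCurveSuite(supported.getCipherSuites()));
    }
}
```

This check only inspects the local runtime; it does not contact Event Streams or verify credentials.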

Using SASL PLAIN

Use the following strings and properties.

  • Use the bootstrap_endpoints string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
  • Use the user and api_key properties as the username and password.

For a Java client, the following example shows the minimum set of properties, where ${USERNAME}, ${PASSWORD}, and ${BOOTSTRAP_ENDPOINTS} are to be replaced by the values that you retrieved previously.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
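The same properties can also be assembled in code rather than in a properties file. The following sketch builds them as a `java.util.Properties` object by using only the standard library; the class name `SaslPlainConfig` and the placeholder values in `main` are hypothetical, and the helper assumes that the username and password contain no double quotation marks.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the minimum SASL PLAIN properties shown above.
class SaslPlainConfig {

    /** Builds the client properties from values taken from the service credentials. */
    static Properties build(String bootstrapEndpoints, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapEndpoints);
        props.setProperty("sasl.mechanism", "PLAIN");
        // Assumes username and password contain no double quotation marks.
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\" password=\"" + password + "\";");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("ssl.protocol", "TLSv1.2");
        props.setProperty("ssl.enabled.protocols", "TLSv1.2");
        props.setProperty("ssl.endpoint.identification.algorithm", "HTTPS");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = build("broker-0.example.com:9093", "example-user", "example-api-key");
        // Print a non-secret property to show that the object was populated.
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting object can be passed to the constructor of a Kafka producer or consumer after you add the serializer or deserializer settings that the client requires.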

If you use a Kafka client earlier than version 0.10.2.1, the sasl.jaas.config property isn't supported, and you must instead provide the client configuration in a JAAS configuration file.

Using SASL OAUTHBEARER

Before you configure the SASL OAUTHBEARER mechanism for a Java client, two prerequisites must be met.

  • The minimum supported Kafka Java client version is 3.1.0.
  • An additional JAR package must be downloaded from Maven Central and made available on the classpath.

If you use Maven as your build system, add the following to the dependencies section of the pom.xml file.

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.4.0</version>
</dependency>

If you use Gradle as your build system, add the following to the dependencies section of the build.gradle file.

implementation 'com.ibm.cloud.eventstreams:oauth-client:1.4.0'

The IBM Cloud® Identity and Access Management (IAM) Identity Service supports multiple ways to generate a bearer token, two of which are supported by this OAuth client library.

  • API key.
  • Trusted profile and compute resource token.

Using SASL OAUTHBEARER with API key

Use the following strings and properties.

  • Use the BOOTSTRAP_ENDPOINTS string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
  • The IAMOAuthBearerLoginCallbackHandler is provided by the jar package com.ibm.cloud.eventstreams:oauth-client:+.
  • The IBM Cloud® Identity and Access Management token endpoint https://iam.cloud.ibm.com/identity/token is configured to generate a token from the API key by using the grant type that is specified in the JAAS configuration. The token is generated on the client side, so the API key is never sent to the server side. Authenticating with a token in this way provides better security than sending a long-lived API key.
  • The IBM Cloud® Identity and Access Management key endpoint https://iam.cloud.ibm.com/identity/keys is configured to validate the token.
  • grant_type in sasl.jaas.config is urn:ibm:params:oauth:grant-type:apikey.
  • apikey in sasl.jaas.config is the API key that is used to generate the bearer token on the client side. It can belong to either a user or a service ID.

For a Java client, the following example shows the minimum set of properties, where ${BOOTSTRAP_ENDPOINTS} and ${APIKEY} are to be replaced by the values that you retrieved previously.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
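As an alternative to a properties file, the OAUTHBEARER settings above can be built in code. The following is a sketch under stated assumptions: the class name `OAuthApiKeyConfig` and the placeholder values are hypothetical, the API key is assumed to contain no double quotation marks, and the `oauth-client` JAR described earlier must still be on the classpath when a real Kafka client uses these properties.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the SASL OAUTHBEARER (API key) properties shown above.
class OAuthApiKeyConfig {

    static final String TOKEN_URL = "https://iam.cloud.ibm.com/identity/token";
    static final String JWKS_URL = "https://iam.cloud.ibm.com/identity/keys";

    /** Builds the sasl.jaas.config value for the API key grant type. */
    static String jaasConfig(String apiKey) {
        // Assumes the API key contains no double quotation marks.
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " grant_type=\"urn:ibm:params:oauth:grant-type:apikey\""
            + " apikey=\"" + apiKey + "\";";
    }

    /** Builds the full set of client properties. */
    static Properties build(String bootstrapEndpoints, String apiKey) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapEndpoints);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.oauthbearer.token.endpoint.url", TOKEN_URL);
        props.setProperty("sasl.oauthbearer.jwks.endpoint.url", JWKS_URL);
        props.setProperty("sasl.login.callback.handler.class",
            "com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler");
        props.setProperty("sasl.jaas.config", jaasConfig(apiKey));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = build("broker-0.example.com:9093", "example-api-key");
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

Keeping the JAAS string in its own method makes it easier to load the API key from a secret store at run time instead of writing it into a file.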

Using SASL OAUTHBEARER with trusted profile and compute resource token

All properties are the same as for the API key method, except for sasl.jaas.config.

  • grant_type in sasl.jaas.config is urn:ibm:params:oauth:grant-type:cr-token.
  • profile_id in sasl.jaas.config is the location of a file that stores the trusted profile ID. This file can be mounted as a read-only volume in the Kubernetes pod that runs the Kafka client code, so that it is available to that code.
  • cr_token in sasl.jaas.config is the location of a file that stores the service account token of the Kubernetes pod that runs the Kafka client code. See What is a service account token.

See the following example.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";
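Because both JAAS options in this variant are file locations, a missing or unreadable mount is an easy mistake to make. The following sketch builds the same properties in code and fails early if either file cannot be read. The class name `OAuthTrustedProfileConfig` and the paths in `main` are hypothetical, and the readability check is an addition for illustration rather than something the OAuth client library is known to require.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper that mirrors the trusted profile properties shown above.
class OAuthTrustedProfileConfig {

    /** Builds sasl.jaas.config after checking that both files are readable. */
    static String jaasConfig(Path profileIdFile, Path crTokenFile) {
        for (Path file : new Path[] {profileIdFile, crTokenFile}) {
            if (!Files.isReadable(file)) {
                throw new IllegalArgumentException("File is not readable: " + file);
            }
        }
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " grant_type=\"urn:ibm:params:oauth:grant-type:cr-token\""
            + " profile_id=\"" + profileIdFile + "\""
            + " cr_token=\"" + crTokenFile + "\";";
    }

    /** Builds the full set of client properties. */
    static Properties build(String bootstrapEndpoints, Path profileIdFile, Path crTokenFile) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapEndpoints);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.oauthbearer.token.endpoint.url",
            "https://iam.cloud.ibm.com/identity/token");
        props.setProperty("sasl.oauthbearer.jwks.endpoint.url",
            "https://iam.cloud.ibm.com/identity/keys");
        props.setProperty("sasl.login.callback.handler.class",
            "com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler");
        props.setProperty("sasl.jaas.config", jaasConfig(profileIdFile, crTokenFile));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical mount points; replace them with the paths used in your pod.
        Path profileId = Path.of("/var/run/secrets/trusted-profile/profile-id");
        Path crToken = Path.of("/var/run/secrets/tokens/sa-token");
        try {
            Properties props = build("broker-0.example.com:9093", profileId, crToken);
            System.out.println(props.getProperty("sasl.jaas.config"));
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
        }
    }
}
```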

For more details, see How to set up a trusted profile.

For the source code of the OAuth client, refer to the Event Streams Java SDK.

For sample client code, refer to Event Streams Sample.

For other Kafka client libraries, refer to their documentation about how to implement OAUTHBEARER support. For example:

  • sarama: an implementation of the AccessTokenProvider interface is required.
  • librdkafka: an implementation of the oauthbearer_token_refresh_cb callback is required.

For information about how to generate an IBM Cloud IAM token by using an API key, see the IBM Cloud® Identity and Access Management documentation.
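To make the token exchange concrete, the following sketch builds the HTTP request that trades an API key for an IAM bearer token, using the token endpoint and grant type named earlier in this topic. It relies only on the Java 11 `java.net.http` API. The class name `IamTokenRequest` and the environment variable `IBMCLOUD_API_KEY` are hypothetical, and the sketch prints only the HTTP status rather than parsing the JSON response.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that requests an IAM bearer token by using an API key.
class IamTokenRequest {

    static final String TOKEN_ENDPOINT = "https://iam.cloud.ibm.com/identity/token";
    static final String GRANT_TYPE = "urn:ibm:params:oauth:grant-type:apikey";

    /** Builds the URL-encoded form body that carries the grant type and API key. */
    static String formBody(String apiKey) {
        return "grant_type=" + URLEncoder.encode(GRANT_TYPE, StandardCharsets.UTF_8)
            + "&apikey=" + URLEncoder.encode(apiKey, StandardCharsets.UTF_8);
    }

    /** Builds the POST request; nothing is sent until a client executes it. */
    static HttpRequest build(String apiKey) {
        return HttpRequest.newBuilder(URI.create(TOKEN_ENDPOINT))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .header("Accept", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(formBody(apiKey)))
            .build();
    }

    public static void main(String[] args) throws Exception {
        String apiKey = System.getenv("IBMCLOUD_API_KEY"); // hypothetical variable name
        if (apiKey == null) {
            System.err.println("Set IBMCLOUD_API_KEY to run this example.");
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(build(apiKey), HttpResponse.BodyHandlers.ofString());
        // The response body contains the token, so only the status is printed.
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

A token provider or refresh callback for a non-Java client would need to issue an equivalent request and hand the returned token to the Kafka library.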


[1] The earliest version that was validated in continual testing. Typically, it is the initial version available within the last 12 months, or newer if significant issues are known to exist. If you can't run any of the clients that are listed, you can use other third-party clients (for example, librdkafka) that meet the following minimum requirements:

  1. Supports Kafka 1.40, or later.
  2. Can connect and authenticate by using SASL PLAIN with TLSv1.2.
  3. Supports the SNI extension for TLS, where the server's hostname is included in the TLS handshake.
  4. Supports elliptic curve cryptography.

In all cases, use the latest version of the client.