IBM Cloud Docs
Using the Kafka API

Using the Kafka API

Kafka provides a rich set of APIs and clients across a broad range of languages. APIs include core API, Streams API, and Connect API.

  • Kafka's core API (Consumer, Producer, and Admin API)
    Use to send and receive messages directly from one or more Kafka topics. The Kafka Admin client provides a simple interface through the Kafka API for managing Kafka resources. You can create, delete, and manage topics. You can also use the Admin client to manage consumer groups and configurations.
  • Streams API
    A higher-level stream processing API to easily consume, transform, and produce events between topics.
  • Connect API
    A framework that allows reusable or standard integrations to stream events into and out of external systems, such as databases.

The following table summarizes what you can use with Event Streams:

Table 1. Kafka client support in Standard, Enterprise, and Lite plans.
Enterprise plan Standard plan Lite plan
Kafka version on cluster Kafka 3.3 Kafka 3.3 Kafka 3.3
Minimum recommended Kafka client version Kafka 2.6.0, or later Kafka 2.6.0, or later Kafka 2.6.0, or later
Supported client versions See Support summary for all recommended clients
Kafka Connect supported Yes Yes No
Kafka Streams supported Yes Yes No
ksqlDB supported Yes No No
Authentication requirements Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol. Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol. Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol.

Choosing a Kafka client to use with Event Streams

The official client for the Kafka API is written in Java, and as such contains the latest features and bug fixes. For more information about this API, see Kafka Producer API 3.3 and Kafka Consumer API 3.3.

For other languages, run one of the following clients, all of which are tested with Event Streams.

Support summary for all recommended clients

Table 2. Client support summary
Client Language Recommended version Minimum version supported [1] Link to sample
Official Apache Kafka client:
Apache Kafka client Java 3.3.0, or later 2.5.0 Java console sample

Liberty sample

Third-party clients:
node-rdkafka Node.js Latest 2.8.0 Node.js sample
confluent-kafka-python Python Latest 1.4.0 Kafka Python sample
confluent-kafka-go Go Latest 1.4.0
librdkafka C or C++ Latest 1.4.0
sarama Go Latest 1.26.3 Sarama examples

Connecting your client to Event Streams

For information about how to configure your Java client to connect to Event Streams, see Configuring your client.

Configuring your Kafka API client

To establish a connection, clients must be configured to use SASL PLAIN or SASL OAUTHBEARER over TLSv1.2 at a minimum and to require a username, and a list of the bootstrap servers. TLSv1.2 ensures that connections are encrypted and validates the authenticity of the brokers (to prevent man-in-the-middle attacks). SASL enforces authentication on all connections.

To retrieve the username, password, and list of bootstrap servers, a service credentials object, or service key is required for the service instance. For more information about creating these objects, see Connecting to Event Streams.

Using SASL PLAIN

Use the following strings and properties.

  • Use the bootstrap_endpoints string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
  • Use the user and api_key properties as the username and password.

For a Java client, the following example shows the minimum set of properties, where ${USERNAME}, ${PASSWORD}, and ${BOOTSTRAP_ENDPOINTS} are to be replaced by the values that you retrieved previously.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

If you use a Kafka client earlier than version 0.10.2.1, the sasl.jaas.config property isn't supported, and you must instead provide the client configuration in a JAAS configuration file.

Using SASL OAUTHBEARER

Before configuring the SASL mechanism for Java client, there are two prerequisites.

  • The minimum supported Kafka Java client version is 3.1.0.
  • Additional jar package needs to be downloaded from Maven Central and made available in the classpath.

If Maven is used in build system, add the following information to the file pom.xml in the dependencies section.

<dependency>
    <groupId>com.ibm.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>0.1.2</version>
</dependency>

If Gradle is used in build system, add the following information to the file build.gradle in the dependencies section.

implementation com.ibm.eventstreams:oauth-client:0.1.2

Use the following strings and properties.

  • Use the bootstrap_endpoints string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
  • Use the api_key string as the API key.
  • The IAMOAuthBearerLoginCallbackHandler is provided by the jar package com.ibm.eventstreams:oauth-client:+.
  • The IBM Cloud® Identity and Access Management's token endpoint https://iam.cloud.ibm.com/identity/token is configured to generate token from the API key by using specified grant type in jaas config. It is done on client side, thus the API key is never sent to the server side and provides better security than a long-lived API key.
  • The Cloud Identity and Access Management's key endpoint https://iam.cloud.ibm.com/identity/keys is configured to validate the token.

For a Java client, the following example shows the minimum set of properties, where ${BOOTSTRAP_ENDPOINTS}, and ${APIKEY} are to be replaced by the values that you retrieved previously.

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

The sample code refers to the Event Streams samples.

For other Kafka client libaries, refer to their documentation about how to implement OAUTHBEARER support. For example:.

  • sarama: an implementation of AccessTokenProvider interface is required.
  • librdkafka: an implementation of oauthbearer_token_refresh_cb callback is required.

For information about how to generate an IBM Cloud IAM token by using an API key, see IBM Cloud® Identity and Access Management's document.


  1. The earliest version that was validated in continual testing. Typically, it is the initial version available within the last 12 months, or newer if significant issues are known to exist. If you can't run any of the clients that are listed, you can use other third-party clients that meet the following minimum requirements (for example, librdkafka). 1. Supports Kafka 0.10, or later. 2. Can connect and authenticate by using SASL PLAIN with TLSv1.2. 3. Supports the SNI extensions for TLS where the server's hostname is includes in the TLS handshake. 4. Supports elliptic curve cryptography. In all cases, use the latest version of the client. ↩︎