Using Kafka Connect with Event Streams

Kafka Connect is part of the Apache Kafka project and provides a framework for connecting external systems to Kafka. It consists of a runtime that runs connectors to copy data into and out of a cluster. Its main characteristics are:

  • Scalability: It can easily scale from a single worker to many.
  • Reliability: It automatically manages offsets and the lifecycle of connectors.
  • Extensibility: The community has built connectors for most popular systems, and IBM® provides connectors for MQ and Cloud Object Storage.

You can use Kafka Connect with IBM® Event Streams for IBM Cloud®, and you can run the workers inside or outside IBM Cloud®. More than 50 connectors, supported either by IBM or by the community, are available in the connector catalog.

Kafka Connect can run in either stand-alone or distributed mode. Stand-alone mode is intended for testing and temporary connections between systems. Distributed mode is more appropriate for production use. The configuration required to use Event Streams with these two modes is slightly different.
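
For example, with a standard Apache Kafka distribution you start each mode with the scripts in the bin directory (the properties file names here are illustrative):

    # Stand-alone mode: pass the worker properties file followed by one or
    # more connector properties files on the command line.
    bin/connect-standalone.sh worker.properties my-connector.properties

    # Distributed mode: pass only the worker properties file; connectors
    # are submitted later through the Kafka Connect REST API.
    bin/connect-distributed.sh worker.properties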

Stand-alone worker configuration

The stand-alone worker does not use any internal Kafka topics. Instead, it stores offset information in a local file.
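
For example, the offset file is named with the offset.storage.file.filename property; the path shown here is only an illustration:

    offset.storage.file.filename=/tmp/connect.offsets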

You must provide the bootstrap servers and SASL credentials information in the worker properties file that you supply when you start a Kafka Connect stand-alone worker. The following example lists the properties that you must provide in your properties file:

    bootstrap.servers=BOOTSTRAP_ENDPOINTS
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.endpoint.identification.algorithm=HTTPS

Replace BOOTSTRAP_ENDPOINTS, USER, and PASSWORD with the values from your Event Streams Service Credentials tab in the IBM Cloud console.

Source connector

A source connector uses a producer to send data to Event Streams. The following example lists the producer properties that you must provide in your properties file:

    bootstrap.servers=BOOTSTRAP_ENDPOINTS
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN
    producer.ssl.protocol=TLSv1.2
    producer.ssl.enabled.protocols=TLSv1.2
    producer.ssl.endpoint.identification.algorithm=HTTPS

Replace BOOTSTRAP_ENDPOINTS, USER, and PASSWORD with the values from your Event Streams Service Credentials tab in the IBM Cloud console.
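
For illustration, a minimal connector properties file for this mode might use the FileStreamSource connector that ships with Apache Kafka; the connector name, file path, and topic are placeholders:

    name=local-file-source
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    # The file to read records from and the Event Streams topic to write them to.
    file=/tmp/source.txt
    topic=connect-test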

Sink connector

A sink connector uses a consumer to read data from Event Streams. The following example lists the consumer properties that you must provide in your properties file:

    bootstrap.servers=BOOTSTRAP_ENDPOINTS
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    consumer.ssl.protocol=TLSv1.2
    consumer.ssl.enabled.protocols=TLSv1.2
    consumer.ssl.endpoint.identification.algorithm=HTTPS

Replace BOOTSTRAP_ENDPOINTS, USER, and PASSWORD with the values from your Event Streams Service Credentials tab in the IBM Cloud console.
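
For illustration, a matching sink connector properties file might use the FileStreamSink connector that ships with Apache Kafka; again, the names are placeholders:

    name=local-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    # The Event Streams topic to read records from and the file to write them to.
    topics=connect-test
    file=/tmp/sink.txt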

Distributed worker configuration

You must provide the bootstrap servers and SASL credentials information in the properties file that you supply when you start the Kafka Connect distributed workers. The following example lists the properties that you must provide in your properties file:

    bootstrap.servers=BOOTSTRAP_ENDPOINTS
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.endpoint.identification.algorithm=HTTPS

Replace BOOTSTRAP_ENDPOINTS, USER, and PASSWORD with the values from your Event Streams Service Credentials tab in the IBM Cloud console.
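
Distributed workers that share the same group.id value form a single Kafka Connect cluster, so also include a group identifier in the worker properties file; the value here is only an example:

    group.id=kafka-connect-group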

If you want to use a source connector, you must also specify the SSL and SASL configuration for the producer as follows:

    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN
    producer.ssl.protocol=TLSv1.2
    producer.ssl.enabled.protocols=TLSv1.2
    producer.ssl.endpoint.identification.algorithm=HTTPS

If you want to use a sink connector, you must also specify the SSL and SASL configuration for the consumer as follows:

    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    consumer.ssl.protocol=TLSv1.2
    consumer.ssl.enabled.protocols=TLSv1.2
    consumer.ssl.endpoint.identification.algorithm=HTTPS

In addition, Kafka Connect in distributed mode uses three topics internally. If you use Kafka Connect from Apache Kafka version 0.11 or later, these topics are created automatically when a worker starts. You provide the names of the topics as configuration parameters. Ensure that the values are the same for all workers that share the same group.id configuration value.

Table 1. Topics in Kafka Connect

    Configuration               Description
    -------------------------   ----------------------------------------------------------
    offset.storage.topic        Connector offsets topic
    offset.storage.partitions   Number of partitions for connector offsets topic (default 25)
    config.storage.topic        Connector configuration topic
    status.storage.topic        Connector status topic
    status.storage.partitions   Number of partitions for connector status topic (default 5)

For example, you can use the following key-value pairs in your properties file:

    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status

Consider reducing the number of partitions if you are making only light use of Kafka Connect.
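
After the distributed workers are running, you submit connector configurations through the Kafka Connect REST API rather than through properties files. A minimal sketch, assuming a worker whose REST interface listens on the default port 8083 and reusing the illustrative file source settings from earlier:

    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d '{
        "name": "local-file-source",
        "config": {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
          "tasks.max": "1",
          "file": "/tmp/source.txt",
          "topic": "connect-test"
        }
      }'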

For more information about Kafka Connect, see Kafka Connect overview.