IBM Cloud Docs
Setting Kafka quotas

Setting Kafka quotas

Kafka quotas enforce limits on produce and consume requests to control the broker resources used by clients. Kafka quotas enable an administrator to enforce limits on the network throughput that can be consumed by individual producer and consumer applications.

About Kafka quotas

If left unconstrained, it is possible for a small number of consumers or producers to monopolize the available network throughput for your service instance.

Kafka brokers support quotas that enforce rate limits to prevent clients saturating the network or monopolizing broker resources. For more information, see the Apache Kafka documentation.

Kafka quotas can be configured to limit network bandwidth usage, Kafka measures this throughput in bytes per second. If throughput over a 30 second window is found to exceed a configured quota, Kafka calculates a sufficient delay to bring throughput within the quota limit.

Kafka brokers then send the delay information to clients as part of standard Kafka protocol responses. A cooperative client, respecting the protocol contract, waits for this delay before making new requests; an uncooperative client may not respect the throttling request, but in such a case, the broker does not read that client's requests until the throttling delay has elapsed (which may cause timeouts on the uncooperative client).

The following information applies for quotas:

  • Separate quotas may be defined for producers and consumers.
  • By default, client quotas are unlimited.
  • Quotas are applied per broker, rather than per cluster.
  • A quota is applied to all clients that share a single user identity.
  • A quota can be applied to the 'default' user, so it applies to any user, even for whom no user-specific quota was set.

Client metrics

The Java client also exposes throttling information with the following per-broker metrics:

  • produce-throttle-time-max
  • produce-throttle-time-avg
  • fetch-throttle-time-max
  • fetch-throttle-time-avg

Setting client quotas

The IBM® Event Streams for IBM Cloud® Enterprise plan allows the use of the Kafka API to set and describe quotas on Kafka V3.1.x clusters. For more information, see the Quota Operations section of the IBM® Event Streams for IBM Cloud® Admin REST API.

With reference to the Kafka documentation on quotas, only throughput quota types ("producer_byte_rate" and "consumer_byte_rate" quota types) applied to the "user" entity (or the "default user") are supported.

The "client-id" entity, the "request", and the "controller-mutation" quota types are not supported as user-settable quotas. In Event Streams, an authenticated user identity is represented by an IBM Cloud® Identity and Access Management ID. Because Kafka quotas are applied per Cloud Identity and Access Management ID, a single quota can be shared by a group of API keys, if these all belong to the same Cloud Identity and Access Management service ID.

To obtain the Cloud Identity and Access Management ID of an Cloud Identity and Access Management (IAM) service ID, the IBM Cloud CLI can be used.

ibmcloud iam service-id ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab --output json

See the following example output:

{

    "active":true,

    "jti":"...",

    "iam_id":"iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab",

    "realmId":"iam",

     ....

}

Mapping quotas onto an IBM Event Streams Enterprise cluster

The Kafka API quotas are per-broker, however Enterprise plan capacity is described as a per-cluster throughput. Therefore, if you want to limit a user to 10 MB/s in total, you apply a quota of 10/n MB/s to each broker (where n is the number of brokers in the cluster).

To find the number of brokers in a cluster, you can use the KafkaAdminClient.describeCluster call.

For more information, see the Java documentation.

The number of brokers can also be found by using the kafka-configs.sh shell script bundled in the Apache Kafka distribution.

bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type brokers

Event Streams authorization

To be authorized to set client quotas, a user must have the Manager role on the "cluster" resource in Cloud Identity and Access Management.

A set of credentials created with the Event Streams UI with the Manager role on the instance also has the Manager role on cluster. Thus, you are able to create, delete, and alter topics in addition to setting quotas. Any authenticated user has an implied Reader role on cluster and is able to describe quotas.

To create a set of credentials that can manage topics, groups, and participate in transactions but are not authrorized to set quotas, an IAM access policy that has Manager role on resource types "topic", "group", and "txnid" and Reader role on "cluster" must to be associated with the service ID.

Example: Managing quotas with the kafka-config.sh script (Apache client)

  1. Download a Kafka binary distribution (at least V3.1.0).

  2. Create a properties file (named command-config.properties in the following command lines examples), containing the following entries (replacing "myapikey" with the actual API key).

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="myapikey";

security.protocol=SASL_SSL

sasl.mechanism=PLAIN

ssl.protocol=TLSv1.2

ssl.enabled.protocols=TLSv1.2

ssl.endpoint.identification.algorithm=HTTPS

For more information, see Configuring your Kafka API client.

  1. Work with the following usage examples.

    • Alter quotas for user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
    consumer_byte_rate=2048' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    Completed updating configuration for user iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab.

    • Describe quotas for user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users --entity-name 
    iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    The quota configurations for user-principal iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab are consumer_byte_rate=2048.0 and producer_byte_rate=1024.0.

    • Remove quotas for user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config       'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    Completed updating config for user iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab.

    • Describe all quotas that were set to any user, including the default user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users
    
    • Alter quotas for the default user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
    consumer_byte_rate=2048' --entity-type users --entity-default
    

    Completed updating default configuration for users in the cluster.

    • Remove quotas for the default user:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config 'producer_byte_rate,
    consumer_byte_rate' --entity-type users --entity-default
    

    Completed updating default configuration for users in the cluster.

Example: Alter throughput quota usage through the Java API

The following example is a short sample snippet showing how to invoke the KafkaAdminClient.alterclientQuotas method.

https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#alterClientQuotas(java.util.Collection,org.apache.kafka.clients.admin.AlterClientQuotas
Options)

import java.util.*;

import org.apache.kafka.clients.CommonClientConfigs;

import org.apache.kafka.clients.admin.*;

import org.apache.kafka.common.config.*;

import org.apache.kafka.common.quota.*;

class Snippet {

    public static void main(String[] args) throws Exception {

        // Kafka client configuration properties.

        String mybootstrap = "...";   // from the bootstrap_endpoints of the service credentials

        String myapikey = "...";      // from the apikey of the service credentials

        Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, mybootstrap);

        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "sasl_ssl");

        properties.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");

        properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");

        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        properties.put(SaslConfigs.SASL_JAAS_CONFIG,

                "org.apache.kafka.common.security.plain.PlainLoginModule " +

                        "required username=\"token\" password=\"" + myapikey + "\";");

        AdminClient admin = AdminClient.create(properties);

        String iamID = "iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab"; //set iam id of target user to set quotas to

        // if null is used instead of the iamID string, the following quota alteration will be applied to the default user

        // add quotas 

        ClientQuotaEntity entity = new ClientQuotaEntity(

                Collections.singletonMap(ClientQuotaEntity.USER, iamID));

        ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,

                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0),

                        new ClientQuotaAlteration.Op("producer_byte_rate", 1000.0)));

        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();

        // describe quotas

        DescribeClientQuotasResult describeClientQuotasFuture = admin.describeClientQuotas(ClientQuotaFilter.all());

        System.out.println(describeClientQuotasFuture.entities().get());

        //remove quotas (set them to null)

        entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, iamID));

        alteration = new ClientQuotaAlteration(entity,

                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", null),

                        new ClientQuotaAlteration.Op("producer_byte_rate", null)));

        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();

    }

}

IBM Cloud Activity Tracker events

Whenever throughput quotas are updated, an Event Streams configuration event is generated, which can be monitored in IBM Cloud® Activity Tracker.

For more information about configuring Activity Tracker events for Event Streams, see the Activity tracker documentation.