Setting Kafka quotas
        Kafka quotas enforce limits on produce and consume requests to control the broker resources used by clients. Kafka quotas enable an administrator to enforce limits on the network throughput that can be consumed by individual producer and consumer
          applications.
        
          About Kafka quotas
          If left unconstrained, it is possible for a small number of consumers or producers to monopolize the available network throughput for your service instance.
          Kafka brokers support quotas that enforce rate limits to prevent clients saturating the network or monopolizing broker resources. For more information, see the
            Apache Kafka documentation.
          Kafka quotas can be configured to limit network bandwidth usage, Kafka measures this throughput in bytes per second. If throughput over a 30 second window is found to exceed a configured quota, Kafka calculates a sufficient delay to bring throughput
            within the quota limit.
          Kafka brokers then send the delay information to clients as part of standard Kafka protocol responses. A cooperative client, respecting the protocol contract, waits for this delay before making new requests; an uncooperative client may not respect
            the throttling request, but in such a case, the broker does not read that client's requests until the throttling delay has elapsed (which may cause timeouts on the uncooperative client).
          The following information applies for quotas:
          
            - Separate quotas may be defined for producers and consumers.
- By default, client quotas are unlimited.
- Quotas are applied per broker, rather than per cluster.
- A quota is applied to all clients that share a single user identity.
- A quota can be applied to the 'default' user, so it applies to any user, even for whom no user-specific quota was set.
Client metrics
          The Java client also exposes throttling information with the following per-broker metrics:
          
            - produce-throttle-time-max
- produce-throttle-time-avg
- fetch-throttle-time-max
- fetch-throttle-time-avg
Setting client quotas
          The IBM® Event Streams for IBM Cloud® Enterprise plan allows the use of the Kafka API to set and describe quotas on Kafka V3.1.x clusters. For more information, see the Quota Operations section            of the IBM® Event Streams for IBM Cloud® Admin REST API.
          With reference to the Kafka documentation on quotas, only throughput quota types ("producer_byte_rate" and "consumer_byte_rate" quota types) applied to the "user"
            entity (or the "default user") are supported.
          The "client-id" entity, the "request", and the "controller-mutation" quota types are not supported as user-settable quotas. In Event Streams, an authenticated user identity is represented by an IBM Cloud® Identity
            and Access Management ID. Because Kafka quotas are applied per Cloud Identity and Access Management ID, a single quota can be shared by a group of API keys, if these all belong to the same Cloud Identity and Access Management service ID.
          To obtain the Cloud Identity and Access Management ID of an Cloud Identity and Access Management (IAM) service ID, the IBM Cloud CLI can be used.
          ibmcloud iam service-id ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab --output json
          See the following example output:
          {
    "active":true,
    "jti":"...",
    "iam_id":"iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab",
    "realmId":"iam",
     ....
}
        
        
          Mapping quotas onto an IBM Event Streams Enterprise cluster
          The Kafka API quotas are per-broker, however Enterprise plan capacity is described as a per-cluster throughput. Therefore, if you want
            to limit a user to 10 MB/s in total, you apply a quota of 10/n MB/s to each broker (where n is the number of brokers in the cluster).
          To find the number of brokers in a cluster, you can use the KafkaAdminClient.describeCluster call.
          For more information, see the Java documentation.
          The number of brokers can also be found by using the kafka-configs.sh shell script bundled in the Apache Kafka distribution.
          bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type brokers
        
        
          Event Streams authorization
          To be authorized to set client quotas, a user must have the Manager role on the "cluster" resource in Cloud Identity and Access Management.
          A set of credentials created with the Event Streams UI with the Manager role on the instance also has the Manager role on cluster. Thus, you are able to create, delete, and alter topics in addition to setting quotas. Any authenticated user has
            an implied Reader role on cluster and is able to describe quotas.
          To create a set of credentials that can manage topics, groups, and participate in transactions but are not authrorized to set quotas, an IAM access policy that has Manager role on resource types "topic", "group", and "txnid"
            and Reader role on "cluster" must to be associated with the service ID.
          
            Example: Managing quotas with the kafka-config.sh script (Apache client)
            
              - 
                Download a Kafka binary distribution (at least V3.1.0). 
- 
                Create a properties file (named command-config.properties in the following command lines examples), containing the following entries (replacing "myapikey" with the actual API key). 
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="myapikey";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
            For more information, see Configuring your Kafka API client.
            
              - 
                Work with the following usage examples. bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
consumer_byte_rate=2048' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
 Completed updating configuration for user iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab.
 
                  - Describe quotas for user:
 bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users --entity-name 
iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
 The quota configurations for user-principal iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890abareconsumer_byte_rate=2048.0andproducer_byte_rate=1024.0.
 bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config       'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
 Completed updating config for user iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab.
 
                  - Describe all quotas that were set to any user, including the default user:
 bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users
 
                  - Alter quotas for the default user:
 bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
consumer_byte_rate=2048' --entity-type users --entity-default
 Completed updating default configuration for users in the cluster. 
                  - Remove quotas for the default user:
 bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config 'producer_byte_rate,
consumer_byte_rate' --entity-type users --entity-default
 Completed updating default configuration for users in the cluster. 
Example: Alter throughput quota usage through the Java API
              The following example is a short sample snippet showing how to invoke the KafkaAdminClient.alterclientQuotas method.
              https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html
Options)
import java.util.*;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.*;
import org.apache.kafka.common.quota.*;
class Snippet {
    public static void main(String[] args) throws Exception {
        // Kafka client configuration properties.
        String mybootstrap = "...";   // from the bootstrap_endpoints of the service credentials
        String myapikey = "...";      // from the apikey of the service credentials
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, mybootstrap);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "sasl_ssl");
        properties.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
        properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"token\" password=\"" + myapikey + "\";");
        AdminClient admin = AdminClient.create(properties);
        String iamID = "iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab"; //set iam id of target user to set quotas to
        // if null is used instead of the iamID string, the following quota alteration will be applied to the default user
        // add quotas 
        ClientQuotaEntity entity = new ClientQuotaEntity(
                Collections.singletonMap(ClientQuotaEntity.USER, iamID));
        ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,
                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0),
                        new ClientQuotaAlteration.Op("producer_byte_rate", 1000.0)));
        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();
        // describe quotas
        DescribeClientQuotasResult describeClientQuotasFuture = admin.describeClientQuotas(ClientQuotaFilter.all());
        System.out.println(describeClientQuotasFuture.entities().get());
        //remove quotas (set them to null)
        entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, iamID));
        alteration = new ClientQuotaAlteration(entity,
                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", null),
                        new ClientQuotaAlteration.Op("producer_byte_rate", null)));
        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();
    }
}
            
          
        
        
          IBM Cloud Activity Tracker events
          Whenever throughput quotas are updated, an Event Streams configuration event is generated, which can be monitored in IBM Cloud® Activity Tracker.
          For more information about configuring Activity Tracker events for Event Streams, see the Activity tracker documentation.