IBM Cloud Docs
设置 Kafka 配额

设置 Kafka 配额

Kafka 配额对生产和使用请求实施限制,以控制客户机所使用的代理资源。 Kafka 配额使管理员能够对单个生产者和使用者应用程序可使用的网络吞吐量实施限制。

关于 Kafka 配额

如果未受约束,那么少数使用者或生产者可能会垄断服务实例的可用网络吞吐量。

Kafka 代理支持实施速率限制的配额,以防止客户机使网络饱和或垄断代理资源。 有关更多信息,请参阅 Apache Kafka 文档

可以配置 Kafka 配额以限制网络带宽使用,Kafka 度量此吞吐量 (以字节/秒为单位)。 如果发现超过 30 秒窗口的吞吐量超过已配置的配额,那么 Kafka 会计算足够的延迟,以使吞吐量在配额限制内。

然后,Kafka 代理将延迟信息作为标准 Kafka 协议响应的一部分发送给客户机。 遵循协议合同的合作客户机在发出新请求之前等待此延迟; 不合作客户机可能不遵守调速请求,但在这种情况下,代理程序在调速延迟过去之前不会读取该客户机的请求 (这可能导致不合作客户机超时)。

以下信息适用于配额:

  • 可以为生产者和消费者定义单独的配额。
  • 缺省情况下,客户机配额不受限制。
  • 将对每个代理应用配额,而不是对每个集群应用配额。
  • 配额将应用于共享单个用户身份的所有客户机。
  • 配额可以应用于“缺省”用户,因此它适用于任何用户,即使没有为其设置特定于用户的配额也是如此。

客户机度量值

Java 客户机还会使用以下每个代理度量值来公开调速信息:

  • 生成-调速-time-max
  • 生产-调速-平均时间
  • fetch-throttle-time-max
  • fetch-throttle-time-avg

设置客户机配额

IBM® Event Streams for IBM Cloud® Enterprise 套餐允许使用 Kafka API 来设置和描述 Kafka V3.1.x 集群上的配额。 有关更多信息,请参阅 IBM® Event Streams for IBM Cloud® 管理 REST API 的 配额操作部分

通过引用有关配额的 Kafka 文档,仅支持应用于“用户”实体 (或 "缺省用户") 的吞吐量配额类型 ("producer_byte_rate" 和 "consumer_byte_rate" 配额类型)。

不支持 "client-id" 实体,"request" 和“controller-突变”配额类型作为用户可设置的配额。 在 Event Streams中,已认证的用户身份由 IBM Cloud® Identity and Access Management 标识表示。 由于将按 Cloud Identity and Access Management 标识应用 Kafka 配额,因此可以由一组 API 密钥共享单个配额 (如果这些都属于同一 Cloud Identity and Access Management 服务标识)。

要获取 Cloud Identity and Access Management (IAM) 服务标识的 Cloud Identity and Access Management 标识,可以使用 IBM Cloud CLI。

ibmcloud iam service-id ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab --output json

请参阅以下示例输出:

{

    "active":true,

    "jti":"...",

    "iam_id":"iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab",

    "realmId":"iam",

     ....

}

将配额映射到 IBM Event Streams Enterprise 集群

Kafka API 配额是每个代理程序的配额,但是企业套餐容量描述为 每个集群的吞吐量。 因此,如果要将用户总数限制为 10 MB/s,请将 10/n MB/s 的配额应用于每个代理 (其中 n 是集群中的代理数量)。

要查找集群中的代理程序数,可以使用 KafkaAdminClient.describeCluster 调用。

有关更多信息,请参阅 Java 文档

还可以使用 Apache Kafka 分发中捆绑的 kafka-configs.sh shell 脚本来找到代理程序数。

bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type brokers

Event Streams 授权

要有权设置客户机配额,用户必须对 Cloud Identity and Access Management中的“集群”资源具有“管理者”角色。

通过实例上具有“管理者”角色的 Event Streams UI 创建的一组凭证在集群上也具有“管理者”角色。 因此,除了设置配额外,您还可以创建,删除和变更主题。 任何已认证的用户都在集群上具有隐含的“读者”角色,并且能够描述配额。

要创建一组可以管理主题,组和参与事务但未进行认证以设置配额的凭证,必须将对资源类型 "topic","group" 和 "txnid" 具有“管理者”角色以及对 "cluster" 具有“读者”角色的 IAM 访问策略与服务标识相关联。

示例: 使用 the kafka-config.sh 脚本管理配额 (Apache 客户机)

  1. 下载 Kafka 二进制 分发版 (至少 V3.1.0)。

  2. 创建属性文件 (在以下命令行示例中名为 command-config.properties ),其中包含以下条目 (将 "myapikey" 替换为实际 API 密钥)。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="myapikey";

security.protocol=SASL_SSL

sasl.mechanism=PLAIN

ssl.protocol=TLSv1.2

ssl.enabled.protocols=TLSv1.2

ssl.endpoint.identification.algorithm=HTTPS

有关更多信息,请参阅配置 Kafka API 客户机

  1. 使用以下用法示例。

    • 更改用户的配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
    consumer_byte_rate=2048' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    已完成更新用户 iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab 的配置。

    • 描述用户的配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users --entity-name
    iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    用户主体 iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab 的配额配置为 consumer_byte_rate=2048.0producer_byte_rate=1024.0

    • 除去用户的配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config       'producer_byte_rate,consumer_byte_rate' --entity-type users --entity-name iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab
    

    已完成更新用户 iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab 的配置。

    • 描述设置为任何用户 (包括缺省用户) 的所有配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --describe --entity-type users
    
    • 更改缺省用户的配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --add-config 'producer_byte_rate=1024,
    consumer_byte_rate=2048' --entity-type users --entity-default
    

    已完成更新集群中用户的缺省配置。

    • 除去缺省用户的配额:
    bin/kafka-configs.sh --command-config command-config.properties --bootstrap-server "kafka-0.blah.cloud:9093" --alter --delete-config 'producer_byte_rate,
    consumer_byte_rate' --entity-type users --entity-default
    

    已完成更新集群中用户的缺省配置。

示例: 通过 Java API 改变吞吐量配额使用情况

以下示例是显示如何调用 KafkaAdminClient.alterclientQuotas 方法的简短样本片段。

https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#alterClientQuotas(java.util.Collection,org.apache.kafka.clients.admin.AlterClientQuotas
Options)

import java.util.*;

import org.apache.kafka.clients.CommonClientConfigs;

import org.apache.kafka.clients.admin.*;

import org.apache.kafka.common.config.*;

import org.apache.kafka.common.quota.*;

class Snippet {

    public static void main(String[] args) throws Exception {

        // Kafka client configuration properties.

        String mybootstrap = "...";   // from the bootstrap_endpoints of the service credentials

        String myapikey = "...";      // from the apikey of the service credentials

        Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, mybootstrap);

        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "sasl_ssl");

        properties.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");

        properties.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");

        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        properties.put(SaslConfigs.SASL_JAAS_CONFIG,

                "org.apache.kafka.common.security.plain.PlainLoginModule " +

                        "required username=\"token\" password=\"" + myapikey + "\";");

        AdminClient admin = AdminClient.create(properties);

        String iamID = "iam-ServiceId-12345678-aaaa-bbbb-cccc-1234567890ab"; //set iam id of target user to set quotas to

        // if null is used instead of the iamID string, the following quota alteration will be applied to the default user

        // add quotas 

        ClientQuotaEntity entity = new ClientQuotaEntity(

                Collections.singletonMap(ClientQuotaEntity.USER, iamID));

        ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,

                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0),

                        new ClientQuotaAlteration.Op("producer_byte_rate", 1000.0)));

        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();

        // describe quotas

        DescribeClientQuotasResult describeClientQuotasFuture = admin.describeClientQuotas(ClientQuotaFilter.all());

        System.out.println(describeClientQuotasFuture.entities().get());

        //remove quotas (set them to null)

        entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, iamID));

        alteration = new ClientQuotaAlteration(entity,

                Arrays.asList(new ClientQuotaAlteration.Op("consumer_byte_rate", null),

                        new ClientQuotaAlteration.Op("producer_byte_rate", null)));

        admin.alterClientQuotas(Arrays.asList(alteration)).all().get();

    }

}

IBM Cloud Activity Tracker 事件

每当更新吞吐量配额时,都会生成 Event Streams 配置事件,可以在 IBM Cloud® Activity Tracker中监视此事件。

有关为 Event Streams配置 Activity Tracker 事件的更多信息,请参阅 Activity Tracker 文档