订阅Kafka事件
通过本教程,您可以了解如何使用 IBM Cloud® Code Engine CLI 预订 Kafka 事件。
在分布式环境中,您通常希望应用程序或作业对从其他组件 (通常称为事件生产者) 生成的消息 (事件) 作出反应。 通过 Code Engine,应用程序或作业可以通过预订事件生产者来接收感兴趣的事件。 事件信息作为应用程序的 POST HTTP 请求接收,并作为作业的环境变量接收。
Kafka 事件生产者监视要在 Kafka 实例中显示的新消息。 为一组主题创建 Code Engine Kafka 预订时,应用程序或作业会针对其中一个主题中显示的每条新消息接收单独的事件。
虽然您可以使用任何 Kafka 实例,但本教程中的示例使用 IBM® Event Streams for IBM Cloud® 服务。Event Streams 是针对 Kafka 事件的 IBM 事件流服务。 有关此服务的更多信息,请参阅 Event Streams 文档。
准备工作
所有 Code Engine 用户都需要具有现收现付帐户。 教程可能会产生成本。 使用“成本估算器”根据您的预计使用量生成成本估算。 有关更多信息,请参阅 Code Engine 定价。
设置 Kafka 事件生产者
您可以设置 Kafka 消息生产者以将消息发送到 Code Engine Kafka 事件预订。 使用 Code Engine Kafka 事件预订在收到 Kafka 消息时触发应用程序或作业。
首先,为事件流服务创建 Event Streams 服务实例。 虽然您可以使用控制台或 CLI,但以下步骤描述了如何使用 CLI 设置 Event Streams 事件生产者。
-
为 Event Streams创建服务实例。 Event Streams CLI 服务的名称为
messagehub
。 对于此示例,请创建名为myeventstream
的 Event Streams 服务实例。ibmcloud resource service-instance-create myeventstream messagehub lite us-south
-
创建服务密钥以向服务实例提供凭证。
ibmcloud resource service-key-create myeventstream-key Manager --instance-name myeventstream
示例输出
Creating service key of service instance myeventstream under account <user_account>... OK Service key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd was created. Name: myeventstream-key ID: crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd Created At: Mon Mar 21 18:36:09 UTC 2022 State: active Credentials: api_key: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh apikey: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh iam_apikey_description: Auto-generated for key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd iam_apikey_name: myeventstream-key iam_role_crn: crn:v1:bluemix:public:iam::::serviceRole:Manager iam_serviceid_crn: crn:v1:bluemix:public:iam-identity::a/e43abfcbd191404cb17ef650e9681dd3::serviceid:ServiceId-3e99caa5-b174-4f04-9845-5c5d783b8bc7 instance_id: c0736069-3f4a-438a-b614-6846877d692d kafka_admin_url: https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com kafka_brokers_sasl: [broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093] kafka_http_url: https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com password: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh user: token
记下服务密钥的
user
,password
和kafka-brokers_sasl
的值。 当您设置 Code Engine Kafka 预订时,需要此信息。password
和apikey
的值在 Event Streams 服务实例的服务密钥中相同。 您还可以使用ibmcloud resource service-key myeventstream-key
命令来检索服务密钥信息。 -
相对于 Event Streams 服务实例,初始化 Event Streams 插件。
ibmcloud es init --instance-name myeventstream
-
创建 Event Streams 主题。
ibmcloud es topic-create kafka-topic1
设置 Code Engine 样本应用程序以生成 Kafka 消息
对于本教程,请设置 Code Engine 应用程序以充当 Kafka 消息的事件生产者。 此 kafka-sender-app
应用程序的用途是连接到 Event Streams 实例,并将 Kafka 消息生成 (发送) 到消息接收方 (Kafka 使用者)。 此为 Kafka 消息生成事件的应用程序使用 Code Engine Kafka 发送方样本应用程序 来发送 Kafka 消息。 此样本发送方映像需要 BROKERS
环境变量以及包含 password
凭证的私钥。
-
使用 Code Engine Kafka 样本所需的凭证创建私钥。 例如,创建
kafka-subscription-secret
私钥,以包含 Kafka 发送方样本应用程序和使用 Kafka 接收方样本的 Kafka 事件预订所需的凭证。 样本 Kafka 发送方应用程序和 Code Engine Kafka 事件预订需要这些凭证以与 Event Streams的服务实例进行通信。 虽然在创建 Kafka 发送方应用程序和事件预订之前不要求您创建此私钥,但此操作可简化所需步骤。要创建
kafka-subscription-secret
私钥,请为password
和username
添加字面值环境变量。 有关更多信息,请参阅 使用 CLI 创建私钥。-
使用
user
的值指定username
键,该值在 Event Streams 服务实例中的服务凭证详细信息中列出。 对于 Event Streams 服务实例,此值为token
。 需要此密钥才能在 Code Engine Kafka 事件预订与 Kafka 消息代理之间进行认证。 -
使用
apikey
的值指定password
键,该值在 Event Streams 服务实例中的服务凭证详细信息中列出。 此密钥对于发送方样本是必需的,用于启用 Code Engine Kafka 事件预订与 Kafka 消息代理之间的通信。ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=<value_of_apikey> --from-literal username=<value_of_user>
例如
ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh --from-literal username=token
-
-
用以下信息创建
kafka-sender-app
。-
指定
--image
选项以引用icr.io/codeengine/kafka-sender
容器映像。 此映像是从sender.go
构建的,可从 样本中获取 IBM Cloud Code Engine GitHub 存储库。 此样本发送方应用程序需要存储在kafka-subscription-secret
中的password
凭证,并且需要BROKERS
环境变量。 -
指定
--env-from-secret
选项以引用包含password
凭证的完整密钥kafka-subscription-secret
。 -
指定
--env
选项以添加字面值环境变量BROKERS
,并提供在 Event Streams 服务实例的服务凭证详细信息中列出的其中一个代理主机的名称。 但是,如果要指定多个代理主机名,请使用格式--env BROKERS-broker1,broker2,broker3
。 -
(可选) 指定
--min-scale=1
选项,以便应用程序始终具有正在运行的实例,并且不会缩放到零。 在查看日志时,将应用程序配置为始终具有正在运行的实例非常有用。 如果您正在生产环境中运行,请考虑保留应用程序的运行实例的成本,或者是否希望 Code Engine 自动缩放为零。 缺省情况下,应用程序在未使用时将缩放为零。ibmcloud ce app create --name kafka-sender-app --image icr.io/codeengine/kafka-sender --env-from-secret kafka-subscription-secret --env BROKERS=broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --min-scale 1
-
-
部署此应用程序后,运行
app get
命令以确认应用程序处于ready
状态。ibmcloud ce app get -n kafka-sender-app
您创建了 kafka-sender-app
应用程序以针对 Code Engine 事件预订生成 Kafka 消息,并创建了包含所需凭证的 kafka-subscription-secret
私钥。
设置 Code Engine Kafka 预订
要使 Code Engine 能够处理 Kafka 事件,请设置 Code Engine Kafka 事件预订以连接到 Kafka 事件代理并侦听 Kafka 事件。 此外,设置 Code Engine 应用程序以充当 Kafka 事件的接收方。 Kafka 事件预订定义 Kafka 生产者 (发送方) 与事件的使用者 (接收方) 之间的关系。
Code Engine Kafka 事件预订连接到 Kafka 消息代理,并将每个入局 Kafka 消息的 HTTP Post 请求发送到接收方应用程序。 有关 Kafka 事件随附的信息的更多信息,请参阅 传递到应用程序的事件的 HTTP 头和主体信息。
-
创建 Code Engine 应用程序以充当 Kafka 消息的事件使用者并接收 Kafka 事件。 例如,创建名为
kafka-receiver-app
的应用程序,该应用程序使用icr.io/codeengine/kafka-receiver
映像。 此映像是从receiver.go
构建的,可从 样本中获取 IBM Cloud Code Engine GitHub 存储库。 此样本不需要任何环境变量。 您可以选择指定--min-scale=1
选项,以便应用程序始终具有正在运行的实例,并且不会缩放到零。 在查看日志时,将应用程序配置为始终具有正在运行的实例非常有用。 如果您正在生产环境中运行,请考虑保留应用程序的运行实例的成本,或者是否希望 Code Engine 自动缩放为零。 缺省情况下,应用程序在未使用时将缩放为零。ibmcloud ce app create -n kafka-receiver-app --image icr.io/codeengine/kafka-receiver --min-scale 1
缺省情况下,事件将路由到目标应用程序的根 URL。 您可以使用
--path
选项将事件发送到应用程序中的其他目标。 例如,如果预订指定--path /event
,那么会将事件发送到https://<base application URL>/events
。 -
部署此应用程序后,运行
app get
命令以确认应用程序处于ready
状态。ibmcloud ce app get -n kafka-receiver-app
-
使用
ibmcloud ce sub kafka create
命令为 Kafka 事件创建 Code Engine Kafka 事件预订。 使用先前创建的kafka-subscription-secret
私钥来访问消息代理。 根据 Kafka 资源的服务凭证信息指定代理信息。 对于此示例,您可以从ibmcloud resource service-key myeventstream-key
命令的输出中获取代理信息。 请注意,您必须为主题的每个代理程序指定--broker
选项。--destination
选项指定接收事件的 Code Engine 资源。ibmcloud ce sub kafka create --name mykafkasubscription --destination kafka-receiver-app --secret kafka-subscription-secret --topic kafka-topic1 --broker broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
-
显示Kafka事件订阅的详细信息。
ibmcloud ce sub kafka get -n mykafkasubscription
示例输出
Getting Kafka event subscription 'mykafkasubscription'... OK Name: mykafkasubscription [...] Destination Type: app Destination: kafka-receiver-app Brokers: broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 Consumer Group: knative-kafka-source-a4072fe1-1dfa-4470-9d07-bf7a0ff8e340 Topics: kafka-topic1 Secret key reference (user): kafka-subscription-secret.username Secret key reference (password): kafka-subscription-secret.password Ready: true Conditions: Type OK Age Reason ConnectionEstablished true 24s InitialOffsetsCommitted true 24s Ready true 24s Scheduled true 24s SinkProvided true 24s Events: Type Reason Age Source Messages Normal FinalizerUpdate 26s kafkasource-controller Updated "mykafkasubscription" finalizers
测试预订
现在,已创建引用 kafka-receiver-app
应用程序的 Kafka 事件预订,请使用 kafka-sender-app
将消息事件发送到接收方应用程序。
-
通过使用带有
--output url
选项的ibmcloud ce app get
命令来查找应用程序的 URL,获取目标应用程序的公共 URLkafka-sender-app
。ibmcloud ce app get -n kafka-sender-app --output url
示例输出
https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud
-
运行 Kafka 事件生产者应用程序
kafka-sender-app
以将事件发送到目标 Code Engine 应用程序。 使用curl
调用kafka-sender-app
应用程序,并指定主题的值和消息数。 使用ibmcloud ce app get
命令的输出来查找应用程序的公共 URL。 请确保将该值用引号括起来,以确保将其视为单个字符串。 例如curl "<public_URL_of_Kafka_sender_app>?topic=<your_topic_name>&num=<number_of_messages_to_produce>"
例如
curl "https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud?topic=kafka-topic1&num=1"
-
查看日志中的事件。 使用有效的代理程序,主题和访问私钥创建 Kafka 事件预订时,如果您有 Kafka 应用程序 (例如
kafka-sender-app
) 生成有关该主题的消息,那么您可以在日志中看到接收 Kafka 消息的目标 Code Engine 应用程序 (例如kafka-receiver-app
) 的事件。 使用 Kafka 接收方应用程序 (icr.io/codeengine/kafka-receiver
) 时,请在日志中搜索接收方应用程序的Event data
以查看接收到的消息。ibmcloud ce app logs -n kafka-receiver-app
示例输出
Getting logs for all instances of application 'kafka-receiver-app'... OK kafka-receiver-app-00001-deployment-66976f7988-9xttm/user-container: 2022/03/31 22:19:45 Listening on port 8080 2022/03/31 22:19:46 ---------- 2022/03/31 22:19:46 Path: / 2022/03/31 22:19:46 Header: Accept-Encoding=[gzip] 2022/03/31 22:19:46 Header: Ce-Id=[partition:0/offset:167] 2022/03/31 22:19:46 Header: Ce-Source=[/apis/v1/namespaces/glxo4k7nj7d/kafkasources/mykafkasubscription#kafka-topic1] 2022/03/31 22:19:46 Header: Ce-Specversion=[1.0] 2022/03/31 22:19:46 Header: Ce-Subject=[partition:0#167] 2022/03/31 22:19:46 Header: Ce-Time=[2022-03-31T22:19:36.499Z] 2022/03/31 22:19:46 Header: Ce-Type=[dev.knative.kafka.event] 2022/03/31 22:19:46 Header: Content-Length=[8] 2022/03/31 22:19:46 Header: Forwarded=[for=172.30.208.213;proto=http, for=127.0.0.6] 2022/03/31 22:19:46 Header: K-Proxy-Request=[activator] 2022/03/31 22:19:46 Header: Traceparent=[00-b033708685c715a7c2384cdf05797785-65540b0937e9b0ce-00] 2022/03/31 22:19:46 Header: User-Agent=[Go-http-client/1.1] 2022/03/31 22:19:46 Header: X-B3-Parentspanid=[e1a785d7fdbead6c] 2022/03/31 22:19:46 Header: X-B3-Sampled=[1] 2022/03/31 22:19:46 Header: X-B3-Spanid=[abcde9901e6bf83f] 2022/03/31 22:19:46 Header: X-B3-Traceid=[abcde490a426573772fa0bf60caf5ddb] 2022/03/31 22:19:46 Header: X-Envoy-Attempt-Count=[1] 2022/03/31 22:19:46 Header: X-Forwarded-For=[172.30.208.213, 127.0.0.6, 127.0.0.6] 2022/03/31 22:19:46 Header: X-Forwarded-Proto=[http] 2022/03/31 22:19:46 Header: X-Request-Id=[abcdeb4e-c5ac-abcd-abcd-60e6278abcde] 2022/03/31 22:19:46 Event data: test1: 1
请注意,应用程序的日志信息仅持续一个小时。 有关查看应用程序 (或作业) 日志的更多信息,请参阅 查看日志。
更新预订
要使用 CLI 更新事件预订,请使用 ibmcloud ce subscription kafka update
命令。 以下示例更新主题名称。
ibmcloud ce sub kafka update -n mykafkasubscription --topic kafka-topic2
您可以使用 ibmcloud ce subscription kafka update
命令来更新 Kafka 预订的值。 但是,不能使用此命令修改使用者组的值。 如果要更新预订以引用其他主题,请先确保 Kafka 主题存在,然后再更新预订。
清除 Kafka 预订教程
是否准备好删除 Kafka 预订,发送和接收应用程序以及私钥? 您可以使用 ibmcloud ce app delete
,ibmcloud ce sub kafka delete
和 ibmcloud ce sub kafka delete
命令。 您可以选择使用 -f
选项来强制删除组件,而无需确认。
删除 Kafka 预订时,该删除不会删除该预订所引用的应用程序。
要除去预订,
ibmcloud ce sub kafka delete --name mykafkasubscription -f
要除去 Kafka 消息接收应用程序,
ibmcloud ce app delete --name kafka-receiver-app -f
同样,您可以除去 kafka-sender-app
。
ibmcloud ce app delete --name kafka-sender-app -f
要除去 kafka-subscription-secret
,
ibmcloud ce secret delete --name kafka-subscription-secret -f
准备好删除 Event Streams 服务实例吗? --recursive
选项指定除去服务实例 (包括关联的服务密钥) 的所有资源。
ibmcloud resource service-instance-delete myeventstream --recursive -f
后续步骤
有关使用 Kafka 事件预订的更多信息,请参阅 使用 Kafka 事件生产者。
要查找更多代码示例吗? 请查看 IBM Cloud Code Engine GitHub 存储库的样本。