Suscripción a eventos Kafka
Con este tutorial, usted puede aprender cómo suscribirse a eventos Kafka utilizando el IBM Cloud® Code Engine CLI.
En entornos distribuidos, a menudo deseará que las aplicaciones o trabajos reaccionen a los mensajes (sucesos) que se generan en otros componentes, normalmente llamados generadores de sucesos. Con Code Engine, las aplicaciones o los trabajos pueden recibir sucesos de interés suscribiéndose a productores de sucesos. La información de sucesos se recibe como solicitudes POST HTTP para aplicaciones y como variables de entorno para trabajos.
El generador de sucesos de Kafka observa si aparecen mensajes nuevos en una instancia de Kafka. Cuando crea una suscripción de Code Engine Kafka para un conjunto de temas, la app o el trabajo recibe un suceso independiente para cada mensaje nuevo que aparece en uno de los temas.
Aunque puede utilizar cualquier instancia de Kafka, los ejemplos de esta guía de aprendizaje utilizan el servicio IBM® Event Streams for IBM Cloud®. Event Streams es un servicio de streaming de sucesos de IBM para sucesos de Kafka. Para obtener más información sobre este servicio, consulte la documentación deEvent Streams.
Antes de empezar
- Configure su entorno de CLI de Code Engine.
- Configure la CLI de Event Streams.
- Cree y trabaje con un proyecto.
Todos los usuarios de Code Engine están obligados a tener una cuenta de pago por uso. Las guías de aprendizaje pueden incurrir en costes. Utilice el Estimador de costes para generar una estimación de costes basada en el uso previsto. Para obtener más información, consulte Precios deCode Engine.
Configuración del productor de eventos Kafka
Puede configurar el productor de mensajes Kafka para enviar mensajes a las suscripciones de sucesos de Code Engine Kafka. Utilice la suscripción de suceso Code Engine Kafka para desencadenar aplicaciones o trabajos cuando se reciba un mensaje Kafka.
Para empezar, cree una instancia de servicio de Event Streams para el servicio de streaming de sucesos. Aunque puede utilizar la consola o la CLI, los pasos siguientes describen cómo configurar el generador de sucesos de Event Streams con la CLI.
-
Cree una instancia de servicio para Event Streams. El nombre del servicio CLI Event Streams es
messagehub
. Para este ejemplo, cree una instancia de servicio de Event Streams denominadamyeventstream
.ibmcloud resource service-instance-create myeventstream messagehub lite us-south
-
Cree una clave de servicio para proporcionar credenciales a la instancia de servicio.
ibmcloud resource service-key-create myeventstream-key Manager --instance-name myeventstream
Salida de ejemplo
Creating service key of service instance myeventstream under account <user_account>... OK Service key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd was created. Name: myeventstream-key ID: crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd Created At: Mon Mar 21 18:36:09 UTC 2022 State: active Credentials: api_key: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh apikey: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh iam_apikey_description: Auto-generated for key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd iam_apikey_name: myeventstream-key iam_role_crn: crn:v1:bluemix:public:iam::::serviceRole:Manager iam_serviceid_crn: crn:v1:bluemix:public:iam-identity::a/e43abfcbd191404cb17ef650e9681dd3::serviceid:ServiceId-3e99caa5-b174-4f04-9845-5c5d783b8bc7 instance_id: c0736069-3f4a-438a-b614-6846877d692d kafka_admin_url: https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com kafka_brokers_sasl: [broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093] kafka_http_url: https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com password: abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh user: token
Anote los valores de
user
,password
ykafka-brokers_sasl
para la clave de servicio. Necesita esta información cuando configura la suscripción de Code Engine Kafka. Los valores depassword
yapikey
son los mismos en la clave de servicio para la instancia de servicio de Event Streams. También puede utilizar el mandatoibmcloud resource service-key myeventstream-key
para recuperar la información de clave de servicio. -
Inicialice el plugin Event Streams relativo a la instancia de servicio de Event Streams.
ibmcloud es init --instance-name myeventstream
-
Crea un tema Event Streams.
ibmcloud es topic-create kafka-topic1
Configuración de una app de ejemplo Code Engine para generar mensajes Kafka
Para esta guía de aprendizaje, configure una aplicación Code Engine para que actúe como productor de sucesos de mensajes Kafka. La finalidad de esta aplicación kafka-sender-app
es conectarse a la instancia de Event Streams y producir
(enviar) mensajes Kafka a un receptor de los mensajes (consumidorKafka ). Esta aplicación que genera sucesos para mensajes Kafka utiliza la Code Engine Kafka para enviar mensajes Kafka. Esta imagen de remitente de ejemplo requiere la variable de entorno BROKERS
y un secreto que incluya las credenciales de password
.
-
Cree un secreto con credenciales que sean necesarias para los ejemplos de Code Engine Kafka. Por ejemplo, cree el secreto
kafka-subscription-secret
, para que contenga las credenciales necesarias para la app de ejemplo del remitente Kafka y la suscripción de sucesos Kafka, que utiliza el ejemplo del destinatario Kafka. Estas credenciales son necesarias para la app de remitente Kafka de ejemplo y la suscripción de suceso Code Engine Kafka para comunicarse con la instancia de servicio para Event Streams. Aunque no es necesario que cree este secreto antes de crear la app del remitente Kafka y la suscripción de sucesos, esta acción simplifica los pasos necesarios.Para crear el secreto
kafka-subscription-secret
, añada una variable de entorno literal parapassword
yusername
. Para obtener más información, consulte crear un secreto con la CLI.-
Especifique la clave
username
con el valor deuser
que se lista en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Para la instancia de servicio de Event Streams, este valor estoken
. Esta clave es necesaria para la autenticación entre la suscripción de sucesos Code Engine Kafka y el intermediario de mensajes Kafka. -
Especifique la clave
password
con el valor deapikey
que se lista en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Esta clave es necesaria para el ejemplo del remitente y para habilitar las comunicaciones entre la suscripción de sucesos Code Engine Kafka y el intermediario de mensajes Kafka.ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=<value_of_apikey> --from-literal username=<value_of_user>
Por ejemplo:
ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh --from-literal username=token
-
-
Cree el
kafka-sender-app
con la siguiente información.-
Especifique la opción
--image
para hacer referencia a la imagen de contenedoricr.io/codeengine/kafka-sender
. Esta imagen se crea a partir desender.go
, que está disponible en el GitHub de Samples for IBM Cloud Code Engine. Esta app de remitente de ejemplo requiere las credenciales depassword
que están almacenadas enkafka-subscription-secret
y requiere la variable de entornoBROKERS
. -
Especifique la opción
--env-from-secret
para hacer referencia al secreto completo,kafka-subscription-secret
, que contiene las credenciales depassword
. -
Especifique la opción
--env
para añadir una variable de entorno literal,BROKERS
, y proporcione el nombre de uno de los hosts de intermediarios listados en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Sin embargo, si desea especificar más de un nombre de host de intermediario, utilice el formato--env BROKERS-broker1,broker2,broker3
. -
(opcional) Especifique la opción
--min-scale=1
para que la aplicación siempre tenga una instancia en ejecución y no se escale a cero. La configuración de la app para que siempre haya una instancia de ejecución es útil cuando se ven los registros. Si está ejecutando en un entorno de producción, tenga en cuenta el coste de mantener una instancia en ejecución de la app o si desea que Code Engine se escale automáticamente a cero. De forma predeterminada, la aplicación se escala a cero cuando no se utiliza.ibmcloud ce app create --name kafka-sender-app --image icr.io/codeengine/kafka-sender --env-from-secret kafka-subscription-secret --env BROKERS=broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --min-scale 1
-
-
Después de desplegar esta app, ejecute el mandato
app get
para confirmar que la app está en estadoready
.ibmcloud ce app get -n kafka-sender-app
Ha creado la aplicación kafka-sender-app
para producir mensajes Kafka para suscripciones de sucesos de Code Engine y ha creado el secreto kafka-subscription-secret
que contiene las credenciales necesarias.
Configuración de una suscripción de Code Engine Kafka
Para que Code Engine funcione con sucesos de Kafka, configure una suscripción de sucesos de Code Engine Kafka para conectarse a intermediarios de sucesos de Kafka y escuchar sucesos de Kafka. Además, configure una aplicación Code Engine para que actúe como receptor de los sucesos de Kafka. La suscripción de sucesos de Kafka define la relación entre el productor (remitente) y el consumidor (destinatario) de sucesos de Kafka.
La suscripción de suceso Code Engine Kafka se conecta al intermediario de mensajes Kafka y envía solicitudes HTTP Post para cada mensaje Kafka entrante a la aplicación receptora. Para obtener más información sobre la información que se incluye con los sucesos de Kafka, consulte Cabeceras HTTP e información de cuerpo para sucesos que se entregan a apps.
-
Cree una aplicación Code Engine para que actúe como consumidor de sucesos de mensajes Kafka y reciba los sucesos Kafka. Por ejemplo, cree una aplicación denominada
kafka-receiver-app
que utilice la imagenicr.io/codeengine/kafka-receiver
. Esta imagen se crea a partir dereceiver.go
, que está disponible en el GitHub de Samples for IBM Cloud Code Engine. Este ejemplo no requiere ninguna variable de entorno. Puede especificar opcionalmente la opción--min-scale=1
, de forma que la aplicación siempre tenga una instancia en ejecución y no se escale a cero. La configuración de la app para que siempre haya una instancia de ejecución es útil cuando se ven los registros. Si está ejecutando en un entorno de producción, tenga en cuenta el coste de mantener una instancia en ejecución de la app o si desea que Code Engine se escale automáticamente a cero. De forma predeterminada, la aplicación se escala a cero cuando no se utiliza.ibmcloud ce app create -n kafka-receiver-app --image icr.io/codeengine/kafka-receiver --min-scale 1
De forma predeterminada, los sucesos se direccionan al URL raíz de la aplicación de destino. Puede enviar sucesos a un destino diferente dentro de la app utilizando la opción
--path
. Por ejemplo, si la suscripción indica--path /event
, el suceso se envía ahttps://<base application URL>/events
. -
Después de desplegar esta app, ejecute el mandato
app get
para confirmar que la app está en estadoready
.ibmcloud ce app get -n kafka-receiver-app
-
Cree una suscripción de suceso de Code Engine Kafka para los sucesos de Kafka utilizando el mandato
ibmcloud ce sub kafka create
. Utilice el secreto dekafka-subscription-secret
que ha creado anteriormente para acceder al intermediario de mensajes. Especifique la información de intermediario basada en la información de credenciales de servicio para el recurso Kafka. Para este ejemplo, puede obtener la información de intermediario de la salida del mandatoibmcloud resource service-key myeventstream-key
. Tenga en cuenta que debe especificar una opción--broker
para cada intermediario del tema. La opción--destination
especifica el recurso Code Engine que recibe los sucesos.ibmcloud ce sub kafka create --name mykafkasubscription --destination kafka-receiver-app --secret kafka-subscription-secret --topic kafka-topic1 --broker broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
-
Muestra los detalles de la suscripción a eventos Kafka.
ibmcloud ce sub kafka get -n mykafkasubscription
Salida de ejemplo
Getting Kafka event subscription 'mykafkasubscription'... OK Name: mykafkasubscription [...] Destination Type: app Destination: kafka-receiver-app Brokers: broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 Consumer Group: knative-kafka-source-a4072fe1-1dfa-4470-9d07-bf7a0ff8e340 Topics: kafka-topic1 Secret key reference (user): kafka-subscription-secret.username Secret key reference (password): kafka-subscription-secret.password Ready: true Conditions: Type OK Age Reason ConnectionEstablished true 24s InitialOffsetsCommitted true 24s Ready true 24s Scheduled true 24s SinkProvided true 24s Events: Type Reason Age Source Messages Normal FinalizerUpdate 26s kafkasource-controller Updated "mykafkasubscription" finalizers
Prueba de la suscripción
Ahora que se ha creado la suscripción de sucesos de Kafka, que hace referencia a la aplicación kafka-receiver-app
, utilice kafka-sender-app
para enviar sucesos de mensaje a la aplicación receptora.
-
Obtenga el URL público de la app de destino,
kafka-sender-app
utilizando el mandatoibmcloud ce app get
con la opción--output url
para buscar el URL de la app.ibmcloud ce app get -n kafka-sender-app --output url
Salida de ejemplo
https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud
-
Ejecute la aplicación de generador de sucesos Kafka,
kafka-sender-app
para enviar sucesos a la aplicación Code Engine de destino. Llame a la aplicaciónkafka-sender-app
concurl
y especifique valores para el tema y el número de mensajes. Utilice la salida del comandoibmcloud ce app get
para encontrar la URL pública de su aplicación. Asegúrese de ajustar el valor a curl entre comillas para asegurarse de que se trata como una sola serie. Por ejemplo:curl "<public_URL_of_Kafka_sender_app>?topic=<your_topic_name>&num=<number_of_messages_to_produce>"
Por ejemplo:
curl "https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud?topic=kafka-topic1&num=1"
-
Ver sucesos en registros. Cuando se crea la suscripción de sucesos de Kafka con un intermediario, temas y un secreto de acceso que son válidos, y tiene una aplicación Kafka que genera mensajes sobre ese tema (como
kafka-sender-app
), puede ver sucesos en los registros para la aplicación Code Engine de destino que recibe mensajes Kafka, comokafka-receiver-app
. Cuando utilice la aplicación receptora Kafka (icr.io/codeengine/kafka-receiver
), busqueEvent data
en los registros de la aplicación receptora para ver los mensajes recibidos.ibmcloud ce app logs -n kafka-receiver-app
Salida de ejemplo
Getting logs for all instances of application 'kafka-receiver-app'... OK kafka-receiver-app-00001-deployment-66976f7988-9xttm/user-container: 2022/03/31 22:19:45 Listening on port 8080 2022/03/31 22:19:46 ---------- 2022/03/31 22:19:46 Path: / 2022/03/31 22:19:46 Header: Accept-Encoding=[gzip] 2022/03/31 22:19:46 Header: Ce-Id=[partition:0/offset:167] 2022/03/31 22:19:46 Header: Ce-Source=[/apis/v1/namespaces/glxo4k7nj7d/kafkasources/mykafkasubscription#kafka-topic1] 2022/03/31 22:19:46 Header: Ce-Specversion=[1.0] 2022/03/31 22:19:46 Header: Ce-Subject=[partition:0#167] 2022/03/31 22:19:46 Header: Ce-Time=[2022-03-31T22:19:36.499Z] 2022/03/31 22:19:46 Header: Ce-Type=[dev.knative.kafka.event] 2022/03/31 22:19:46 Header: Content-Length=[8] 2022/03/31 22:19:46 Header: Forwarded=[for=172.30.208.213;proto=http, for=127.0.0.6] 2022/03/31 22:19:46 Header: K-Proxy-Request=[activator] 2022/03/31 22:19:46 Header: Traceparent=[00-b033708685c715a7c2384cdf05797785-65540b0937e9b0ce-00] 2022/03/31 22:19:46 Header: User-Agent=[Go-http-client/1.1] 2022/03/31 22:19:46 Header: X-B3-Parentspanid=[e1a785d7fdbead6c] 2022/03/31 22:19:46 Header: X-B3-Sampled=[1] 2022/03/31 22:19:46 Header: X-B3-Spanid=[abcde9901e6bf83f] 2022/03/31 22:19:46 Header: X-B3-Traceid=[abcde490a426573772fa0bf60caf5ddb] 2022/03/31 22:19:46 Header: X-Envoy-Attempt-Count=[1] 2022/03/31 22:19:46 Header: X-Forwarded-For=[172.30.208.213, 127.0.0.6, 127.0.0.6] 2022/03/31 22:19:46 Header: X-Forwarded-Proto=[http] 2022/03/31 22:19:46 Header: X-Request-Id=[abcdeb4e-c5ac-abcd-abcd-60e6278abcde] 2022/03/31 22:19:46 Event data: test1: 1
Ten en cuenta que la información de registro de las aplicaciones sólo dura una hora. Para obtener más información sobre la visualización de registros para apps (o trabajos), consulte Visualización de registros.
Actualización de la suscripción
Para actualizar una suscripción de suceso con la CLI, utilice el mandato ibmcloud ce subscription kafka update
. El ejemplo siguiente
actualiza el nombre del tema.
ibmcloud ce sub kafka update -n mykafkasubscription --topic kafka-topic2
Puede utilizar el mandato ibmcloud ce subscription kafka update
para actualizar los valores de la suscripción Kafka. Sin embargo,
no puede modificar el valor del grupo de consumidores con este mandato. Si desea actualizar la suscripción para que haga referencia a un tema diferente, asegúrese de que el tema Kafka existe antes de actualizar la suscripción.
Tutorial de limpieza para la suscripción a Kafka
¿Está preparado para suprimir la suscripción de Kafka, las aplicaciones de envío y recepción y el secreto? Puede utilizar los mandatos ibmcloud ce app delete
,
ibmcloud ce sub kafka delete
y ibmcloud ce sub kafka delete
.
Opcionalmente, puede utilizar la opción -f
para forzar la eliminación del componente sin confirmación.
Cuando suprime la suscripción Kafka, la supresión no suprime la aplicación a la que hace referencia la suscripción.
Para eliminar la suscripción,
ibmcloud ce sub kafka delete --name mykafkasubscription -f
Para eliminar la aplicación de recepción de mensajes Kafka,
ibmcloud ce app delete --name kafka-receiver-app -f
De forma similar, puede eliminar el kafka-sender-app
.
ibmcloud ce app delete --name kafka-sender-app -f
Para eliminar el kafka-subscription-secret
,
ibmcloud ce secret delete --name kafka-subscription-secret -f
¿Está preparado para suprimir la instancia de servicio de Event Streams ? La opción --recursive
especifica que se eliminen todos los recursos de la instancia de servicio, que incluye la clave de servicio asociada.
ibmcloud resource service-instance-delete myeventstream --recursive -f
Próximos pasos
Para obtener más información sobre cómo trabajar con suscripciones de sucesos de Kafka, consulte Trabajar con el productor de sucesos de Kafka.
¿Desea ver más ejemplos de código? Consulte los ejemplos de para IBM Cloud Code Engine GitHub.