IBM Cloud Docs
Suscripción a eventos Kafka

Suscripción a eventos Kafka

Con este tutorial, usted puede aprender cómo suscribirse a eventos Kafka utilizando el IBM Cloud® Code Engine CLI.

En entornos distribuidos, a menudo deseará que las aplicaciones o trabajos reaccionen a los mensajes (sucesos) que se generan en otros componentes, normalmente llamados generadores de sucesos. Con Code Engine, las aplicaciones o los trabajos pueden recibir sucesos de interés suscribiéndose a productores de sucesos. La información de sucesos se recibe como solicitudes POST HTTP para aplicaciones y como variables de entorno para trabajos.

El generador de sucesos de Kafka observa si aparecen mensajes nuevos en una instancia de Kafka. Cuando crea una suscripción de Code Engine Kafka para un conjunto de temas, la app o el trabajo recibe un suceso independiente para cada mensaje nuevo que aparece en uno de los temas.

Aunque puede utilizar cualquier instancia de Kafka, los ejemplos de esta guía de aprendizaje utilizan el servicio IBM® Event Streams for IBM Cloud®. Event Streams es un servicio de streaming de sucesos de IBM para sucesos de Kafka. Para obtener más información sobre este servicio, consulte la documentación deEvent Streams.

Antes de empezar

Todos los usuarios de Code Engine están obligados a tener una cuenta de pago por uso. Las guías de aprendizaje pueden incurrir en costes. Utilice el Estimador de costes para generar una estimación de costes basada en el uso previsto. Para obtener más información, consulte Precios deCode Engine.

Configuración del productor de eventos Kafka

Puede configurar el productor de mensajes Kafka para enviar mensajes a las suscripciones de sucesos de Code Engine Kafka. Utilice la suscripción de suceso Code Engine Kafka para desencadenar aplicaciones o trabajos cuando se reciba un mensaje Kafka.

Para empezar, cree una instancia de servicio de Event Streams para el servicio de streaming de sucesos. Aunque puede utilizar la consola o la CLI, los pasos siguientes describen cómo configurar el generador de sucesos de Event Streams con la CLI.

  1. Cree una instancia de servicio para Event Streams. El nombre del servicio CLI Event Streams es messagehub. Para este ejemplo, cree una instancia de servicio de Event Streams denominada myeventstream.

    ibmcloud resource service-instance-create myeventstream messagehub lite us-south
    
  2. Cree una clave de servicio para proporcionar credenciales a la instancia de servicio.

    ibmcloud resource service-key-create myeventstream-key Manager --instance-name myeventstream
    

    Salida de ejemplo

    Creating service key of service instance myeventstream under account <user_account>...
    OK
    Service key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd was created.
    
    Name:          myeventstream-key
    ID:            crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd
    Created At:    Mon Mar 21 18:36:09 UTC 2022
    State:         active
    Credentials:
                api_key:                  abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                apikey:                   abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                iam_apikey_description:   Auto-generated for key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd
                iam_apikey_name:          myeventstream-key
                iam_role_crn:             crn:v1:bluemix:public:iam::::serviceRole:Manager
                iam_serviceid_crn:        crn:v1:bluemix:public:iam-identity::a/e43abfcbd191404cb17ef650e9681dd3::serviceid:ServiceId-3e99caa5-b174-4f04-9845-5c5d783b8bc7
                instance_id:              c0736069-3f4a-438a-b614-6846877d692d
                kafka_admin_url:          https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com
                kafka_brokers_sasl:       [broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093]
                kafka_http_url:           https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com
                password:                 abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                user:                     token
    

    Anote los valores de user, password y kafka-brokers_sasl para la clave de servicio. Necesita esta información cuando configura la suscripción de Code Engine Kafka. Los valores de password y apikey son los mismos en la clave de servicio para la instancia de servicio de Event Streams. También puede utilizar el mandato ibmcloud resource service-key myeventstream-key para recuperar la información de clave de servicio.

  3. Inicialice el plugin Event Streams relativo a la instancia de servicio de Event Streams.

    ibmcloud es init --instance-name myeventstream
    
  4. Crea un tema Event Streams.

    ibmcloud es topic-create kafka-topic1
    

Configuración de una app de ejemplo Code Engine para generar mensajes Kafka

Para esta guía de aprendizaje, configure una aplicación Code Engine para que actúe como productor de sucesos de mensajes Kafka. La finalidad de esta aplicación kafka-sender-app es conectarse a la instancia de Event Streams y producir (enviar) mensajes Kafka a un receptor de los mensajes (consumidorKafka ). Esta aplicación que genera sucesos para mensajes Kafka utiliza la Code Engine Kafka para enviar mensajes Kafka. Esta imagen de remitente de ejemplo requiere la variable de entorno BROKERS y un secreto que incluya las credenciales de password.

  1. Cree un secreto con credenciales que sean necesarias para los ejemplos de Code Engine Kafka. Por ejemplo, cree el secreto kafka-subscription-secret, para que contenga las credenciales necesarias para la app de ejemplo del remitente Kafka y la suscripción de sucesos Kafka, que utiliza el ejemplo del destinatario Kafka. Estas credenciales son necesarias para la app de remitente Kafka de ejemplo y la suscripción de suceso Code Engine Kafka para comunicarse con la instancia de servicio para Event Streams. Aunque no es necesario que cree este secreto antes de crear la app del remitente Kafka y la suscripción de sucesos, esta acción simplifica los pasos necesarios.

    Para crear el secreto kafka-subscription-secret, añada una variable de entorno literal para password y username. Para obtener más información, consulte crear un secreto con la CLI.

    • Especifique la clave username con el valor de user que se lista en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Para la instancia de servicio de Event Streams, este valor es token. Esta clave es necesaria para la autenticación entre la suscripción de sucesos Code Engine Kafka y el intermediario de mensajes Kafka.

    • Especifique la clave password con el valor de apikey que se lista en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Esta clave es necesaria para el ejemplo del remitente y para habilitar las comunicaciones entre la suscripción de sucesos Code Engine Kafka y el intermediario de mensajes Kafka.

      ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=<value_of_apikey> --from-literal username=<value_of_user>
      

      Por ejemplo:

      ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh --from-literal username=token
      
  2. Cree el kafka-sender-app con la siguiente información.

    • Especifique la opción --image para hacer referencia a la imagen de contenedor icr.io/codeengine/kafka-sender. Esta imagen se crea a partir de sender.go, que está disponible en el GitHub de Samples for IBM Cloud Code Engine. Esta app de remitente de ejemplo requiere las credenciales de password que están almacenadas en kafka-subscription-secret y requiere la variable de entorno BROKERS.

    • Especifique la opción --env-from-secret para hacer referencia al secreto completo, kafka-subscription-secret, que contiene las credenciales de password.

    • Especifique la opción --env para añadir una variable de entorno literal, BROKERS, y proporcione el nombre de uno de los hosts de intermediarios listados en los detalles de las credenciales de servicio en la instancia de servicio de Event Streams. Sin embargo, si desea especificar más de un nombre de host de intermediario, utilice el formato --env BROKERS-broker1,broker2,broker3.

    • (opcional) Especifique la opción --min-scale=1 para que la aplicación siempre tenga una instancia en ejecución y no se escale a cero. La configuración de la app para que siempre haya una instancia de ejecución es útil cuando se ven los registros. Si está ejecutando en un entorno de producción, tenga en cuenta el coste de mantener una instancia en ejecución de la app o si desea que Code Engine se escale automáticamente a cero. De forma predeterminada, la aplicación se escala a cero cuando no se utiliza.

      ibmcloud ce app create --name kafka-sender-app --image icr.io/codeengine/kafka-sender --env-from-secret kafka-subscription-secret --env BROKERS=broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --min-scale 1
      
  3. Después de desplegar esta app, ejecute el mandato app get para confirmar que la app está en estado ready.

    ibmcloud ce app get -n kafka-sender-app
    

Ha creado la aplicación kafka-sender-app para producir mensajes Kafka para suscripciones de sucesos de Code Engine y ha creado el secreto kafka-subscription-secret que contiene las credenciales necesarias.

Configuración de una suscripción de Code Engine Kafka

Para que Code Engine funcione con sucesos de Kafka, configure una suscripción de sucesos de Code Engine Kafka para conectarse a intermediarios de sucesos de Kafka y escuchar sucesos de Kafka. Además, configure una aplicación Code Engine para que actúe como receptor de los sucesos de Kafka. La suscripción de sucesos de Kafka define la relación entre el productor (remitente) y el consumidor (destinatario) de sucesos de Kafka.

La suscripción de suceso Code Engine Kafka se conecta al intermediario de mensajes Kafka y envía solicitudes HTTP Post para cada mensaje Kafka entrante a la aplicación receptora. Para obtener más información sobre la información que se incluye con los sucesos de Kafka, consulte Cabeceras HTTP e información de cuerpo para sucesos que se entregan a apps.

  1. Cree una aplicación Code Engine para que actúe como consumidor de sucesos de mensajes Kafka y reciba los sucesos Kafka. Por ejemplo, cree una aplicación denominada kafka-receiver-app que utilice la imagen icr.io/codeengine/kafka-receiver. Esta imagen se crea a partir de receiver.go, que está disponible en el GitHub de Samples for IBM Cloud Code Engine. Este ejemplo no requiere ninguna variable de entorno. Puede especificar opcionalmente la opción --min-scale=1, de forma que la aplicación siempre tenga una instancia en ejecución y no se escale a cero. La configuración de la app para que siempre haya una instancia de ejecución es útil cuando se ven los registros. Si está ejecutando en un entorno de producción, tenga en cuenta el coste de mantener una instancia en ejecución de la app o si desea que Code Engine se escale automáticamente a cero. De forma predeterminada, la aplicación se escala a cero cuando no se utiliza.

    ibmcloud ce app create -n kafka-receiver-app --image icr.io/codeengine/kafka-receiver --min-scale 1
    

    De forma predeterminada, los sucesos se direccionan al URL raíz de la aplicación de destino. Puede enviar sucesos a un destino diferente dentro de la app utilizando la opción --path. Por ejemplo, si la suscripción indica --path /event, el suceso se envía a https://<base application URL>/events.

  2. Después de desplegar esta app, ejecute el mandato app get para confirmar que la app está en estado ready.

    ibmcloud ce app get -n kafka-receiver-app
    
  3. Cree una suscripción de suceso de Code Engine Kafka para los sucesos de Kafka utilizando el mandato ibmcloud ce sub kafka create. Utilice el secreto de kafka-subscription-secret que ha creado anteriormente para acceder al intermediario de mensajes. Especifique la información de intermediario basada en la información de credenciales de servicio para el recurso Kafka. Para este ejemplo, puede obtener la información de intermediario de la salida del mandato ibmcloud resource service-key myeventstream-key. Tenga en cuenta que debe especificar una opción --broker para cada intermediario del tema. La opción --destination especifica el recurso Code Engine que recibe los sucesos.

    ibmcloud ce sub kafka create --name mykafkasubscription --destination kafka-receiver-app --secret kafka-subscription-secret --topic kafka-topic1 --broker broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker  broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    
  4. Muestra los detalles de la suscripción a eventos Kafka.

    ibmcloud ce sub kafka get -n mykafkasubscription
    

    Salida de ejemplo

    Getting Kafka event subscription 'mykafkasubscription'...
    OK
    
    Name:          mykafkasubscription
    [...]
    Destination Type:                 app
    Destination:                      kafka-receiver-app
    Brokers:
    broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    Consumer Group:                   knative-kafka-source-a4072fe1-1dfa-4470-9d07-bf7a0ff8e340
    Topics:
    kafka-topic1
    Secret key reference (user):      kafka-subscription-secret.username
    Secret key reference (password):  kafka-subscription-secret.password
    Ready:                            true
    
    Conditions:
    Type                     OK    Age  Reason
    ConnectionEstablished    true  24s
    InitialOffsetsCommitted  true  24s
    Ready                    true  24s
    Scheduled                true  24s
    SinkProvided             true  24s
    
    Events:
    Type     Reason           Age  Source                  Messages
    Normal   FinalizerUpdate  26s  kafkasource-controller  Updated "mykafkasubscription" finalizers
    

Prueba de la suscripción

Ahora que se ha creado la suscripción de sucesos de Kafka, que hace referencia a la aplicación kafka-receiver-app, utilice kafka-sender-app para enviar sucesos de mensaje a la aplicación receptora.

  1. Obtenga el URL público de la app de destino, kafka-sender-app utilizando el mandato ibmcloud ce app get con la opción --output url para buscar el URL de la app.

    ibmcloud ce app get -n kafka-sender-app --output url
    

    Salida de ejemplo

    https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud
    
  2. Ejecute la aplicación de generador de sucesos Kafka, kafka-sender-app para enviar sucesos a la aplicación Code Engine de destino. Llame a la aplicación kafka-sender-app con curl y especifique valores para el tema y el número de mensajes. Utilice la salida del comando ibmcloud ce app get para encontrar la URL pública de su aplicación. Asegúrese de ajustar el valor a curl entre comillas para asegurarse de que se trata como una sola serie. Por ejemplo:

    curl "<public_URL_of_Kafka_sender_app>?topic=<your_topic_name>&num=<number_of_messages_to_produce>"
    

    Por ejemplo:

    curl "https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud?topic=kafka-topic1&num=1"
    
  3. Ver sucesos en registros. Cuando se crea la suscripción de sucesos de Kafka con un intermediario, temas y un secreto de acceso que son válidos, y tiene una aplicación Kafka que genera mensajes sobre ese tema (como kafka-sender-app), puede ver sucesos en los registros para la aplicación Code Engine de destino que recibe mensajes Kafka, como kafka-receiver-app. Cuando utilice la aplicación receptora Kafka (icr.io/codeengine/kafka-receiver), busque Event data en los registros de la aplicación receptora para ver los mensajes recibidos.

    ibmcloud ce app logs -n kafka-receiver-app
    

    Salida de ejemplo

    Getting logs for all instances of application 'kafka-receiver-app'...
    OK
    
    kafka-receiver-app-00001-deployment-66976f7988-9xttm/user-container:
    2022/03/31 22:19:45 Listening on port 8080
    2022/03/31 22:19:46 ----------
    2022/03/31 22:19:46 Path: /
    2022/03/31 22:19:46 Header: Accept-Encoding=[gzip]
    2022/03/31 22:19:46 Header: Ce-Id=[partition:0/offset:167]
    2022/03/31 22:19:46 Header: Ce-Source=[/apis/v1/namespaces/glxo4k7nj7d/kafkasources/mykafkasubscription#kafka-topic1]
    2022/03/31 22:19:46 Header: Ce-Specversion=[1.0]
    2022/03/31 22:19:46 Header: Ce-Subject=[partition:0#167]
    2022/03/31 22:19:46 Header: Ce-Time=[2022-03-31T22:19:36.499Z]
    2022/03/31 22:19:46 Header: Ce-Type=[dev.knative.kafka.event]
    2022/03/31 22:19:46 Header: Content-Length=[8]
    2022/03/31 22:19:46 Header: Forwarded=[for=172.30.208.213;proto=http, for=127.0.0.6]
    2022/03/31 22:19:46 Header: K-Proxy-Request=[activator]
    2022/03/31 22:19:46 Header: Traceparent=[00-b033708685c715a7c2384cdf05797785-65540b0937e9b0ce-00]
    2022/03/31 22:19:46 Header: User-Agent=[Go-http-client/1.1]
    2022/03/31 22:19:46 Header: X-B3-Parentspanid=[e1a785d7fdbead6c]
    2022/03/31 22:19:46 Header: X-B3-Sampled=[1]
    2022/03/31 22:19:46 Header: X-B3-Spanid=[abcde9901e6bf83f]
    2022/03/31 22:19:46 Header: X-B3-Traceid=[abcde490a426573772fa0bf60caf5ddb]
    2022/03/31 22:19:46 Header: X-Envoy-Attempt-Count=[1]
    2022/03/31 22:19:46 Header: X-Forwarded-For=[172.30.208.213, 127.0.0.6, 127.0.0.6]
    2022/03/31 22:19:46 Header: X-Forwarded-Proto=[http]
    2022/03/31 22:19:46 Header: X-Request-Id=[abcdeb4e-c5ac-abcd-abcd-60e6278abcde]
    2022/03/31 22:19:46 Event data: test1: 1
    

    Ten en cuenta que la información de registro de las aplicaciones sólo dura una hora. Para obtener más información sobre la visualización de registros para apps (o trabajos), consulte Visualización de registros.

Actualización de la suscripción

Para actualizar una suscripción de suceso con la CLI, utilice el mandato ibmcloud ce subscription kafka update. El ejemplo siguiente actualiza el nombre del tema.

ibmcloud ce sub kafka update -n mykafkasubscription --topic kafka-topic2

Puede utilizar el mandato ibmcloud ce subscription kafka update para actualizar los valores de la suscripción Kafka. Sin embargo, no puede modificar el valor del grupo de consumidores con este mandato. Si desea actualizar la suscripción para que haga referencia a un tema diferente, asegúrese de que el tema Kafka existe antes de actualizar la suscripción.

Tutorial de limpieza para la suscripción a Kafka

¿Está preparado para suprimir la suscripción de Kafka, las aplicaciones de envío y recepción y el secreto? Puede utilizar los mandatos ibmcloud ce app delete, ibmcloud ce sub kafka delete y ibmcloud ce sub kafka delete. Opcionalmente, puede utilizar la opción -f para forzar la eliminación del componente sin confirmación.

Cuando suprime la suscripción Kafka, la supresión no suprime la aplicación a la que hace referencia la suscripción.

Para eliminar la suscripción,

ibmcloud ce sub kafka delete --name mykafkasubscription -f

Para eliminar la aplicación de recepción de mensajes Kafka,

ibmcloud ce app delete --name kafka-receiver-app -f

De forma similar, puede eliminar el kafka-sender-app.

ibmcloud ce app delete --name kafka-sender-app -f

Para eliminar el kafka-subscription-secret,

ibmcloud ce secret delete --name kafka-subscription-secret -f

¿Está preparado para suprimir la instancia de servicio de Event Streams ? La opción --recursive especifica que se eliminen todos los recursos de la instancia de servicio, que incluye la clave de servicio asociada.

ibmcloud resource service-instance-delete myeventstream --recursive -f

Próximos pasos

Para obtener más información sobre cómo trabajar con suscripciones de sucesos de Kafka, consulte Trabajar con el productor de sucesos de Kafka.

¿Desea ver más ejemplos de código? Consulte los ejemplos de para IBM Cloud Code Engine GitHub.