Event Streams
IBM Cloud® Functions is deprecated. Existing Functions entities such as actions, triggers, or sequences will continue to run, but as of 28 December 2023, you can’t create new Functions entities. Existing Functions entities are supported until October 2024. Any Functions entities that still exist on that date will be deleted. For more information, see Deprecation overview.
IBM Cloud® Functions provides a pre-installed package for publishing and consuming messages with Event Streams.
Package entities
Package | Type | Description |
---|---|---|
`/whisk.system/messaging` | Pre-installed package | Publish and consume messages with the native Kafka API. |
`/messaging/messageHubProduce` | Action | Deprecated. |
`/messaging/messageHubFeed` | Feed | Reacts when messages are posted to an Event Streams instance. |
The `/messaging/messageHubProduce` action is deprecated. To maintain optimal performance, migrate your usage of the `/messaging/messageHubProduce` action to use a persistent connection when data is produced to Event Streams/Kafka.

The `/messaging/messageHubProduce` action is not available in the Tokyo or Sydney regions.
To learn more about producing messages, check out the Event Streams documentation.
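If you migrate away from the `/messaging/messageHubProduce` action, the usual pattern is to create the Kafka client once, outside your action's `main` function, so that warm invocations reuse the same connection. The following Python sketch is one way to do that, assuming the open source `kafka-python` library is packaged with the action; the broker address, API key, and topic name are placeholders for your own instance's values.

```python
import json
from kafka import KafkaProducer  # assumes kafka-python is bundled with the action

# Created once per container, so warm invocations reuse the connection.
producer = KafkaProducer(
    bootstrap_servers=[
        # Placeholder broker; use the kafka_brokers_sasl values from your credentials.
        "broker-0-example.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
    ],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="token",           # Event Streams uses the literal user "token"
    sasl_plain_password="<your API key>",  # placeholder credential
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def main(params):
    # Send one message and wait for the broker to acknowledge it.
    metadata = producer.send("mytopic", {"greeting": "hello"}).get(timeout=10)
    return {"partition": metadata.partition, "offset": metadata.offset}
```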
Binding the `/whisk.system/messaging` package to your Event Streams instance
If you're using IBM Cloud® Functions from the IBM Cloud, you can use the IBM Cloud® Functions CLI plug-in to bind a service to an action or package. If you do not bind your service, you must pass your credentials each time you use the action or package.
Before you begin
You must have an instance of Event Streams. To create an instance, see Event Streams documentation.
1. Create a `/whisk.system/messaging` package binding that is configured for your Event Streams account. In this example, the package name is `MyEventStreamBind`.

   ```
   ibmcloud fn package bind /whisk.system/messaging MyEventStreamBind
   ```
2. Verify that the package binding exists.

   ```
   ibmcloud fn package list
   ```

   Example output

   ```
   packages
   /<namespace_ID>/MyEventStreamBind
   ```
3. Get the name of the service instance that you want to bind to an action or package.

   ```
   ibmcloud resource service-instances
   ```

   Example output

   ```
   Name              Location   State    Type
   EventStreams-0s   us-south   active   service_instance
   ```
4. Get the name of the credentials that are defined for the service instance that you got in the previous step.

   ```
   ibmcloud resource service-keys --instance-name EventStreams-0s
   ```

   Example output

   ```
   Name                    State    Created At
   Service credentials-1   active   Sat Oct 27 03:26:52 UTC 2018
   Service credentials-2   active   Sun Jan 27 22:14:58 UTC 2019
   ```
   You can create service keys by using the `ibmcloud resource service-key-create` command.
5. Bind the service to the package that you created in the first step. In this example, the package is called `MyEventStreamBind`.

   ```
   ibmcloud fn service bind messagehub MyEventStreamBind --instance EventStreams-0s --keyname 'Service credentials-1'
   ```

   Example output

   ```
   Credentials 'Service credentials-1' from 'messagehub' service instance 'EventStreams-0s' bound to 'MyEventStreamBind'.
   ```
6. Verify that the credentials are successfully bound.

   ```
   ibmcloud fn package get MyEventStreamBind parameters
   ```

   Example output

   ```
   ok: got package MyEventStreamBind
   ...
   "parameters": [
     {
       "key": "bluemixServiceName",
       "value": "messagehub"
     },
     {
       "key": "endpoint",
       "value": "openwhisk.ng.bluemix.net"
     },
     {
       "key": "__bx_creds",
       "value": {
         "messagehub": {
           "api_key": "2RxXWBVUdR5-8GDXrYhpm7zMCB5dNtJ1vB3YfaI3o7",
           "apikey": "2RxXWBVUdR5-8GDXrYhpm7zMCB5dNtJ1vB3YfaI3o7",
           "credentials": "Service credentials-1",
           "iam_apikey_description": "Auto generated apikey during resource-key operation for Instance - crn:v1:bluemix:public:messagehub:us-south:a/6ef045fb2b43266cfe8e6745dd2ec098:1244a768-a7e5-3c48-a6d2-1ab7c8b63d57::",
           "iam_apikey_name": "auto-generated-apikey-dc334c91-3503-480f-a812-49bff8bedc60",
           "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Manager",
           "iam_serviceid_crn": "crn:v1:bluemix:public:iam-identity::a/6ef045fd2b43266cfe8e6388dd2ec098::serviceid:ServiceId-e2e58c4f-9bc2-4bbe-cbc9-0d679e7a0d85",
           "instance": "EventStreams-0s",
           "instance_id": "1244a768-a7e5-3c48-a6d2-1ab7c8b63d57",
           "kafka_admin_url": "https://5syh8qrs9rpj0zds.svc01.us-south.eventstreams.cloud.ibm.com",
           "kafka_brokers_sasl": [
             "broker-1-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
             "broker-4-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
             "broker-3-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
             "broker-2-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
             "broker-5-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
             "broker-0-5syh8qrs9rpj0zds.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093"
           ],
           "kafka_http_url": "https://5syh8qrs9rpj0zds.svc01.us-south.eventstreams.cloud.ibm.com",
           "password": "2RxXBVUdR5-9GDXrYhpmzMCB5dNtJ1vB3YfaI3o7",
           "user": "token"
         }
       }
     }
   ]
   ```
In this example, the credentials for the Event Streams service are stored in a parameter named `__bx_creds`.
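Inside an action, these bound credentials arrive as ordinary invocation parameters. As a minimal sketch (assuming the `messagehub` binding shown in the previous output), a Python action can read the broker list and API key out of `__bx_creds` like this:

```python
def main(params):
    # Bound service credentials are injected under the __bx_creds parameter.
    creds = params["__bx_creds"]["messagehub"]
    brokers = creds["kafka_brokers_sasl"]  # list of <host>:<port> strings
    api_key = creds["api_key"]             # SASL password; the user name is "token"
    return {"brokerCount": len(brokers)}
```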
Setting up an Event Streams package outside IBM Cloud
If you want to set up your Event Streams outside of IBM Cloud, you must manually create a package binding for your Event Streams service. You need the Event Streams service credentials and connection information.
Create a package binding that is configured for your Event Streams service.
```
ibmcloud fn package bind /whisk.system/messaging myMessageHub -p kafka_brokers_sasl "[\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka02-prod01.messagehub.services.us-south.bluemix.net:9093\", \"kafka03-prod01.messagehub.services.us-south.bluemix.net:9093\"]" -p user <your Event Streams user> -p password <your Event Streams password> -p kafka_admin_url https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443
```
Creating a trigger that listens to an Event Streams instance
To create a trigger that reacts when messages are posted to an Event Streams instance, use the feed named `/messaging/messageHubFeed`. The feed action supports the following parameters:
Name | Type | Description |
---|---|---|
`kafka_brokers_sasl` | JSON array of strings | An array of `<host>:<port>` strings that comprise the brokers in your Event Streams instance. |
`user` | String | Your Event Streams username. |
`password` | String | Your Event Streams password. |
`topic` | String | The topic that you want the trigger to listen to. |
`kafka_admin_url` | URL string | The URL of the Event Streams admin REST interface. |
`isJSONData` | Boolean (optional, default `false`) | When set to `true`, the provider attempts to parse the message value as JSON before passing it along as the trigger payload. |
`isBinaryKey` | Boolean (optional, default `false`) | When set to `true`, the provider encodes the key value as Base64 before passing it along as the trigger payload. |
`isBinaryValue` | Boolean (optional, default `false`) | When set to `true`, the provider encodes the message value as Base64 before passing it along as the trigger payload. |
To create the trigger, follow these steps.
1. Bind the `/whisk.system/messaging` package to your Event Streams instance.

2. Create a trigger that is fired when new messages are posted to your Event Streams topic. Replace `<namespace_ID>` with your namespace ID, `<binding>` with your package binding name, and `<mytopic>` with the topic in your Event Streams instance that you want the trigger to listen to.

   ```
   ibmcloud fn trigger create MyMessageHubTrigger -f /<namespace_ID>/<binding>/messageHubFeed -p topic <mytopic>
   ```
   To find the path to the `messageHubFeed` feed, run `ibmcloud fn package list | grep <binding>`, where `<binding>` is the name of your package binding.
Creating a trigger for an Event Streams package outside IBM Cloud
If you want to set up your Event Streams outside of IBM Cloud, you must manually create a package binding for your Event Streams service. You need the Event Streams service credentials and connection information.
1. Create a package binding that is configured for your Event Streams service.

   ```
   ibmcloud fn package bind /whisk.system/messaging myMessageHub -p kafka_brokers_sasl "[\"broker-1-9eyy8dkv3rrj0wdn.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093\", \"broker-1-9eyy8dkv3rrj0wdn.kafka.svc02.us-south.eventstreams.cloud.ibm.com:9093\", \"broker-1-9eyy8dkv3rrj0wdn.kafka.svc03.us-south.eventstreams.cloud.ibm.com:9093\"]" -p user <your Event Streams user> -p password <your Event Streams password> -p kafka_admin_url https://9eyy8dkv3rrj0wdn.svc01.us-south.eventstreams.cloud.ibm.com
   ```
2. Use your new package to create a trigger that is fired when new messages are posted to your Event Streams topic.

   ```
   ibmcloud fn trigger create MyMessageHubTrigger -f myMessageHub/messageHubFeed -p topic mytopic -p isJSONData true
   ```
Listening for messages
After a trigger is created, the system monitors the specified topic in your messaging service. When new messages are posted, the trigger is fired.
The payload of that trigger contains a `messages` field, which is an array of the messages that were posted since the last time the trigger was fired. Each message object in the array contains the following fields:

- `topic`
- `partition`
- `offset`
- `key`
- `value`
In Kafka terms, these fields are self-explanatory. However, the `key` and `value` fields require special consideration. The `isBinaryKey` parameter allows the `key` to transmit binary data, and the `isJSONData` and `isBinaryValue` parameters are available to handle JSON and binary message values. The `isJSONData` and `isBinaryValue` parameters cannot be used together.
For example, if `isBinaryKey` was set to `true` when the trigger was created, the `key` is encoded as a Base64 string when it is returned in the payload of a fired trigger.
If a `key` of `Some key` is posted with `isBinaryKey` set to `true`, the trigger payload resembles the following example.
```
{
"messages": [
{
"partition": 0,
"key": "U29tZSBrZXk=",
"offset": 421760,
"topic": "mytopic",
"value": "Some value"
}
]
}
```
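To recover the original key inside your action, decode the Base64 string. For example, in Python:

```python
import base64

# "U29tZSBrZXk=" is the key from the example payload above.
key = base64.b64decode("U29tZSBrZXk=").decode("utf-8")
print(key)  # prints: Some key
```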
If the `isJSONData` parameter was set to `false` (or not set at all) when the trigger was created, the `value` field is the raw value of the posted message. However, if `isJSONData` was set to `true` when the trigger was created, the system attempts to parse this value as a JSON object, on a best-effort basis. If parsing is successful, the `value` in the trigger payload is the resulting JSON object.
If a message of `{"title": "Some string", "amount": 5, "isAwesome": true}` is posted with `isJSONData` set to `true`, the trigger payload might look like the following example.
```
{
"messages": [
{
"partition": 0,
"key": null,
"offset": 421760,
"topic": "mytopic",
"value": {
"amount": 5,
"isAwesome": true,
"title": "Some string"
}
}
]
}
```
However, if the same message content is posted with `isJSONData` set to `false`, the trigger payload looks like the following example.
```
{
"messages": [
{
"partition": 0,
"key": null,
"offset": 421761,
"topic": "mytopic",
"value": "{\"title\": \"Some string\", \"amount\": 5, \"isAwesome\": true}"
}
]
}
```
Similar to `isJSONData`, if `isBinaryValue` was set to `true` during trigger creation, the resulting `value` in the trigger payload is encoded as a Base64 string.
If a `value` of `Some data` is posted with `isBinaryValue` set to `true`, the trigger payload might look like the following example.
```
{
"messages": [
{
"partition": 0,
"key": null,
"offset": 421760,
"topic": "mytopic",
"value": "U29tZSBkYXRh"
}
]
}
```
If the same message is posted without `isBinaryValue` set to `true`, the trigger payload resembles the following example.
```
{
"messages": [
{
"partition": 0,
"key": null,
"offset": 421760,
"topic": "mytopic",
"value": "Some data"
}
]
}
```
Messages are batched
Notice that the trigger payload contains an array of messages. If these messages are produced to your messaging system quickly, the feed attempts to batch up the posted messages into a single firing of your trigger. Batch processing allows the messages to be posted to your trigger more rapidly and efficiently.
When you code actions that are fired by your trigger, keep in mind that the number of messages in the payload is technically unbounded, but is always greater than 0. See the following example of a batched payload (note the changing offset values); a sketch of an action that handles such a batch follows the example.
```
{
"messages": [
{
"partition": 0,
"key": null,
"offset": 100,
"topic": "mytopic",
"value": {
"amount": 5
}
},
{
"partition": 0,
"key": null,
"offset": 101,
"topic": "mytopic",
"value": {
"amount": 1
}
},
{
"partition": 0,
"key": null,
"offset": 102,
"topic": "mytopic",
"value": {
"amount": 999
}
}
]
}
```
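An action that consumes this payload should loop over the whole `messages` array rather than assume a single message. Here is a minimal Python sketch, assuming the trigger was created with `isJSONData` set to `true` so that each `value` is already a parsed object with an `amount` field, as in the batch above:

```python
def main(params):
    # The feed batches messages, so always iterate the full array.
    messages = params.get("messages", [])
    total = 0
    for msg in messages:
        # Each entry carries topic, partition, offset, key, and value.
        total += msg["value"].get("amount", 0)
    return {"messageCount": len(messages), "totalAmount": total}
```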