IBM Cloud Docs
使用 REST producer API

使用 REST producer API

Event Streams 提供 REST API 以帮助您将现有系统连接到 Event Streams Kafka 集群。 通过使用 API,您可以将 Event Streams 与支持 RESTful API 的任何系统集成。

REST 生产者 API 仅作为 Event Streams 标准和企业套餐的一部分提供。

REST producer API 是可缩放的 REST 接口,用于通过安全 HTTP 端点向 Event Streams 生成消息。 将事件数据发送到 Event Streams,使用 Kafka 技术处理数据订阅源,并利用 Event Streams 功能来管理数据。

使用 API 将现有系统连接到 Event Streams。 从系统创建针对 Event Streams 的生成请求,包括指定消息密钥、头、以及您想要将消息写入的主题。

访问 REST 生产者 API

您必须检索从服务实例的服务凭证对象或服务密钥连接到 API 所需的 URL 和凭证详细信息。 有关创建这些对象的更多信息,请参阅 连接到 Event Streams

API 端点的 URL 在 kafka_http_url 属性中提供。

认证

受支持的认证机制是使用不记名令牌。 要使用 IBM Cloud CLI 获取令牌,请首先登录到 IBM Cloud,然后运行以下命令:

ibmcloud iam oauth-tokens

将此令牌放在 HTTP 请求的 Authorization 头中,格式为 Bearer<token>。 支持 API 密钥或 JWT 令牌。

使用 REST 生产者 API 生成消息

使用生产者 API 的 v2 端点将类型为 textbinaryJSONavro 的消息发送到主题。 通过 v2 端点,您可以通过指定 avro 数据类型的模式来使用 Event Streams 模式注册表。

以下代码显示了使用 curl 发送 text 类型的消息的示例:

curl -v -X POST \
-H "Authorization: Bearer $token" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
  "headers": [
    {
      "name": "colour",
      "value": "YmxhY2s="
    }
  ],
  "key": {
    "type": "text",
    "data": "Test Key"
  },
  "value": {
    "type": "text",
    "data": "Test Value"
  }
}' \
"$kafka_http_url/v2/topics/$topic_name/records"

有关 API 的更多信息,请参阅 Event Streams REST Producer API 参考

生成符合模式的消息

通过 REST 生产者 API 的 v2 端点,您可以生成消息,使消息键和值符合模式。 可以为键和值指定不同的模式。 支持的序列化器为 confluent,支持的数据类型为 avro。 模式将创建并存储在 Event Streams 模式注册表中。 有关更多信息,请参阅 Event Streams 模式注册表

允许以下模式命名策略:

  • 主题命名策略: 主题的名称用于派生模式工件标识。 The ID takes the form "<topicName>-key" for key and "<topicName>-value" for value, where topicName is the name of topic.

  • 记录命名策略: 模式中记录的名称用于派生模式工件标识。 The ID takes the form "<composite-recordName>-key" for key and "<composite-recordName>-value" for value. If the schema namespace field is specified, the composite-recordName takes the value of "<namespace>.<recordName>", otherwise it takes the value of "<recordName>".

  • TopicRecord 命名策略: 主题的名称和记录的名称都用于派生模式工件标识。 The ID takes the form "<topicName>-<recordName>-key" for key and "<topicName>-<composite-recordName>-value" for value, where topicName is the name of topic. If the schema namespace field is specified, the composite-recordName takes the value of "<namespace>.<recordName>", otherwise it takes the value of "<recordName>".

以下代码显示了使用 curl 发送符合 schema 字段下指定的模式的消息的示例:

curl -v -X POST \
-H "Authorization: Bearer $token" -H "Content-Type: application/json" -H "Accept: application/json" \
-d '{
  "value": {
    "type": "avro",
    "schema": "{\"namespace\": \"com.eventstreams.samples\",\"type\": \"record\",\"name\": \"recordValueName\",\"fields\": [{\"name\": \"valueName\", \"type\": \"string\"}]}",
    "schema_name_strategy": "record",
    "data": "{\"valueName\": \"sampleValueName\"}"
  }
}' \
"$kafka_http_url/v2/topics/$topic_name/records?serializer=confluent"

从现有端点迁移到 REST 生产者 API 的 v2 端点

对 v2 端点进行了若干改进,以更好地使用 API 标准并使其与 API 标准保持一致。 要充分利用这些改进,请对现有应用程序进行更改。

以下注意事项可帮助您规划迁移:

  1. 访问 REST 生产者 API:

    您可以通过获取服务实例的 kafka_http_url 属性值,以与现有 URL 相同的方式访问 v2 端点。 要使用的路径为 /v2/topics/<topic_name>/records

    Example URL: https://service-instance-adsf1234asdf1234asdf1234-0000.us-south.containers.appdomain.cloud/v2/topics/topic_name/records
    
  2. 认证:

    受支持的认证机制是不记名令牌。 为了增强安全性,不再接受使用 API 密钥进行基本认证。

    Example Header:  -H "Authorization: Bearer $token"
    
  3. 头:

    将 Content-Type 和 Accept 头设置为 application/json

    Example Headers:  -H "Content-Type: application/json" -H "Accept: application/json"
    
  4. 有效内容:

    以 JSON 格式提供 v2 端点的有效内容。 可以在有效内容中定义消息密钥,头和数据。 您可以使用 base64 编码的值以列表形式指定头。 但是,消息键和头是可选的。

    Example payload:
    {
      "headers": [
      {
       "name": "colour",
       "value": "YmxhY2s="
      },
      "key": {
       "type": "text",
       "data": "Test Key"
      },
      "value": {
       "type": "text",
       "data": "Test Value"
      }
     }
    

    必须为键和值对象下的字段类型指定下列其中一种受支持的数据类型:

    a. 文本: 所提供的数据将验证为由线性字符序列组成的纯文本格式。

    b. 二进制: 提供的数据将验证为 base64-encoded 二进制格式。

    c. JSON: 将以 JSON 格式验证所提供的数据。

    d. Avro: 数据验证为 Apache Avro 数据格式

  5. 错误响应:

    错误响应包含 traceerror messageerror codemore_infotarget 属性。

    Example error response:
    {
     "trace": "a222e93c-e5f9-435d-b275-5d4919ea87ed",
     "error": {
      "code": "invalid_type",
      "message": "'type' field in 'value' object is required ...",
      "more_info": "https://cloud.ibm.com/apidocs/event-streams/restproducer_v2",
      "target": {
       "type": "field",
       "name": "type"
      }
     }
    }
    

限制

使用 REST 生产者 API 时,作为请求有效内容传递的消息的最大大小存在限制。 有效内容的最大大小限制为 64 K。

API 参考

有关 v2 端点 API 的完整详细信息,请参阅 Event Streams REST Producer v2 API 参考

有关现有端点 API 的完整详细信息,请参阅 Event Streams REST Producer API 参考