IBM Cloud Docs
使用 Kubernetes Service 将 Event Streams 连接到 IBM Cloud Object Storage

使用 Kubernetes Service 将 Event Streams 连接到 IBM Cloud Object Storage

获取要在 Kubernetes Service 集群中运行的 Kafka Connect 运行时。 然后,启动IBM Cloud® Object StorageSink 连接器,将来自Event Streams中Kafka主题的数据归档到IBM Cloud® Object Storage服务的实例中。

连接器从Kafka中获取批量消息,并将消息数据作为对象上传到云Object Storage服务中的一个数据桶。

步骤 1. 安装必备软件

确保已安装以下软件和服务:

  • Event Streams 实例 - 标准套餐或企业套餐。 您需要创建证书。

  • 云Object Storage服务的一个实例,至少有一个数据桶。

  • IBM Cloud® Kubernetes Service 集群。 可以免费供应一个集群用于测试目的。

    您还需要对群集进行 CLI 访问。 有关更多信息,请参阅设置 CLI 和 API

  • 最近版本的 Kubectl。

  • Git

步骤 2. 克隆 kafka-connect 存储库

克隆包含必需文件的以下两个存储库:

步骤 3. 创建 Kafka Connect 配置

  1. 必须仅设置此配置一次。Event Streams 将其存储以供将来使用。

    从 event-streams-samples 项目中,浏览至 kafka-connect/IKS directory,编辑 connect-distributed.properties 文件,并将 <BOOTSTRAP_SERVERS> 替换为一个位置和 <APIKEY> 在三个位置使用 Event Streams 凭证。

    以逗号分隔的列表形式提供 <BOOTSTRAP_SERVERS>。 如果无效,则会出现错误。

    <APIKEY> 在机器上以明文显示,但在推送到 IBM Cloud® Kubernetes Service时为私钥。

    为了实现可靠性和可伸缩性,Kafka Connect 可以运行多个工作程序。 如果 Kubernetes Service 如果集群有多个节点,而您又想要多个 Connect Worker,请编辑 kafka-connect.yaml 文件并编辑 条目 replicas: 1

  2. 然后,运行以下命令:

    运行以下命令创建秘密:

    kubectl create secret generic connect-distributed-config --from-file=connect-distributed.properties
    

    运行以下命令创建配置图:

    kubectl create configmap connect-log4j-config --from-file=connect-log4j.properties
    

步骤 4. 部署 Kafka Connect

运行以下命令应用 kafka-connect.yaml 文件中的配置:

kubectl apply -f ./kafka-connect.yaml

步骤 5. 验证 Kafka Connect 是否正在运行

要验证 Kafka Connect 是否正在运行,请将端口转发到端口 8083 上的 kafkaconnect-service,如以下示例中所示:

kubectl port-forward service/kafkaconnect-service 8083

保持用于端口转发的终端打开,使用另一个终端进行下一步操作。

连接 REST API 可在 http://localhost:8083. 如果您想了解有关 API 的更多信息,请参阅 KafkaConnect REST 接口

现在,KafkaConnect 运行时已部署并运行在Kubernetes Service 中。 接下来,配置并启动Object Storage连接器。

步骤 6。 配置 cos-接收器 JSON 文件

编辑位于 kafka-connect-ibmcos-sink/config/ 中的 cos-sink.json 文件,以便至少在您所需的属性中填写您的信息。 虽然配置属性cos.object.deadline.seconds、cos.interval.seconds 和cos.object.records被列为可选属性,但您必须将其中至少一个属性设置为非默认值。

cos-sink.json 文件属性

用自己的值替换 cos-sink.json 文件中的占位符。

cos-sink.json 文件属性
文件属性 描述
cos.api.key 必需。 用于连接云Object Storage服务实例的 API 密钥。
cos.bucket.location 必需。 云 Object Storage 服务存储区的位置。 例如,对于 区域存储区 eu-gb,或者对于 全局存储区 eu
cos.bucket.name 必需。 要将数据写入的云Object Storage服务桶的名称。
cos.bucket.resiliency 必需。 云 Object Storage 存储区的弹性。 必须是下列其中一个值:cross-region、regional 或 single-site。
cos.service.crn 必需。 云Object Storage服务实例的 CRN。 确保输入正确的 CRN: 它是以双冒号结尾的资源实例标识,例如,crn:v1:staging:public:cloud-object-storage:global:a/8c226dc8c8bfb9bc3431515a16957954:b25fe12c-9cf5-4ee8-8285-2c7e6ae707f6::
cos.endpoint.visibility 可选。 指定 public 以通过公用因特网连接到 Cloud Object Storage 服务。 指定私有,以便从IBM Cloud网络内部运行的连接器(例如,从IBM Cloud Kubernetes Service群集)进行连接。 缺省值为 public。
cos.object.deadline.seconds 可选。 从读取Kafka 的第一条记录到将读取的所有记录写入 CloudObject Storage对象之间的秒数(以连接任务实例的挂钟时间为准)。 在生成到主题的 Kafka 记录之间长时间暂停的情况下,此属性很有用。 它确保此连接器接收到的任何记录始终在指定时间内写入 Object Storage。
cos.object.interval.seconds 可选。 The number of seconds (as measured by the timestamps in Kafka records) between reading the first record from Kafka, and writing all of the records read so far into a Cloud Object Storage object.
cos.object.records 可选。 要合并到一个对象中的Kafka记录的最大数量。

获取使用 IBM Cloud 控制台的 Object Storage 凭证

  1. 在仪表板上找到您的 Object Storage 服务。
  2. 单击服务磁贴。
  3. 单击服务凭证
  4. 单击新建凭证
  5. 填写新凭证的详细信息,例如名称和角色,并单击添加。 凭证列表中将显示新的凭证。
  6. 使用“查看凭据”点击该凭据,以 JSON 格式显示详细信息。

步骤 7. 通过配置启动连接器

运行以下命令,根据上一步提供的配置启动Object Storage连接器。

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@./cos-sink.json"

步骤 8。 监视连接器

您可以通过转至以下位置来检查连接器。

http://localhost:8083/connectors/cos-sink/status

如果连接器的状态不是“正在运行”,请重新启动连接器。

步骤 9。 删除连接器

您可以使用以下命令来删除连接器。

curl -X DELETE
http://localhost:8083/connectors/cos-sink