使用 Kubernetes Service 将 Event Streams 连接到 IBM Cloud Object Storage
获取要在 Kubernetes Service 集群中运行的 Kafka Connect 运行时。 然后,启动IBM Cloud® Object StorageSink 连接器,将来自Event Streams中Kafka主题的数据归档到IBM Cloud® Object Storage服务的实例中。
连接器从Kafka中获取批量消息,并将消息数据作为对象上传到云Object Storage服务中的一个数据桶。
步骤 1. 安装必备软件
确保已安装以下软件和服务:
-
Event Streams 实例 - 标准套餐或企业套餐。 您需要创建证书。
-
云Object Storage服务的一个实例,至少有一个数据桶。
-
IBM Cloud® Kubernetes Service 集群。 可以免费供应一个集群用于测试目的。
您还需要对群集进行 CLI 访问。 有关更多信息,请参阅设置 CLI 和 API。
-
最近版本的 Kubectl。
步骤 2. 克隆 kafka-connect 存储库
克隆包含必需文件的以下两个存储库:
步骤 3. 创建 Kafka Connect 配置
-
必须仅设置此配置一次。Event Streams 将其存储以供将来使用。
从 event-streams-samples 项目中,浏览至
kafka-connect/IKS directory
,编辑connect-distributed.properties
文件,并将<BOOTSTRAP_SERVERS>
替换为一个位置和<APIKEY>
在三个位置使用 Event Streams 凭证。以逗号分隔的列表形式提供
<BOOTSTRAP_SERVERS>
。 如果无效,则会出现错误。<APIKEY>
在机器上以明文显示,但在推送到 IBM Cloud® Kubernetes Service时为私钥。为了实现可靠性和可伸缩性,Kafka Connect 可以运行多个工作程序。 如果 Kubernetes Service 如果集群有多个节点,而您又想要多个 Connect Worker,请编辑
kafka-connect.yaml
文件并编辑 条目replicas: 1
。 -
然后,运行以下命令:
运行以下命令创建秘密:
kubectl create secret generic connect-distributed-config --from-file=connect-distributed.properties
运行以下命令创建配置图:
kubectl create configmap connect-log4j-config --from-file=connect-log4j.properties
步骤 4. 部署 Kafka Connect
运行以下命令应用 kafka-connect.yaml
文件中的配置:
kubectl apply -f ./kafka-connect.yaml
步骤 5. 验证 Kafka Connect 是否正在运行
要验证 Kafka Connect 是否正在运行,请将端口转发到端口 8083 上的 kafkaconnect-service,如以下示例中所示:
kubectl port-forward service/kafkaconnect-service 8083
保持用于端口转发的终端打开,使用另一个终端进行下一步操作。
连接 REST API 可在 http://localhost:8083
. 如果您想了解有关 API 的更多信息,请参阅
KafkaConnect REST 接口。
现在,KafkaConnect 运行时已部署并运行在Kubernetes Service 中。 接下来,配置并启动Object Storage连接器。
步骤 6。 配置 cos-接收器 JSON 文件
编辑位于 kafka-connect-ibmcos-sink/config/
中的 cos-sink.json
文件,以便至少在您所需的属性中填写您的信息。 虽然配置属性cos.object.deadline.seconds、cos.interval.seconds 和cos.object.records被列为可选属性,但您必须将其中至少一个属性设置为非默认值。
cos-sink.json 文件属性
用自己的值替换 cos-sink.json
文件中的占位符。
文件属性 | 描述 |
---|---|
cos.api.key | 必需。 用于连接云Object Storage服务实例的 API 密钥。 |
cos.bucket.location | 必需。 云 Object Storage 服务存储区的位置。 例如,对于 区域存储区 eu-gb ,或者对于 全局存储区 eu 。 |
cos.bucket.name | 必需。 要将数据写入的云Object Storage服务桶的名称。 |
cos.bucket.resiliency | 必需。 云 Object Storage 存储区的弹性。 必须是下列其中一个值:cross-region、regional 或 single-site。 |
cos.service.crn | 必需。 云Object Storage服务实例的 CRN。 确保输入正确的 CRN: 它是以双冒号结尾的资源实例标识,例如,crn:v1:staging:public:cloud-object-storage:global:a/8c226dc8c8bfb9bc3431515a16957954:b25fe12c-9cf5-4ee8-8285-2c7e6ae707f6:: 。 |
cos.endpoint.visibility | 可选。 指定 public 以通过公用因特网连接到 Cloud Object Storage 服务。 指定私有,以便从IBM Cloud网络内部运行的连接器(例如,从IBM Cloud Kubernetes Service群集)进行连接。 缺省值为 public。 |
cos.object.deadline.seconds | 可选。 从读取Kafka 的第一条记录到将读取的所有记录写入 CloudObject Storage对象之间的秒数(以连接任务实例的挂钟时间为准)。 在生成到主题的 Kafka 记录之间长时间暂停的情况下,此属性很有用。 它确保此连接器接收到的任何记录始终在指定时间内写入 Object Storage。 |
cos.object.interval.seconds | 可选。 The number of seconds (as measured by the timestamps in Kafka records) between reading the first record from Kafka, and writing all of the records read so far into a Cloud Object Storage object. |
cos.object.records | 可选。 要合并到一个对象中的Kafka记录的最大数量。 |
获取使用 IBM Cloud 控制台的 Object Storage 凭证
- 在仪表板上找到您的 Object Storage 服务。
- 单击服务磁贴。
- 单击服务凭证。
- 单击新建凭证。
- 填写新凭证的详细信息,例如名称和角色,并单击添加。 凭证列表中将显示新的凭证。
- 使用“查看凭据”点击该凭据,以 JSON 格式显示详细信息。
步骤 7. 通过配置启动连接器
运行以下命令,根据上一步提供的配置启动Object Storage连接器。
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@./cos-sink.json"
步骤 8。 监视连接器
您可以通过转至以下位置来检查连接器。
http://localhost:8083/connectors/cos-sink/status
如果连接器的状态不是“正在运行”,请重新启动连接器。
步骤 9。 删除连接器
您可以使用以下命令来删除连接器。
curl -X DELETE
http://localhost:8083/connectors/cos-sink