NPSaaS 和 Kafka
概述
Apache Kafka 是一个发布/预订消息传递系统,可用于在热门应用程序之间移动数据。
通过 Kafka JDBC 连接器将 IBM® Netezza® Performance Server for IBM Cloud Pak® for Data as a Service 实例与 Kafka 集成后,可以使用 NPSaaS 作为下列其中一项:
- 将数据引入到 Kafka的数据源。
- 数据接收器,用于从 Kafka读取数据。
使用NPSaaS作为数据源
电子商务公司将其产品列表存储在 NPSaaS 数据库中。 为了简化应用程序内搜索体验并访问实时分析,使用者应用程序 (例如,Elasticsearch 和 Apache Flink) 有权访问列表。
在这种情况下,将从 NPSaaS 通过 Kafka JDBC 源连接器和 Kafka 流式采集数据。 使用者应用程序从流中读取并进一步处理数据。
下图说明了作为数据源的数据流 NPSaaS。

将 NPSaaS 用作数据接收器
为了改善患者结果,高效识别风险因素,并提供更快的干预时间,医院从生理数据中提取有意义的洞察。 来自不同通道的不同数据集在到达时进行分析。
在这种情况下,传入数据将流经 Kafka,然后进行计算。 生产者是来自不同渠道的生理数据的来源。
处理数据后,通过 Kafka JDBC 接收器连接器将其存储在 NPSaaS 上,以用于患者历史记录目的。
下图说明了作为数据接收器的 NPSaaS 的数据流。

将 NPSaaS 与 Kafka 集成
如果要将 NPSaaS 实例与 Kafka集成,那么必须使用 Kafka JDBC 连接器。
Kafka JDBC 连接器支持源和接收器 JDBC 连接器。 通过源连接器,可以将数据从 NPSaaS 传输到 Kafka 主题。 通过接收器连接器,您可以使用 Kafka JDBC 连接器将数据从 Kafka 主题传输到 NPSaaS。
设置 JDBC Kafka 连接器
必须通过编辑 plugin.path将驱动程序安装在 Kafka的库中。
-
设置 Java。
要使 Kafka 工作,您需要 Java 8 或更高版本; 对于 JDBC 连接器,您需要 Java 11。
sudo yum install -y java-11-openjdk-headless
-
将 Netezza JDBC 驱动程序复制到 Kafka
libs
cp path/to/nzjdbc3.jar kafka/libs
-
设置连接器。
在该示例中,使用 Aiven 连接器。
a) 下载连接器。
curl -SLO https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.6.2/jdbc-connector-for-apache-kafka-6.6.2.tar
b) 解包。
tar xvf jdbc-connector-for-apache-kafka-6.6.2.tar
c) 在 Kafka
config/connect-[distributed/standalone].properties
中编辑plugin.path
以指向抽取的文件夹。 通过此操作,Kafka 可以查找并装入插件和连接器 JAR。 -
启动分布式连接。
connect-distributed.sh config/connect-distributed.properties
输出:
[2022-06-29 02:29:43,356] INFO Started o.e.j.s.ServletContextHandler@5562c2c9{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:915) [2022-06-29 02:29:43,356] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:303) [2022-06-29 02:29:43,356] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
-
注册连接器。
在尝试注册连接器之前,数据库和表必须存在于 NPSaaS 上。
-
对于源连接器,运行以下命令。
curl -s -X POST -H "Content-Type: Application/json" --data '{ "name": "test-source-jdbc", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url":"jdbc:netezza://localhost:5480/db1;user=user1;password=secret","connection.user":"user1","connection.password":"secret","dialect.name":"GenericDatabaseDialect","topic.prefix":"my","mode": "bulk", "poll.interval.ms":"60000", "table.whitelist":"TEST_TABLE", "batch.max.rows":"10000000" } }' http://localhost:8083/connectors | jq { "name": "test-source-jdbc", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:netezza://localhost:5480/db1;user=user1;password=secret, "connection.user": ""user1, "connection.password": "secret", "dialect.name": "GenericDatabaseDialect", "topic.prefix": "my", "mode": "bulk", "poll.interval.ms": "60000", "table.whitelist": "TEST_TABLE", "batch.max.rows": "10000000", "name": "test-source-jdbc" }, "tasks": [], "type": "source" }
-
对于接收器连接器,运行以下命令。
curl -s -X POST -H "Content-Type: Application/json" --data '{ "name": "test-sink", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector", "tasks.max": "2", "connection.url":"jdbc:netezza://localhost:5480/db1;user=user1;password=secret","connection.user":"user1","connection.password":"secret","dialect.name":"GenericDatabaseDialect","topics": "TEST_TABLE", "insert.mode": "insert" } }' http://localhost:8083/connectors | jq { "name": "test-sink", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector", "tasks.max": "2", "connection.url": "jdbc:netezza://localhost:5480/db1;user=user1;password=secret", "connection.user": "user1", "connection.password": "secret", "dialect.name": "GenericDatabaseDialect", "topics": "TEST_TABLE", "insert.mode": "insert", "name": "test-sink" }, "tasks": [], "type": "sink" }
-
-
验证注册是否成功。
a) 列出连接器。
curl -s http://localhost:8083/connectors/ | jq [ "test-source-jdbc", "test-sink" ]
b) 检查连接器的状态。
curl -s http://localhost:8083/connectors/test-source-jdbc/status | jq { "name": "test-source-jdbc", "connector": { "state": "RUNNING", "worker_id": "10.11.112.15:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.11.112.15:8083" } ], "type": "source" }
您可以通过检查 NPSaaS 日志来验证连接器是否正常工作。
-
对于源连接器:
2022-06-29 04:13:09.057046 PDT [27743] DEBUG: QUERY: SELECT 1 2022-06-29 04:13:09.061443 PDT [27743] DEBUG: QUERY: SELECT * FROM "DB1"."USER1"."TEST_TABLE" ANALYZE 2022-06-29 04:13:09.064209 PDT [27743] DEBUG: QUERY: SELECT * FROM "DB1"."USER1"."TEST_TABLE"
-
对于接收器连接器:
2022-06-29 09:08:32.379442 PDT [18976] DEBUG: QUERY: CREATE EXTERNAL TABLE bulkETL_18976_0 ( c0 nvarchar(8),c1 nvarchar(10),c2 nvarchar(4),c3 nvarchar(6),c4 nvarchar(7),c5 nvarchar(9),c6 nvarchar(7),c7 nvarchar(9),c8 nvarchar(12),c9 nvarchar(12),c10 nvarchar(25),c11 nvarchar(13),c12 nvarchar(1) ) USING ( DATAOBJECT('/tmp/junk') REMOTESOURCE 'jdbc' DELIMITER ' ' ESCAPECHAR '\' CTRLCHARS 'YES' CRINSTRING 'YES' ENCODING 'INTERNAL' MAXERRORS 1 QUOTEDVALUE 'YES' ); 2022-06-29 09:08:32.525024 PDT [18976] DEBUG: QUERY: INSERT INTO "TEST_TABLE"("C1","C2","C3","C4","C5","C6","C7","C8","DATE_PROD","TIME_PROD","TIMESTMP","TIMETZ_PROD","C18") VALUES(bulkETL_18976_0.c0,bulkETL_18976_0.c1,bulkETL_18976_0.c2,bulkETL_18976_0.c3,bulkETL_18976_0.c4,bulkETL_18976_0.c5,bulkETL_18976_0.c6,bulkETL_18976_0.c7,bulkETL_18976_0.c8,bulkETL_18976_0.c9,bulkETL_18976_0.c10,bulkETL_18976_0.c11,bulkETL_18976_0.c12)