IBM Cloud Docs
NPSaaS e Kafka

NPSaaS e Kafka

Visão geral

O Apache Kafka é um sistema de mensagens de publicação / assinatura, que pode ser usado para mover dados entre aplicativos populares

Depois de integrar sua instância do IBM® Netezza® Performance Server for IBM Cloud Pak® for Data as a Service com o Kafka por meio do conector Kafka JDBC, é possível usar o NPSaaS como um dos seguintes:

  • Uma origem de dados, que traz dados para Kafka.
  • Um dissipador de dados, que lê dados do Kafka

Usando NPSaaS como uma fonte de dados

Uma empresa de comércio eletrônico armazena suas listagens de produtos em um banco de dados NPSaaS. Para aperfeiçoar a experiência de procura no aplicativo e acessar a analítica em tempo real, os apps consumidores (por exemplo, Elasticsearch e Apache Flink) têm acesso às listagens.

Nesse caso, os dados são lidos do NPSaaS por meio do conector de origem Kafka JDBC e o Kafka flui os dados. Os aplicativos consumidores leem a partir do fluxo e processam os dados.

A imagem a seguir ilustra o fluxo de dados NPSaaS como uma origem de dados.

NPSaaS como uma origem de dados
Imagem 1. O diagrama descreve como o Kafka lê dados de Netezza por meio do conector de origem JDBC e permite que os apps consumidores os acessem.

Usando NPSaaS como um coletor de dados

Para melhorar os resultados dos pacientes, identificar com eficiência os fatores de risco e fornecer tempos de intervenção mais rápidos, um hospital extrai insights significativos de dados fisiológicos. Diferentes conjuntos de dados de vários canais são analisados conforme eles chegam.

Nesse caso, os dados recebidos são transmitidos por meio do Kafka e, em seguida, calculados. Os produtores são fontes de dados fisiológicos que vêm de diferentes canais.

Após os dados serem processados, eles são armazenados no NPSaaS para propósitos de registro de histórico do paciente por meio do conector dissipador Kafka JDBC.

A imagem a seguir ilustra o fluxo de dados para NPSaaS como um coletor de dados.

NPSaaS como um coletor de dados
Imagem 2. O diagrama descreve como os dados recebidos de vários produtores são transmitidos e calculados pelo Kafka por meio do driver JDBC e armazenados no Netezza.

Integrando NPSaaS e Kafka

Se desejar integrar sua instância do NPSaaS com Kafka, deve-se usar o conector Kafka JDBC.

O conector Kafka JDBC tem suporte para conectores JDBC de origem e destino. Com o conector de origem, é possível transferir dados do NPSaaS para tópicos do Kafka. Com o conector sink, é possível transferir dados de tópicos do Kafka para NPSaaSusando o conector Kafka JDBC.

Configurando o conector JDBC Kafka

Você deve instalar o driver na biblioteca do Kafkaeditando plugin.path.

  1. Configure Java.

    Para que o Kafka funcione, é necessário Java 8 ou posterior; para os conectores JDBC, é necessário Java 11.

    sudo yum install -y java-11-openjdk-headless
    
  2. Copie o driver Netezza JDBC para Kafka libs

    cp path/to/nzjdbc3.jar kafka/libs
    
  3. Configure o seu conector.

    No exemplo, o conector Aiven é usado.

    a) Baixe o conector.

    curl -SLO https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.6.2/jdbc-connector-for-apache-kafka-6.6.2.tar
    

    b) Desembalar o pacote.

    tar xvf jdbc-connector-for-apache-kafka-6.6.2.tar
    

    c) Editar plugin.path em Kafka config/connect-[distributed/standalone].properties para apontar para a pasta extraída. Com isso, o Kafka pode localizar e carregar o plug-in e os jars do conector

  4. Inicie a conexão distribuída..

    connect-distributed.sh config/connect-distributed.properties
    

    Saída:

    [2022-06-29 02:29:43,356] INFO Started o.e.j.s.ServletContextHandler@5562c2c9{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:915)
    [2022-06-29 02:29:43,356] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:303)
    [2022-06-29 02:29:43,356] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
    
  5. Registre os conectores

    O banco de dados e a tabela devem existir no NPSaaS antes de tentar registrar os conectores.

    • Para o conector de origem, execute o seguinte comando.

      curl -s -X POST -H "Content-Type: Application/json" --data '{ "name": "test-source-jdbc", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url":"jdbc:netezza://localhost:5480/db1;user=user1;password=secret","connection.user":"user1","connection.password":"secret","dialect.name":"GenericDatabaseDialect","topic.prefix":"my","mode": "bulk", "poll.interval.ms":"60000", "table.whitelist":"TEST_TABLE", "batch.max.rows":"10000000" } }' http://localhost:8083/connectors | jq
      {
        "name": "test-source-jdbc",
        "config": {
          "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
          "tasks.max": "1",
          "connection.url": "jdbc:netezza://localhost:5480/db1;user=user1;password=secret,
          "connection.user": ""user1,
          "connection.password": "secret",
          "dialect.name": "GenericDatabaseDialect",
          "topic.prefix": "my",
          "mode": "bulk",
          "poll.interval.ms": "60000",
          "table.whitelist": "TEST_TABLE",
          "batch.max.rows": "10000000",
          "name": "test-source-jdbc"
        },
        "tasks": [],
        "type": "source"
      }
      
    • Para o conector sink, execute o comando a seguir.

      curl -s -X POST -H "Content-Type: Application/json" --data '{ "name": "test-sink", "config": { "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector", "tasks.max": "2", "connection.url":"jdbc:netezza://localhost:5480/db1;user=user1;password=secret","connection.user":"user1","connection.password":"secret","dialect.name":"GenericDatabaseDialect","topics": "TEST_TABLE", "insert.mode": "insert" } }' http://localhost:8083/connectors | jq
      {
        "name": "test-sink",
        "config": {
          "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
          "tasks.max": "2",
          "connection.url": "jdbc:netezza://localhost:5480/db1;user=user1;password=secret",
          "connection.user": "user1",
          "connection.password": "secret",
          "dialect.name": "GenericDatabaseDialect",
          "topics": "TEST_TABLE",
          "insert.mode": "insert",
          "name": "test-sink"
        },
          "tasks": [],
          "type": "sink"
        }
      
  6. Verifique se o registro foi bem-sucedido.

    a) Lista os conectores.

    curl -s http://localhost:8083/connectors/ | jq
    [
      "test-source-jdbc",
      "test-sink"
    ]
    

    b) Verifique o status dos conectores.

    curl -s http://localhost:8083/connectors/test-source-jdbc/status | jq
    {
         "name": "test-source-jdbc",
         "connector": {
             "state": "RUNNING",
             "worker_id": "10.11.112.15:8083"
      },
        "tasks": [
          {
              "id": 0,
              "state": "RUNNING",
              "worker_id": "10.11.112.15:8083"
           }
         ],
         "type": "source"
    }
    

É possível verificar se os conectores funcionam corretamente verificando os logs NPSaaS.

  • Para o conector de origem:

    2022-06-29 04:13:09.057046 PDT [27743]  DEBUG:  QUERY: SELECT 1
    2022-06-29 04:13:09.061443 PDT [27743]  DEBUG:  QUERY: SELECT * FROM "DB1"."USER1"."TEST_TABLE"
    ANALYZE
    2022-06-29 04:13:09.064209 PDT [27743]  DEBUG:  QUERY: SELECT * FROM "DB1"."USER1"."TEST_TABLE"
    
  • Para o conector do dissipador:

    2022-06-29 09:08:32.379442 PDT [18976]  DEBUG:  QUERY: CREATE EXTERNAL TABLE bulkETL_18976_0 ( c0 nvarchar(8),c1 nvarchar(10),c2 nvarchar(4),c3 nvarchar(6),c4 nvarchar(7),c5 nvarchar(9),c6 nvarchar(7),c7 nvarchar(9),c8 nvarchar(12),c9 nvarchar(12),c10 nvarchar(25),c11 nvarchar(13),c12 nvarchar(1) )  USING (  DATAOBJECT('/tmp/junk')  REMOTESOURCE 'jdbc'  DELIMITER ' '  ESCAPECHAR '\'  CTRLCHARS 'YES'  CRINSTRING 'YES'  ENCODING 'INTERNAL'  MAXERRORS 1 QUOTEDVALUE 'YES' );
    2022-06-29 09:08:32.525024 PDT [18976]  DEBUG:  QUERY: INSERT INTO "TEST_TABLE"("C1","C2","C3","C4","C5","C6","C7","C8","DATE_PROD","TIME_PROD","TIMESTMP","TIMETZ_PROD","C18") VALUES(bulkETL_18976_0.c0,bulkETL_18976_0.c1,bulkETL_18976_0.c2,bulkETL_18976_0.c3,bulkETL_18976_0.c4,bulkETL_18976_0.c5,bulkETL_18976_0.c6,bulkETL_18976_0.c7,bulkETL_18976_0.c8,bulkETL_18976_0.c9,bulkETL_18976_0.c10,bulkETL_18976_0.c11,bulkETL_18976_0.c12)