IBM Cloud Docs
使用 Apache Hudi 目录

使用 Apache Hudi 目录

本主题描述了运行 Spark 应用程序以将数据摄入到 Apache Hudi 目录中的过程。

  1. 使用 Apache Hudi 目录创建存储器以存储 Spark 应用程序中使用的数据。 要使用 Apache Hudi 目录创建存储器,请参阅 添加存储器-目录对

  2. 将存储器与外部 Spark 引擎相关联。 有关更多信息,请参阅 使目录与引擎相关联

  3. 创建 Cloud Object Storage (COS) 以存储 Spark 应用程序。 要创建 Cloud Object Storage 和存储区,请参阅 创建存储区

  4. 在 watsonx.data中注册 Cloud Object Storage。 有关更多信息,请参阅 添加存储器-目录对

  5. 将以下 Spark 应用程序 (Python 文件) 保存到本地计算机。 此处,hudi_demo.py

    Python Spark 应用程序演示以下功能:

    • 它会在 Apache Hudi 目录(您创建的用于存储数据的目录)中创建一个数据库。 此处,<database_name>
    • 它在 <database_name> 数据库中创建了一个表,即 <table_name>。
    • 它将数据插入 <table_name> 并执行 SELECT 查询操作。
    • 使用后,它会删除表和模式。
        from pyspark.sql import SparkSession
    
        def init_spark():
            spark = SparkSession.builder \
                .appName("CreateHudiTableInCOS") \
                .enableHiveSupport() \
                .getOrCreate()
            return spark
    
        def main():
    
            try:
                spark = init_spark()
                spark.sql("show databases").show()
                spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
                spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show()
                spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
                spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
                spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
                spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
    
            finally:
                spark.stop()
    
        if __name__ == '__main__':
            main()
    
  6. 将 Spark 应用程序上载到 COS,请参阅 上载数据

  7. 要提交包含驻留在 Cloud Object Storage中的数据的 Spark 应用程序,请指定参数值并运行以下 curl 命令

    curl --request POST \
    --url https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications \
    --header 'Authorization: Bearer <token>' \
    --header 'Content-Type: application/json' \
    --data '{
    		"conf": {
            "spark.serializer" : "org.apache.spark.serializer.KryoSerializer",
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint" : "<your_data_bucket_direct_endpoint>",
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key" : "<data_bucket_access_key>",
            "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key" : "<data_bucket_secret_key>",
            "spark.hadoop.fs.s3a.path.style.access" : "true",
            "spark.hadoop.fs.s3a.impl" : "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "spark.hive.metastore.uris" : "<metastore URL>",
            "spark.hive.metastore.use.SSL" : "true",
            "spark.hive.metastore.truststore.path" : "file:///opt/ibm/jdk/lib/security/cacerts",
            "spark.hive.metastore.truststore.password" : "changeit",
            "spark.hive.metastore.truststore.type" : "JKS",
            "spark.hive.metastore.client.auth.mode" : "PLAIN",
            "spark.hive.metastore.client.plain.username" : "ibmlhapikey",
            "spark.hive.metastore.client.plain.password" : "<wxd_api_key>",
            "spark.driver.extraJavaOptions" : "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
            "spark.executor.extraJavaOptions" : "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true",
            "spark.hadoop.hive.metastore.schema.verification" : "false",
            "spark.hadoop.hive.metastore.schema.verification.record.version" : "false",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
           "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar",
           "spark.sql.catalog.spark_catalog.type": "hudi",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    
    		},
    		"application": "s3a://<data_storage_name>/hudi_demo.py"
    	}
    

参数值:

  • <region>: 供应分析引擎实例的区域。

  • <iae-instance-guid>: 分析引擎实例 GUID。 要获取此信息,请参阅 检索无服务器实例的详细信息

  • <token>:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌

  • <your_data_bucket_direct_endpoint> : 用于访问数据存储区的直接端点。 例如,s3.us-south.cloud-object-storage.appdomain.cloud for a Cloud Object storage bucket in us-south region. 有关更多信息,请参阅 服务凭证

  • <data_bucket_access_key> : 云对象存储器 (数据存储器) 的访问密钥。 有关更多信息,请参阅 使用 CLI 创建 HMAC 凭证

  • <data_bucket_secret_key> : 云对象存储器 (数据存储器) 的密钥。 有关更多信息,请参阅 使用 CLI 创建 HMAC 凭证

  • <metastore URL>:目录的 URL。 更多信息,请参阅 获取 MDS 端点

  • <wxd_api_key>: 要生成 API 密钥,请登录到 watsonx.data 控制台,浏览至“概要文件”>“概要文件和设置”>“API 密钥”,并生成新的 API 密钥。

  1. 提交 Spark 应用程序后,您将收到包含应用程序标识和 Spark 版本的确认消息。 将其保存以供参考。
  2. 登录到 watsonx.data 集群,访问“引擎详细信息”页面。 在“应用程序”选项卡中,使用应用程序标识列出应用程序并跟踪阶段。 有关更多信息,请参阅 查看和管理应用程序