使用 Apache Hudi 目录
本主题描述了运行 Spark 应用程序以将数据摄入到 Apache Hudi 目录中的过程。
-
使用 Apache Hudi 目录创建存储器以存储 Spark 应用程序中使用的数据。 要使用 Apache Hudi 目录创建存储器,请参阅 添加存储器-目录对。
-
将存储器与外部 Spark 引擎相关联。 有关更多信息,请参阅 使目录与引擎相关联。
-
创建 Cloud Object Storage (COS) 以存储 Spark 应用程序。 要创建 Cloud Object Storage 和存储区,请参阅 创建存储区。
-
在 watsonx.data中注册 Cloud Object Storage。 有关更多信息,请参阅 添加存储器-目录对。
-
将以下 Spark 应用程序 (Python 文件) 保存到本地计算机。 此处,
hudi_demo.py
。Python Spark 应用程序演示以下功能:
- 它会在 Apache Hudi 目录(您创建的用于存储数据的目录)中创建一个数据库。 此处,
<database_name>
。 - 它在
<database_name>
数据库中创建了一个表,即 <table_name>。 - 它将数据插入 <table_name> 并执行 SELECT 查询操作。
- 使用后,它会删除表和模式。
from pyspark.sql import SparkSession def init_spark(): spark = SparkSession.builder \ .appName("CreateHudiTableInCOS") \ .enableHiveSupport() \ .getOrCreate() return spark def main(): try: spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show() spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() finally: spark.stop() if __name__ == '__main__': main()
- 它会在 Apache Hudi 目录(您创建的用于存储数据的目录)中创建一个数据库。 此处,
-
将 Spark 应用程序上载到 COS,请参阅 上载数据。
-
要提交包含驻留在 Cloud Object Storage中的数据的 Spark 应用程序,请指定参数值并运行以下 curl 命令
curl --request POST \ --url https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications \ --header 'Authorization: Bearer <token>' \ --header 'Content-Type: application/json' \ --data '{ "conf": { "spark.serializer" : "org.apache.spark.serializer.KryoSerializer", "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint" : "<your_data_bucket_direct_endpoint>", "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key" : "<data_bucket_access_key>", "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key" : "<data_bucket_secret_key>", "spark.hadoop.fs.s3a.path.style.access" : "true", "spark.hadoop.fs.s3a.impl" : "org.apache.hadoop.fs.s3a.S3AFileSystem", "spark.hive.metastore.uris" : "<metastore URL>", "spark.hive.metastore.use.SSL" : "true", "spark.hive.metastore.truststore.path" : "file:///opt/ibm/jdk/lib/security/cacerts", "spark.hive.metastore.truststore.password" : "changeit", "spark.hive.metastore.truststore.type" : "JKS", "spark.hive.metastore.client.auth.mode" : "PLAIN", "spark.hive.metastore.client.plain.username" : "ibmlhapikey", "spark.hive.metastore.client.plain.password" : "<wxd_api_key>", "spark.driver.extraJavaOptions" : "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true", "spark.executor.extraJavaOptions" : "-Dcom.sun.jndi.ldap.object.disableEndpointIdentification=true -Djdk.tls.trustNameService=true", "spark.hadoop.hive.metastore.schema.verification" : "false", "spark.hadoop.hive.metastore.schema.verification.record.version" : "false", "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension", "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar", "spark.sql.catalog.spark_catalog.type": "hudi", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog" }, "application": "s3a://<data_storage_name>/hudi_demo.py" }
参数值:
-
<region>
: 供应分析引擎实例的区域。 -
<iae-instance-guid>
: 分析引擎实例 GUID。 要获取此信息,请参阅 检索无服务器实例的详细信息。 -
<token>
:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌。 -
<your_data_bucket_direct_endpoint>
: 用于访问数据存储区的直接端点。 例如,s3.us-south.cloud-object-storage.appdomain.cloud for a Cloud Object storage bucket in us-south region. 有关更多信息,请参阅 服务凭证。 -
<data_bucket_access_key>
: 云对象存储器 (数据存储器) 的访问密钥。 有关更多信息,请参阅 使用 CLI 创建 HMAC 凭证。 -
<data_bucket_secret_key>
: 云对象存储器 (数据存储器) 的密钥。 有关更多信息,请参阅 使用 CLI 创建 HMAC 凭证。 -
<metastore URL>
:目录的 URL。 更多信息,请参阅 获取 MDS 端点。 -
<wxd_api_key>
: 要生成 API 密钥,请登录到 watsonx.data 控制台,浏览至“概要文件”>“概要文件和设置”>“API 密钥”,并生成新的 API 密钥。
- 提交 Spark 应用程序后,您将收到包含应用程序标识和 Spark 版本的确认消息。 将其保存以供参考。
- 登录到 watsonx.data 集群,访问“引擎详细信息”页面。 在“应用程序”选项卡中,使用应用程序标识列出应用程序并跟踪阶段。 有关更多信息,请参阅 查看和管理应用程序。