IBM Cloud Docs
使用不同的表格式

使用不同的表格式

适用于火花机 面筋加速火花机

本主题介绍了运行 Spark 应用程序的过程,该程序将数据摄取到不同的表格式,如 Apache Hudi、Apache Iceberg 或 Delta Lake 目录。

  1. 创建具有所需目录的存储(目录可以是 Apache Hudi、Apache Iceberg 或 Delta Lake ),以存储 Spark 应用程序中使用的数据。 要创建存储器,请参阅 添加存储器-目录对

  2. 将存储器与本机 Spark 引擎相关联。 有关更多信息,请参阅 使目录与引擎相关联

  3. 创建 Cloud Object Storage (COS) 以存储 Spark 应用程序。 要创建 Cloud Object Storage 和存储区,请参阅 创建存储区

  4. 在 watsonx.data中注册 Cloud Object Storage。 有关更多信息,请参阅 添加存储器-目录对

  5. 根据您选择的目录,将以下 Spark 应用程序 (Python 文件) 保存到本地计算机。 在此处,iceberg_demo.pyhudi_demo.pydelta_demo.py 并将 Spark 应用程序上载到 COS,请参阅 上载数据

  6. 要提交包含驻留在 Cloud Object Storage中的数据的 Spark 应用程序,请指定参数值并从下表运行 curl 命令。

    • Apache Iceberg

    示例文件演示了以下功能:

    • 从watsonx.data访问表格

    • 向watsonx.data输入数据

    • 修改watsonx.data中的模式

    在watsonx.data 中执行表格维护活动。

    必须将数据插入到 COS 存储区中。 更多信息,请参阅 将样本数据插入 COS 数据桶

    Python应用程序:冰山Python文件

    提交Python应用程序的Curl命令:

    curl --request POST \
    --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
    --header 'Authorization: Bearer <token>' \
    --header 'Content-Type: application/json' \
    --header 'LhInstanceId: <instance_id>' \
    --data '{  "application_details": {
               "conf": {
                      "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>"    },
                       "application": "s3a://<application-bucket-name>/iceberg.py"  }
            }'
    

    参数值:

    • <wxd_host_name>:watsonx.data云实例的主机名。

    • <instance_id>:来自 watsonx.data 实例 URL 的实例 ID。 例如,1609968977179454。

    • <spark_engine_id>:本地 Spark 引擎的引擎 ID。

    • <token>:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌

    • <user-authentication-string>:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅注释。

    • Apache胡迪

    PythonSpark 应用程序演示了以下功能:

    • 它会在 Apache Hudi 目录(您创建的用于存储数据的目录)中创建一个数据库。 这里,"<database_name>.

    • 它在“<database_name> 数据库中创建了一个表,即”<table_name>

    • 它将数据插入 "<table_name> 并执行 SELECT 查询操作。

    • 使用后,它会删除表和模式。

    Python应用程序:

    from pyspark.sql import SparkSession
            def init_spark():            spark = SparkSession.builder
            .appName("CreateHudiTableInCOS")
            .enableHiveSupport()
            .getOrCreate()
            return spark
            def main():
            try:
            spark = init_spark()
            spark.sql("show databases").show()
            spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
            spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show()
            spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
            spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
            spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
            spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
            inally:
            spark.stop()
            if __name__ == '__main__':
            main()
    
    

    参数值:

    • <database_name>:指定要创建的数据库名称。
    • <table_name>:指定要创建的表的名称。
    • <data_storage_name>:指定所创建的 Apache Hudi 存储的名称。

    用于提交 Python 应用程序的 Curl 命令

    
    curl --request POST
        --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications
        --header 'Authorization: Bearer <token>'
        --header 'Content-Type: application/json'
        --header 'LhInstanceId: <instance_id>'
        --data '{ \n \n    "application_details": {
                "conf": {
                        "spark.sql.catalog.spark_catalog.type": "hive",
                        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
                        "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>"        },
                        "application": "s3a://<data_storage_name>/hudi_demo.py"    }}
    

    参数值:

    • <wxd_host_name>:watsonx.data云实例的主机名。

    • <instance_id>:来自 watsonx.data 实例 URL 的实例 ID。 例如,1609968977179454。

    • <spark_engine_id>:本地 Spark 引擎的引擎 ID。

    • <token>:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌

    • <user-authentication-string>:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅注释。

    • Delta Lake

    PythonSpark 应用程序演示了以下功能:

    • 它会在 Delta Lake 目录(您创建的用于存储数据的目录)中创建一个数据库。 这里,"<database_name>.

    • 它在“<database_name> 数据库中创建了一个表,即”<table_name>

    • 它将数据插入 "<table_name> 并执行 SELECT 查询操作。

    • 使用后,它会删除表和模式。

    Python应用程序:

    from pyspark.sql import SparkSession
    import os
        def init_spark():
             spark = SparkSession.builder.appName("lh-hms-cloud")
             .enableHiveSupport().getOrCreate()
             return spark
             def main():
                 spark = init_spark()
                 spark.sql("show databases").show()
                         spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
                         spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show()
                         spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
                         spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
                         spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
                         spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
                         spark.stop()
                         if __name__ == '__main__':
                         main()
    
    

    参数值:

    • <database_name>:指定要创建的数据库名称。
    • <table_name>:指定要创建的表的名称。
    • <data_storage_name>:指定所创建的 Apache Hudi 存储的名称。

    用于提交 Python 应用程序的 Curl 命令

    curl --request POST
    --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications
    --header 'Authorization: Bearer <token>'
    --header 'Content-Type: application/json'
    --header 'LhInstanceId: <instance_id>'
    --data '{        "application_details": {
           "conf": {
           "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog",
           "spark.sql.catalog.spark_catalog.type" : "hive",
           "spark.hadoop.wxd.apiKey":"<user-authentication-string>"        },
           "application": "s3a://<database_name>/delta_demo.py"        }    }
    

    参数值

    • <wxd_host_name>:watsonx.data云实例的主机名。
    • <instance_id> 实例 ID:watsonx.data 集群实例 URL 中的实例 ID。 例如,1609968977179454。
    • <spark_engine_id>:本地 Spark 引擎的引擎 ID。
    • <token>:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌
    • <user-authentication-string>:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅以下说明。

    <user-authentication-string> 的值必须采用格式 echo -n 'ibmlhapikey_<username>:<user_apikey>' | base64。 此处,<user_id> 是其 API 密钥用于访问数据存储区的用户的 IBM Cloud 标识。 此处的 <IAM_APIKEY> 是访问对象存储区的用户的 API 密钥。 要生成 API 密钥,请登录到 watsonx.data 控制台,浏览至“概要文件> 概要文件和设置> API 密钥”并生成新的 API 密钥。 如果生成新的 API 密钥,那么旧的 API 密钥将变为无效。

  7. 提交 Spark 应用程序后,您将收到包含应用程序标识和 Spark 版本的确认消息。 将其保存以供参考。

  8. 登录到 watsonx.data 集群,访问“引擎详细信息”页面。 在“应用程序”选项卡中,使用应用程序标识列出应用程序并跟踪阶段。 有关更多信息,请参阅 查看和管理应用程序