IBM Cloud Docs
Spark 用例入门

Spark 用例入门

本主题提供了使用 Python 样本对 watsonx.data 运行 Spark 用例的过程。 所有样本都是使用 Spark Python API 编写的。

先决条件

  • 供应 IBM® watsonx.data 实例。
  • 配置 IBM Analytics Engine 实例。
  • Cloud Object Storage 铲斗连接细节。
  • 为了使您的 Spark 应用程序能够与watsonx.data目录和存储,您必须具有 Metastore 管理员角色。 如果没有 Metastore 管理员权限,您将无法使用 Native Spark 引擎将数据导入存储。 有关 Spark 配置的更多信息,请参阅 与watsonx.data目录和存储

关于样本用例

样本文件演示了以下功能:

  • 从 watsonx.data 访问表

    样本 Python 文件 中的 在 Lakehouse 目录中创建数据库 部分在配置的 watsonx.data 实例中创建数据库 demodb,该实例具有名为 lakehouse 的目录。demodb 配置为将所有数据和元数据存储在 Cloud Object Storage(COS) 存储区 lakehouse-bucket 下。 它还会创建一个 iceberg 表 testTable 并对其进行访问。

  • 将数据采集到 watsonx.data

    通过 样本 python 文件 中的 将拼合数据采集到 lakehouse 表中 部分,可以将来自源 Cloud Object Storage 存储区 source-bucket 的拼合和 CSV 格式的数据采集到 watsonx.data 表中。 将拼合格式的样本数据从源 COS 存储区 source-bucket 插入到 watsonx.data 表 yellow_taxi_2022 中 (请参阅 步骤 以将样本数据插入到源 COS 存储区中)。 它还显示将 CSV 格式的数据从 COS 存储区 source-bucket 采集到数据库 demodb 中的表 zipcode 中。

  • 在 watsonx.data 中修改模式

    样本 Python 文件 中的 模式演进 部分允许您修改 watsonx.data中的数据。

  • 在 watsonx.data 中执行表维护活动

    表维护有助于保持 watsonx.data 表的性能。 Iceberg 提供现成可用的表维护过程,允许以声明式方式执行强大的表优化。 以下样本演示如何使用 Spark 执行一些表维护操作。 有关 Iceberg Spark 表维护操作的更多信息,请参阅 表操作

将样本数据插入 COS 存储区

要将数据插入 COS,请执行以下步骤。

  1. 创建 COS 存储区 (例如,source-bucket) 以存储要采集到 watsonx.data 实例中的样本数据。 有关创建 COS 存储桶的信息,请参阅 开始使用 IBM Cloud Object Storage

    作为 Object Storage 的用户,您不仅需要知道配置 Object Storage 的 API 密钥或 HMAC 密钥,还需要知道连接到 IBM Analytics Engine 的服务端点。 请参阅 选择区域和端点,了解根据 Object Storage 存储桶类型(如区域与跨区域)使用端点的更多信息。 您还可以通过在 IBM Cloud 面板上选择服务并单击导航窗格中的 Endpoint 来查看 Object Storage 服务的跨区域端点。 始终选择 直接端点。 直接端点提供更好的性能,不会产生费用。 美南交叉区域的端点示例为 s3.direct.us.cloud-object-storage.appdomain.cloud

  2. 从以下链接下载样本 csv 文件 (例如,zipcodes.csv) 和 parquet 样本数据 (例如,2022 年的 6 个月出租车数据)。

  3. 安装 IBM Cloud Object Storage 插件。 有关如何安装插件的详细信息,请参阅 IBM Cloud Object Storage CLI

  4. 使用 COS CLI 将样本数据上载到 COS 存储区。

    ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>
    

    参数值:

    • <cos_bucket_name>: name of the bucket created in step1.
    • <source_file_name>: the name of the sample data file that you downloaded. 此处,key zipcodes.csv 是文件名 (请参阅以下示例)。
    • <path_to_source_file>: the path to the location in your machine where the file resides. 此处,path/zipcodes.csv 是文件路径 (请参阅以下示例)。

    例如:

    ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
    

运行样本用例

执行以下步骤以运行 Spark 样本 Python 文件。

Spark 样本 Python 文件


    from pyspark.sql import SparkSession
    import os

    def init_spark():
    spark = SparkSession.builder \
        .appName("lh-hms-cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<lakehouse-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<lakehouse-bucket-secret-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.access.key" ,"<source-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.secret.key" ,"<source-bucket-secret-key>") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()

    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()

    def create_table_from_parquet_data(spark):
        # load parquet data into dataframe
        df = spark.read.option("header",True).parquet("s3a://source-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()

    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-06.parquet")

        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()

        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # List all the snapshots
        # Expire earlier snapshots. Only latest one with compacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()

        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)

        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()


    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)


    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')

    def main():
        try:
            spark = init_spark()

            create_database(spark)
            list_databases(spark)

            basic_iceberg_table_operations(spark)

            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)

            # load data for the month of February to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)

            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)

            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()

    if __name__ == '__main__':
    main()
  1. 保存以下样本 Python 文件。
  2. 将 Python 文件上载到 Cloud Object Storage 存储区。 您必须在 Cloud Object Storage 存储区中维护 Spark 应用程序及其依赖关系,而不能将其与数据存储区混合。
  3. 为 IBM Analytics Engine 令牌生成 IAM 令牌。 有关如何生成 IAM 令牌的详细信息,请参阅 IAM 令牌
  4. 运行以下 curl 命令以提交 Spark 应用程序:
    curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications -H "Authorization: Bearer <iam-bearer-token>" -X POST -d '{
    "application_details": {
        "application": "s3a://<application_bucket>/lakehouse-hms-test-cloud-doc-sample.py",
        "conf": {
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"
        }
    }
    }'

此样本在 us-south 区域中的 Cloud Object Storage 存储区上进行测试。 根据 Cloud Object Storage 存储区所在的区域,更改 Cloud Object Storage 端点配置中的区域。 建议在供应 Analytics Engine 实例的区域中供应 COS 存储区。

如果提交的Spark申请持续三天以上处于运行状态,则该申请将被自动终止。

与watsonx.data目录和存储

为了使您的 Spark 应用程序能够与watsonx.data目录和存储,将以下配置添加到您的应用程序有效负载:

spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)