IBM Cloud Docs
使用 IBM Cloud Data Engine 作为外部元存储 

使用 IBM Cloud Data Engine 作为外部元存储 

已停用

IBM Cloud Data Engine 是 IBM Cloud的数据湖中央服务。 它提供来自 IBM Cloud Object Storage 和 Kafka的流采集,数据准备,ETL 和数据查询。 它还可以管理与 Hive Metastore 兼容的目录中的表和视图,并且其他大数据引擎和服务都可以与之相连。 请参阅 IBM Cloud Data Engine概述

IBM Cloud Data Engine 的每个实例都包含一个数据库目录,可用于在 IBM Cloud Object Storage上注册和管理数据的表定义。 目录语法与 Hive metastore 语法兼容。 您可以使用 IBM Cloud Data Engine 在 IBM Analytics Engine Spark 集群外部外部化元数据。

先决条件

以下是先决条件:

  • 创建 IBM Cloud Data Engine 实例
  • 在 Cloud Object Storage 中存储数据
  • 创建模式

创建 IBM Cloud Data Engine 实例

使用标准套餐创建 IBM Cloud Data Engine 实例。 请参阅 Data Engine

供应 Data Engine 实例后:

  1. 记下实例的 CRN。
  2. 创建具有实例访问权的帐户级别 API 密钥或服务标识级别 API 密钥。
  3. 应该授予此服务标识对 Data Engine 实例以及 IBM Cloud Object Storage 存储区的访问权。

然后,可以根据需要在实例级别或应用程序级别配置 IBM Analytics Engine 实例以使用缺省元存储器配置。

IBM Cloud Data Engine 支持为不同端点 (位置) 创建实例。 在实例中,将创建不同的 IBM Cloud Object Storage 存储区以存储数据。 可以为不同的端点 (区域) 创建数据存储区。 数据引擎实例 (thrift) 和数据存储区的端点不同。 确保选择系统支持的正确端点。
• 有关创建实例时适用于您所在区域的端点 (thrift) 的更多信息,请参阅 Thrift 端点
• 有关当前支持的数据引擎端点的更多信息,请参阅 数据引擎端点

在 Cloud Object Storage中存储数据

生成数据并将其存储在云对象存储器中。 运行以下常规 PySpark 应用程序 (在此示例中称为 generate-and-store-data.py ),该应用程序将 Parquet 数据存储在 IBM Cloud Object Storage上的某个位置。

示例:

输入:

from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("dataengine-generate-store-parquet-data").getOrCreate()
    sc = spark.sparkContext
    return spark,sc

def generate_and_store_data(spark,sc):
    data =[("India","New Delhi"),("France","Paris"),("Lithuania","Vilnius"),("Sweden","Stockholm"),("Switzerland","Bern")]
    columns=["Country","Capital"]
    df=spark.createDataFrame(data,columns)
    df.write.mode("overwrite").parquet("cos://mybucket.mycosservice/countriescapitals.parquet")

def main():
    spark,sc = init_spark()
    generate_and_store_data(spark,sc)
if __name__ == '__main__':
    main()

创建模式

在数据引擎中创建元存储表模式定义。 请注意,将 Data Engine 用作元存储器时,无法使用标准 Spark SQL 语法来创建表。 有两种方法可用于创建表:

  • 从 Data Engine 用户界面或通过使用标准 Data Engine API (请参阅 Data Engine 服务 REST V3 API) 或 Python SDK (请参阅 ibmcloudsql)。

    示例:

    输入:

        CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) 
        USING PARQUET 
        LOCATION cos://us-south/mybucket/countriescapitals.parquet
    

    在以上示例中,COS 存储区的位置 (//us-south/mybucket/countriescapitals.parquet) 被视为 us-south,即区域存储区。 如果您正在使用任何其他区域,请从 数据引擎端点 中选择相应的别名。

  • 通过对名为 create_table_data_engine.py 的 PySpark 使用以下代码片段,以编程方式从 PySpark 应用程序中获取:

    示例:

    输入:

        import requests
        import time
        def create_data_engine_table(api_key,crn):
            headers = {
            'Authorization': 'Basic Yng6Yng=',
            }
            data = {
            'apikey': api_key,
            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
            }
            response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data)
            token = response.json()['access_token']
    
            headers_token = {
            'Accept': 'application/json',
            'Authorization': f"Bearer {token}",
            }
            params = {
            'instance_crn': crn,
            }
            json_data = {
            'statement': 'CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-south/mybucket/countriescapitals.parquet',
            }
            response = requests.post('https://api.dataengine.cloud.ibm.com/v3/sql_jobs', params=params, headers=headers_token, json=json_data)
            job_id = response.json()['job_id']
            time.sleep(10)
            response = requests.get(f'https://api.dataengine.cloud.ibm.com/v3/sql_jobs/{job_id}', params=params, headers=headers_token)
            if(response.json()['status']=='completed'):
                print(response.json())
    

    在以上示例中,COS 存储区的位置 (cos://us-south/mybucket/countriescapitals.parquet) 被视为 us-south,即区域存储区。 如果您正在使用任何其他区域,请从 数据引擎端点 中选择相应的别名。

    上述应用程序 create_table_data_engine_payload.json 的有效内容还需要提供具有精确标准 Data Engine 别名 (在本例中为 "us-south") 的 Data Engine 凭证。

    示例:

    输入:

        {
            "application_details": {
                "conf": {
                    "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
                    "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
                    "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME"
                },
            "application": "cos://mybucket.us-south/create_table_data_engine.py",
            "arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
            }
        }
    

    参数值:

    在以上示例中,将考虑来自 us-south 的区域 COS 存储区。 如果您正在使用任何其他区域,请从 数据引擎端点 中选择相应的别名。

    确保选择标准别名。

通过传递 Data Engine 参数的完整列表从表中读取数据

您可以使用 SQL 查询从元存储表中读取数据。

在名为 select_query_data_engine.py 的以下应用程序中使用 Spark SQL 从表中读取数据:

from pyspark.sql import SparkSession
import time

def init_spark():
spark = SparkSession.builder.appName("dataengine-table-select-test").getOrCreate()
sc = spark.sparkContext
return spark,sc

def select_query_data_engine(spark,sc):
tablesDF=spark.sql("SHOW TABLES")
tablesDF.show()
statesDF=spark.sql("SELECT * from COUNTRIESCAPITALS");
statesDF.show()

def main():
spark,sc = init_spark()
select_query_data_engine(spark,sc)

if __name__ == '__main__':
main()

请注意,要使 SELECT 命令起作用,必须将 IBM Cloud Object Storage 标识作为标准 Data Engine 别名之一传递,在此示例中,我们已使用 us-south。 如果未传递期望的错误,那么可能会看到以下错误: Configuration parse exception: Access KEY is empty. Please provide valid access key

select_query_data_engine_payload.json:

{
        "application_details": {
            "conf": {
                "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
                "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
                "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME",
                "spark.hive.metastore.truststore.password" : "changeit",
                "spark.hive.execution.engine":"spark",
                "spark.hive.metastore.client.plain.password":"APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE",
                "spark.hive.metastore.uris":"thrift://catalog.us.dataengine.cloud.ibm.com:9083",
                "spark.hive.metastore.client.auth.mode":"PLAIN",
                "spark.hive.metastore.use.SSL":"true",
                "spark.hive.stats.autogather":"false",
                "spark.hive.metastore.client.plain.username":"<CHANGEME-CRN-DATA-ENGINE-INSTANCE>",
                # for spark 3.3 and spark 3.4
                "spark.hive.metastore.truststore.path":"/opt/ibm/jdk/lib/security/cacerts",
                "spark.sql.catalogImplementation":"hive",
                "spark.sql.hive.metastore.jars":"/opt/ibm/connectors/data-engine/hms-client/*",
                "spark.sql.hive.metastore.version":"3.0",
                "spark.sql.warehouse.dir":"file:///tmp",
                "spark.sql.catalogImplementation":"hive",
                "spark.hadoop.metastore.catalog.default":"spark"
            },
            "application": "cos://mybucket.us-south/select_query_data_engine.py"
        }
    }

在以上示例中,将考虑来自 us-south 的区域 COS 存储区。 如果您正在使用任何其他区域,请从 数据引擎端点 中选择相应的别名。 此外,还提供了元存储 URL 作为 thrift://catalog.us.dataengine.cloud.ibm.com:9083。 有关其他适用端点 (thrift) 的更多信息,请参阅 Thrift 端点

参数值:

  • CRN-DATA-ENGINE-INSTANCE: 指定数据引擎实例的 crn。

确保选择标准别名。

使用更简单的便捷性 API 从表中读取数据

如果要通过在应用程序中指定 IBM Cloud Data Engine 连接 API 来快速测试 Hive metastore,那么可以使用以下 PySpark 示例中显示的便利 API。

在此示例中,不需要将任何 IBM Cloud Data Engine Hive 元存储参数传递到应用程序。 调用 SparkSessionWithDataengine.enableDataengine 将初始化与 IBM Cloud Data Engine 的连接,而不使用其他 IBM Cloud Data Engine Hive metastore 参数。

dataengine-job-convenience_api.py:

from dataengine import SparkSessionWithDataengine
from pyspark.sql import SQLContext
import sys
 from pyspark.sql import SparkSession
import time

def dataengine_table_test(spark,sc):
  tablesDF=spark.sql("SHOW TABLES")
  tablesDF.show()
  statesDF=spark.sql("SELECT * from COUNTRIESCAPITALS");
  statesDF.show()

def main():
  if __name__ == '__main__':
    if len (sys.argv) < 2:
        exit(1)
    else:
        crn = sys.argv[1]
        apikey = sys.argv[2]
        session_builder = SparkSessionWithDataengine.enableDataengine(crn, apikey, "public", "/opt/ibm/connectors/data-engine/hms-client")
        spark = session_builder.appName("Spark DataEngine integration test").getOrCreate()
        sc = spark.sparkContext
        dataengine_table_test (spark,sc)

if __name__ == '__main__':
  main()

以下是方便 dataengine-job-convenience_api.py 的有效内容:

示例:

输入:

{
    "application_details": {
        "conf": {
            "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME"
        }
        "application": "cos://mybucket.us-south/dataengine-job-convenience_api.py",
        "arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
    }
}

参数值: 在以上示例中,将考虑来自 us-south 的区域 COS 存储区。 如果您正在使用任何其他区域,请从 数据引擎端点 中选择相应的别名。

确保选择标准别名。

云 Object Storage 端点

您的云 Object Storage 实例具有其中一个受支持的端点。Data Engine 支持所有 公共和专用 Object Storage 端点。 为了节省空间,您可以使用显示的别名来代替完整端点名称。

用于连接端点 (跨区域域中的特定端点,例如 dal-us-geo) 的别名被视为旧的别名。 他们继续工作,直到另行通知,但计划在未来某个时间不推荐使用。 要进行准备,请更新应用程序以使用相应跨区域端点的别名 (例如,us-geo)。

Data Engine 始终使用内部端点与 Object Storage进行交互,即使在查询中指定了外部端点也是如此。 查询的结果位置始终指示外部端点名称。 通过 API 以编程方式与 Data Engine 进行交互时,可以使用内部端点名称来读取结果,而不是使用 API 返回的外部端点名称。

下表列出了当前受支持的 Data Engine 端点的一些示例。

表 1. 跨区域端点
跨区域端点名称 别名
s3.us.cloud-object-storage.appdomain.cloud us-geo
s3.eu.cloud-object-storage.appdomain.cloud eu-geo
s3.ap.cloud-object-storage.appdomain.cloud ap-geo
表 2. 区域端点
区域端点名称 别名
s3.eu-de.cloud-object-storage.appdomain.cloud eu-de
s3.eu-gb.cloud-object-storage.appdomain.cloud eu-gb
`s3.us-south.cloud-object-storage.appdomain.cloud us-south