使用 IBM Cloud Databases for PostgreSQL 作为外部元存储
您可以使用 IBM Cloud Databases for PostgreSQL 在 IBM Analytics Engine Spark 集群外部外部化元数据。
-
创建 IBM Cloud Databases for PostgreSQL 实例。 请参阅 Databases for PostgreSQL。
根据您的需求选择配置。 确保为端点配置选择 公用和专用网络。 创建实例和服务实例凭证后,请记下数据库名称,端口,用户名,密码和证书。
-
将 Databases for PostgreSQL 证书上载到维护应用程序代码的 IBM Cloud Object Storage 存储区。
要访问 Databases for PostgreSQL,需要提供客户机证书。 从 Databases for PostgreSQL 实例的服务凭证获取 Base64 解码的证书,并上载该文件 (名称显示为:
postgres.cert
) 到特定 IBM Cloud 位置中的 Object Storage 存储区。 稍后,您将需要下载此证书,并使其在 IBM Analytics Engine 实例 Spark 工作负载中可用,以连接到 metastore -
定制 IBM Analytics Engine 实例以包含 Databases for PostgreSQL 证书。 请参阅 基于脚本的定制。
此步骤定制 IBM Analytics Engine 实例,以使 Databases for PostgreSQL 证书可用于通过库集针对实例运行的所有 Spark 工作负载。
-
将
customization_script.py
从 基于脚本的定制 中的页面上载到 IBM Cloud Object Storage 存储区。 -
运行使用 spark-submit REST API 来定制实例的
postgres-cert-customization-submit.json
。 请注意,代码引用postgres.cert
您已上载到 IBM Cloud Object Storage。{ "application_details": { "application": "/opt/ibm/customization-scripts/customize_instance_app.py", "arguments": ["{\"library_set\":{\"action\":\"add\",\"name\":\"certificate_library_set\",\"script\":{\"source\":\"py_files\",\"params\":[\"https://s3.direct.<CHANGME>.cloud-object-storage.appdomain.cloud\",\"<CHANGEME_BUCKET_NAME>\",\"postgres.cert\",\"<CHANGEME_ACCESS_KEY>\",\"<CHANGEME_SECRET_KEY>\"]}}}"], "py-files": "cos://CHANGEME_BUCKET_NAME.mycosservice/customization_script.py" } }
请注意,库集名称
certificate_library_set
必须与您指定的 Databases for PostgreSQL metastore 连接参数ae.spark.librarysets
的值匹配。
-
-
指定以下 Databases for PostgreSQL 元存储连接参数作为 Spark 应用程序有效内容的一部分或作为实例缺省值。 确保将专用端点用于以下
"spark.hadoop.javax.jdo.option.ConnectionURL"
参数:"spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.postgresql.Driver", "spark.hadoop.javax.jdo.option.ConnectionUserName": "ibm_cloud_<CHANGEME>", "spark.hadoop.javax.jdo.option.ConnectionPassword": "<CHANGEME>", "spark.sql.catalogImplementation": "hive", "spark.hadoop.hive.metastore.schema.verification": "false", "spark.hadoop.hive.metastore.schema.verification.record.version": "false", "spark.hadoop.datanucleus.schema.autoCreateTables":"true", "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:postgresql://<CHANGEME>.databases.appdomain.CHANGEME/ibmclouddb?sslmode=verify-ca&sslrootcert=/home/spark/shared/user-libs/certificate_library_set/custom/postgres.cert&socketTimeout=30", "ae.spark.librarysets":"certificate_library_set"
-
在 Databases for PostgreSQL 实例中设置 Hive metastore 模式,因为创建实例时 Databases for PostgreSQL 数据库的公共模式中没有表。 此步骤将执行与 Hive 模式相关的 DDL,以便可以将元存储数据存储在这些数据中。 运行名为
postgres-create-schema.py
的以下 Spark 应用程序后,您将看到针对实例的“公共”模式创建的 Hive 元数据表。from pyspark.sql import SparkSession import time def init_spark(): spark = SparkSession.builder.appName("postgres-create-schema").getOrCreate() sc = spark.sparkContext return spark,sc def create_schema(spark,sc): tablesDF=spark.sql("SHOW TABLES") tablesDF.show() time.sleep(30) def main(): spark,sc = init_spark() create_schema(spark,sc) if __name__ == '__main__': main()
-
现在,运行以下名为
postgres-parquet-table-create.py
的脚本,以使用 Databases for PostgreSQL 数据库中的 IBM Cloud Object Storage 中的元数据创建 Parquet 表。from pyspark.sql import SparkSession import time def init_spark(): spark = SparkSession.builder.appName("postgres-create-parquet-table-test").getOrCreate() sc = spark.sparkContext return spark,sc def generate_and_store_data(spark,sc): data =[("1","Romania","Bucharest","81"),("2","France","Paris","78"),("3","Lithuania","Vilnius","60"),("4","Sweden","Stockholm","58"),("5","Switzerland","Bern","51")] columns=["Ranking","Country","Capital","BroadBandSpeed"] df=spark.createDataFrame(data,columns) df.write.parquet("cos://<CHANGEME-BUCKET>.mycosservice/broadbandspeed") def create_table_from_data(spark,sc): spark.sql("CREATE TABLE MYPARQUETBBSPEED (Ranking STRING, Country STRING, Capital STRING, BroadBandSpeed STRING) STORED AS PARQUET location 'cos://CHANGEME-BUCKET.mycosservice/broadbandspeed/'") df2=spark.sql("SELECT * from MYPARQUETBBSPEED") df2.show() def main(): spark,sc = init_spark() generate_and_store_data(spark,sc) create_table_from_data(spark,sc) time.sleep(30) if __name__ == '__main__': main()
-
运行以下名为
postgres-parquet-table-select.py
的 PySpark 脚本,以使用来自其他 Spark 工作负载的元数据来访问此 Parquet 表:from pyspark.sql import SparkSession import time def init_spark(): spark = SparkSession.builder.appName("postgres-select-parquet-table-test").getOrCreate() sc = spark.sparkContext return spark,sc def select_data_from_table(spark,sc): df=spark.sql("SELECT * from MYPARQUETBBSPEED") df.show() def main(): spark,sc = init_spark() select_data_from_table(spark,sc) time.sleep(60) if __name__ == '__main__': main()