IBM Cloud Docs
使用 IBM Cloud Databases for PostgreSQL 作为外部元存储 

使用 IBM Cloud Databases for PostgreSQL 作为外部元存储 

您可以使用 IBM Cloud Databases for PostgreSQL 在 IBM Analytics Engine Spark 集群外部外部化元数据。

  1. 创建 IBM Cloud Databases for PostgreSQL 实例。 请参阅 Databases for PostgreSQL

    根据您的需求选择配置。 确保为端点配置选择 公用和专用网络。 创建实例和服务实例凭证后,请记下数据库名称,端口,用户名,密码和证书。

  2. 将 Databases for PostgreSQL 证书上载到维护应用程序代码的 IBM Cloud Object Storage 存储区。

    要访问 Databases for PostgreSQL,需要提供客户机证书。 从 Databases for PostgreSQL 实例的服务凭证获取 Base64 解码的证书,并上载该文件 (名称显示为: postgres.cert) 到特定 IBM Cloud 位置中的 Object Storage 存储区。 稍后,您将需要下载此证书,并使其在 IBM Analytics Engine 实例 Spark 工作负载中可用,以连接到 metastore

  3. 定制 IBM Analytics Engine 实例以包含 Databases for PostgreSQL 证书。 请参阅 基于脚本的定制

    此步骤定制 IBM Analytics Engine 实例,以使 Databases for PostgreSQL 证书可用于通过库集针对实例运行的所有 Spark 工作负载。

    1. customization_script.py基于脚本的定制 中的页面上载到 IBM Cloud Object Storage 存储区。

    2. 运行使用 spark-submit REST API 来定制实例的 postgres-cert-customization-submit.json。 请注意,代码引用 postgres.cert 您已上载到 IBM Cloud Object Storage。

      {
          "application_details": 
          {
             "application": "/opt/ibm/customization-scripts/customize_instance_app.py",
          "arguments": ["{\"library_set\":{\"action\":\"add\",\"name\":\"certificate_library_set\",\"script\":{\"source\":\"py_files\",\"params\":[\"https://s3.direct.<CHANGME>.cloud-object-storage.appdomain.cloud\",\"<CHANGEME_BUCKET_NAME>\",\"postgres.cert\",\"<CHANGEME_ACCESS_KEY>\",\"<CHANGEME_SECRET_KEY>\"]}}}"],
          "py-files": "cos://CHANGEME_BUCKET_NAME.mycosservice/customization_script.py"
          }
      } 
      

      请注意,库集名称 certificate_library_set 必须与您指定的 Databases for PostgreSQL metastore 连接参数 ae.spark.librarysets 的值匹配。

  4. 指定以下 Databases for PostgreSQL 元存储连接参数作为 Spark 应用程序有效内容的一部分或作为实例缺省值。 确保将专用端点用于以下 "spark.hadoop.javax.jdo.option.ConnectionURL" 参数:

    "spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.postgresql.Driver",
    "spark.hadoop.javax.jdo.option.ConnectionUserName": "ibm_cloud_<CHANGEME>",
    "spark.hadoop.javax.jdo.option.ConnectionPassword": "<CHANGEME>",
    "spark.sql.catalogImplementation": "hive",
    "spark.hadoop.hive.metastore.schema.verification": "false",
    "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
    "spark.hadoop.datanucleus.schema.autoCreateTables":"true",
    "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:postgresql://<CHANGEME>.databases.appdomain.CHANGEME/ibmclouddb?sslmode=verify-ca&sslrootcert=/home/spark/shared/user-libs/certificate_library_set/custom/postgres.cert&socketTimeout=30",
    "ae.spark.librarysets":"certificate_library_set"
    
  5. 在 Databases for PostgreSQL 实例中设置 Hive metastore 模式,因为创建实例时 Databases for PostgreSQL 数据库的公共模式中没有表。 此步骤将执行与 Hive 模式相关的 DDL,以便可以将元存储数据存储在这些数据中。 运行名为 postgres-create-schema.py 的以下 Spark 应用程序后,您将看到针对实例的“公共”模式创建的 Hive 元数据表。

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-create-schema").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def create_schema(spark,sc):
      tablesDF=spark.sql("SHOW TABLES")
      tablesDF.show()
      time.sleep(30)
    def main():
      spark,sc = init_spark()
      create_schema(spark,sc)
    if __name__ == '__main__':
      main()
    
  6. 现在,运行以下名为 postgres-parquet-table-create.py 的脚本,以使用 Databases for PostgreSQL 数据库中的 IBM Cloud Object Storage 中的元数据创建 Parquet 表。

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-create-parquet-table-test").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def generate_and_store_data(spark,sc):
      data =[("1","Romania","Bucharest","81"),("2","France","Paris","78"),("3","Lithuania","Vilnius","60"),("4","Sweden","Stockholm","58"),("5","Switzerland","Bern","51")]
      columns=["Ranking","Country","Capital","BroadBandSpeed"]
      df=spark.createDataFrame(data,columns)
      df.write.parquet("cos://<CHANGEME-BUCKET>.mycosservice/broadbandspeed")
    def create_table_from_data(spark,sc):
      spark.sql("CREATE TABLE MYPARQUETBBSPEED (Ranking STRING, Country STRING, Capital STRING, BroadBandSpeed STRING) STORED AS PARQUET  location 'cos://CHANGEME-BUCKET.mycosservice/broadbandspeed/'")
      df2=spark.sql("SELECT * from MYPARQUETBBSPEED")
      df2.show()
    def main():
      spark,sc = init_spark()
      generate_and_store_data(spark,sc)
      create_table_from_data(spark,sc)
      time.sleep(30)
    if __name__ == '__main__':
      main()
    
  7. 运行以下名为 postgres-parquet-table-select.py 的 PySpark 脚本,以使用来自其他 Spark 工作负载的元数据来访问此 Parquet 表:

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-select-parquet-table-test").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def select_data_from_table(spark,sc):
      df=spark.sql("SELECT * from MYPARQUETBBSPEED")
      df.show()
    def main():
      spark,sc = init_spark()
      select_data_from_table(spark,sc)
      time.sleep(60)
    if __name__ == '__main__':
     main()