Using IBM Cloud Databases for PostgreSQL as external metastore

You can use IBM Cloud Databases for PostgreSQL to externalize metadata outside the IBM Analytics Engine Spark cluster.

  1. Create an IBM Cloud Databases for PostgreSQL instance. See Databases for PostgreSQL.

    Choose a configuration that meets your requirements. Make sure to select Both public & private network for the endpoint configuration. After you have created the instance and the service instance credentials, make a note of the host name, database name, port, user name, password, and certificate.

  2. Upload the Databases for PostgreSQL certificate to an IBM Cloud Object Storage bucket where you are maintaining your application code.

    To access Databases for PostgreSQL, you need to provide the service's TLS certificate. Get the Base64-encoded certificate from the service credentials of the Databases for PostgreSQL instance, decode it, and upload the resulting file (named, for example, postgres.cert) to an Object Storage bucket in a specific IBM Cloud location. Later, you download this certificate and make it available to the Spark workloads of the IBM Analytics Engine instance so that they can connect to the metastore. A sketch of this step is shown below.
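
    The following Python sketch is one way to do this from your local machine. It decodes the certificate and uploads it with the ibm_boto3 client from the ibm-cos-sdk package; the certificate value, API key, Cloud Object Storage instance CRN, endpoint, and bucket name are placeholders that you must replace with your own values.

    import base64
    import ibm_boto3
    from ibm_botocore.client import Config

    # Base64-encoded certificate copied from the service credentials of the
    # Databases for PostgreSQL instance (typically the
    # connection.postgres.certificate.certificate_base64 field).
    cert_base64 = "<CHANGEME_CERTIFICATE_BASE64>"

    # Decode the certificate and write it to a local file.
    with open("postgres.cert", "wb") as cert_file:
        cert_file.write(base64.b64decode(cert_base64))

    # Create a Cloud Object Storage client. Replace the credentials and the
    # endpoint with the values of your own Object Storage instance.
    cos = ibm_boto3.client(
        "s3",
        ibm_api_key_id="<CHANGEME_COS_API_KEY>",
        ibm_service_instance_id="<CHANGEME_COS_INSTANCE_CRN>",
        config=Config(signature_version="oauth"),
        endpoint_url="https://s3.direct.<CHANGEME>.cloud-object-storage.appdomain.cloud",
    )

    # Upload the certificate to the bucket that holds your application code.
    cos.upload_file("postgres.cert", "<CHANGEME_BUCKET_NAME>", "postgres.cert")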

  3. Customize the IBM Analytics Engine instance to include the Databases for PostgreSQL certificate. See Script based customization.

    This step customizes the IBM Analytics Engine instance so that the Databases for PostgreSQL certificate is made available, through a library set, to all Spark workloads that run against the instance.

    1. Upload the customization_script.py script from the Script based customization page to an IBM Cloud Object Storage bucket. You can upload it in the same way as the certificate in step 2.

    2. Submit the following payload, postgres-cert-customization-submit.json, by using the spark-submit REST API to customize the instance. Note that the payload references the postgres.cert file that you uploaded to IBM Cloud Object Storage. A sketch of submitting this payload follows the note below.

      {
        "application_details": {
          "application": "/opt/ibm/customization-scripts/customize_instance_app.py",
          "arguments": ["{\"library_set\":{\"action\":\"add\",\"name\":\"certificate_library_set\",\"script\":{\"source\":\"py_files\",\"params\":[\"https://s3.direct.<CHANGEME>.cloud-object-storage.appdomain.cloud\",\"<CHANGEME_BUCKET_NAME>\",\"postgres.cert\",\"<CHANGEME_ACCESS_KEY>\",\"<CHANGEME_SECRET_KEY>\"]}}}"],
          "py-files": "cos://<CHANGEME_BUCKET_NAME>.mycosservice/customization_script.py"
        }
      }
      

      Note that the library set name certificate_library_set must match the value of the Databases for PostgreSQL metastore connection parameter ae.spark.librarysets that you specify in the next step.
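
      The following Python sketch shows one way to submit the payload. It assumes that the instance is reachable through the v3 spark_applications REST endpoint of IBM Analytics Engine serverless and that you authenticate with an IAM bearer token; the API key, region, and instance GUID are placeholders.

      import json
      import requests

      # Exchange an IBM Cloud API key for an IAM bearer token.
      iam_response = requests.post(
          "https://iam.cloud.ibm.com/identity/token",
          data={
              "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
              "apikey": "<CHANGEME_IBMCLOUD_API_KEY>",
          },
      )
      token = iam_response.json()["access_token"]

      # Load the customization payload shown above.
      with open("postgres-cert-customization-submit.json") as payload_file:
          payload = json.load(payload_file)

      # Submit the Spark application. Adjust the region and the instance GUID to
      # match your IBM Analytics Engine instance.
      response = requests.post(
          "https://api.<CHANGEME_REGION>.ae.cloud.ibm.com/v3/analytics_engines/"
          "<CHANGEME_INSTANCE_GUID>/spark_applications",
          headers={"Authorization": "Bearer " + token},
          json=payload,
      )
      print(response.status_code, response.text)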

  4. Specify the following Databases for PostgreSQL metastore connection parameters as part of the Spark application payload or as instance defaults. Make sure that you use the private endpoint for the "spark.hadoop.javax.jdo.option.ConnectionURL" parameter shown below. A sketch of a complete application payload that includes these parameters follows the list:

    "spark.hadoop.javax.jdo.option.ConnectionDriverName": "org.postgresql.Driver",
    "spark.hadoop.javax.jdo.option.ConnectionUserName": "ibm_cloud_<CHANGEME>",
    "spark.hadoop.javax.jdo.option.ConnectionPassword": "<CHANGEME>",
    "spark.sql.catalogImplementation": "hive",
    "spark.hadoop.hive.metastore.schema.verification": "false",
    "spark.hadoop.hive.metastore.schema.verification.record.version": "false",
    "spark.hadoop.datanucleus.schema.autoCreateTables":"true",
    "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:postgresql://<CHANGEME>.databases.appdomain.CHANGEME/ibmclouddb?sslmode=verify-ca&sslrootcert=/home/spark/shared/user-libs/certificate_library_set/custom/postgres.cert&socketTimeout=30",
    "ae.spark.librarysets":"certificate_library_set"
    
  5. Set up the Hive metastore schema in the Databases for PostgreSQL instance. When you create the instance, there are no tables in the public schema of the Databases for PostgreSQL database. This step runs the Hive schema-related DDL so that the tables in which metastore data is stored get created. After you run the following Spark application, called postgres-create-schema.py, you will see the Hive metadata tables created in the "public" schema of the instance.

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-create-schema").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def create_schema(spark,sc):
      # Any metastore-backed statement, such as SHOW TABLES, triggers creation of
      # the Hive metastore tables because datanucleus.schema.autoCreateTables is "true".
      tablesDF=spark.sql("SHOW TABLES")
      tablesDF.show()
      time.sleep(30)
    def main():
      spark,sc = init_spark()
      create_schema(spark,sc)
    if __name__ == '__main__':
      main()
    
  6. Now run the following script, called postgres-parquet-table-create.py, to create a Parquet table whose data is stored in IBM Cloud Object Storage and whose metadata is stored in the Databases for PostgreSQL database.

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-create-parquet-table-test").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def generate_and_store_data(spark,sc):
      data =[("1","Romania","Bucharest","81"),("2","France","Paris","78"),("3","Lithuania","Vilnius","60"),("4","Sweden","Stockholm","58"),("5","Switzerland","Bern","51")]
      columns=["Ranking","Country","Capital","BroadBandSpeed"]
      df=spark.createDataFrame(data,columns)
      df.write.parquet("cos://<CHANGEME-BUCKET>.mycosservice/broadbandspeed")
    def create_table_from_data(spark,sc):
      # The table's metadata is stored in the external Databases for PostgreSQL
      # metastore, while the data stays in Object Storage. Use the same bucket as above.
      spark.sql("CREATE TABLE MYPARQUETBBSPEED (Ranking STRING, Country STRING, Capital STRING, BroadBandSpeed STRING) STORED AS PARQUET LOCATION 'cos://<CHANGEME-BUCKET>.mycosservice/broadbandspeed/'")
      df2=spark.sql("SELECT * from MYPARQUETBBSPEED")
      df2.show()
    def main():
      spark,sc = init_spark()
      generate_and_store_data(spark,sc)
      create_table_from_data(spark,sc)
      time.sleep(30)
    if __name__ == '__main__':
      main()
    
  7. Run the following PySpark script, called postgres-parquet-table-select.py, to access the Parquet table from another Spark workload by using the metadata stored in the external metastore:

    from pyspark.sql import SparkSession
    import time
    def init_spark():
      spark = SparkSession.builder.appName("postgres-select-parquet-table-test").getOrCreate()
      sc = spark.sparkContext
      return spark,sc
    def select_data_from_table(spark,sc):
      df=spark.sql("SELECT * from MYPARQUETBBSPEED")
      df.show()
    def main():
      spark,sc = init_spark()
      select_data_from_table(spark,sc)
      time.sleep(60)
    if __name__ == '__main__':
      main()
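
    Optionally, you can verify that the table metadata is stored in the external metastore by querying the Databases for PostgreSQL instance directly, outside of Spark. The following sketch uses the psycopg2 driver and assumes the standard Hive metastore table names, such as "TBLS" (depending on how the schema was created, the table names might be lower case); the host, port, user, and password placeholders are the values from your service credentials.

    import psycopg2

    # Connect with the same credentials and certificate that the metastore uses.
    conn = psycopg2.connect(
        host="<CHANGEME_HOST>",
        port="<CHANGEME_PORT>",
        dbname="ibmclouddb",
        user="ibm_cloud_<CHANGEME>",
        password="<CHANGEME>",
        sslmode="verify-ca",
        sslrootcert="postgres.cert",
    )

    with conn, conn.cursor() as cur:
        # "TBLS" lists the tables that are registered in the Hive metastore,
        # including MYPARQUETBBSPEED from the previous step.
        cur.execute('SELECT "TBL_NAME", "TBL_TYPE" FROM "TBLS"')
        for table_name, table_type in cur.fetchall():
            print(table_name, table_type)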