Using IBM Cloud Data Engine as external metastore
Deprecated
IBM Cloud Data Engine is IBM Cloud's central service for data lakes. It provides stream ingestion, data preparation, ETL, and data query from IBM Cloud Object Storage and Kafka. It also manages tables and views in a catalog that is compatible with Hive metastore, so other big data engines and services can connect to it. See Overview of IBM Cloud Data Engine.
Each instance of IBM Cloud Data Engine includes a database catalog that you can use to register and manage table definitions for your data on IBM Cloud Object Storage. Catalog syntax is compatible with Hive metastore syntax. You can use IBM Cloud Data Engine to externalize metadata outside the IBM Analytics Engine Spark cluster.
Pre-requisites
The following are the pre-requisites:
- Creating IBM Cloud Data Engine instance
- Storing data in Cloud Object Storage
- Creating schema
Creating IBM Cloud Data Engine instance
Create an IBM Cloud Data Engine instance by using the Standard plan. See Data Engine.
After you have provisioned the Data Engine instance:
- Make a note of the CRN of the instance.
- Create an account-level API key or service ID level API key with access to the instance.
- This service ID should be granted access to both the Data Engine instance as well as the IBM Cloud Object Storage bucket.
You can then configure your IBM Analytics Engine instance to use the default metastore configuration either at instance level or at application level as needed.
IBM Cloud Data Engine supports creating instances for different endpoints (locations). Within an instance, different IBM Cloud Object Storage buckets are created to store data. The data buckets can be created for different endpoints (regions).
The endpoint for the Data Engine instance (thrift) and the endpoint for the data bucket are different. Ensure that you select endpoints that are supported by the system.
- For more information about the applicable thrift endpoint for your region while creating the instance, see Thrift endpoint.
- For more information about the currently supported Data Engine endpoints, see Data engine endpoints.
Storing data in Cloud Object Storage
Generate and store data in Cloud Object Storage. Run the following regular PySpark application, called generate-and-store-data.py in this example, which stores Parquet data in a location on IBM Cloud Object Storage.
Example:
from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("dataengine-generate-store-parquet-data").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def generate_and_store_data(spark, sc):
    data = [("India", "New Delhi"), ("France", "Paris"), ("Lithuania", "Vilnius"), ("Sweden", "Stockholm"), ("Switzerland", "Bern")]
    columns = ["Country", "Capital"]
    df = spark.createDataFrame(data, columns)
    df.write.mode("overwrite").parquet("cos://mybucket.mycosservice/countriescapitals.parquet")

def main():
    spark, sc = init_spark()
    generate_and_store_data(spark, sc)

if __name__ == '__main__':
    main()
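This page does not show a submission payload for generate-and-store-data.py. The following is a minimal sketch, assuming the payload has the same shape as the payloads shown later on this page (application_details, conf, application). The helper name build_payload and the application location are hypothetical; the service name mycosservice is taken from the cos:// URI in the script above.

```python
import json

def build_payload(application, service, endpoint, access_key, secret_key, arguments=None):
    """Assemble an application payload with COS credentials for one service name."""
    prefix = f"spark.hadoop.fs.cos.{service}"
    details = {
        "conf": {
            f"{prefix}.endpoint": endpoint,
            f"{prefix}.access.key": access_key,
            f"{prefix}.secret.key": secret_key,
        },
        "application": application,
    }
    # Only include "arguments" when the application takes any.
    if arguments:
        details["arguments"] = list(arguments)
    return {"application_details": details}

payload = build_payload(
    application="cos://mybucket.mycosservice/generate-and-store-data.py",  # hypothetical location
    service="mycosservice",
    endpoint="CHANGEME",
    access_key="CHANGEME",
    secret_key="CHANGEME",
)
print(json.dumps(payload, indent=2))
```

The service name in the conf keys must match the service part of the cos:// URIs that the application uses, which is why it is a single parameter here.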
Creating schema
Create the metastore table schema definition in Data Engine. Note that you can't use standard Spark SQL syntax to create tables when using Data Engine as a metastore. You can create a table in one of two ways:
- From the Data Engine user interface, or by using the standard Data Engine API (see Data Engine service REST V3 API) or the Python SDK (see ibmcloudsql).
Example:
CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-south/mybucket/countriescapitals.parquet
In the above example, the location (cos://us-south/mybucket/countriescapitals.parquet) identifies the COS bucket by the alias us-south, which means that it is a regional bucket. If you are using any other region, select the corresponding alias from the Data engine endpoints.
- Programmatically from within your PySpark application by using the following PySpark code snippet, called create_table_data_engine.py:
Example:
import sys
import time

import requests

def create_data_engine_table(api_key, crn):
    # Exchange the API key for an IAM bearer token.
    headers = {
        'Authorization': 'Basic Yng6Yng=',
    }
    data = {
        'apikey': api_key,
        'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
    }
    response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data)
    token = response.json()['access_token']

    headers_token = {
        'Accept': 'application/json',
        'Authorization': f"Bearer {token}",
    }
    params = {
        'instance_crn': crn,
    }
    json_data = {
        'statement': 'CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-south/mybucket/countriescapitals.parquet',
    }
    # Submit the CREATE TABLE statement as a Data Engine SQL job.
    response = requests.post('https://api.dataengine.cloud.ibm.com/v3/sql_jobs', params=params, headers=headers_token, json=json_data)
    job_id = response.json()['job_id']

    # Give the job time to finish, then fetch its status.
    time.sleep(10)
    response = requests.get(f'https://api.dataengine.cloud.ibm.com/v3/sql_jobs/{job_id}', params=params, headers=headers_token)
    if response.json()['status'] == 'completed':
        print(response.json())

if __name__ == '__main__':
    # Assumed entry point (not part of the original snippet): the payload
    # passes the CRN as the first argument and the API key as the second.
    create_data_engine_table(api_key=sys.argv[2], crn=sys.argv[1])
In the above example, the location (cos://us-south/mybucket/countriescapitals.parquet) identifies the COS bucket by the alias us-south, which means that it is a regional bucket. If you are using any other region, select the corresponding alias from the Data engine endpoints.
The payload for the above application, create_table_data_engine_payload.json, also needs to provide the Data Engine credentials with the exact standard Data Engine alias, in this case us-south.
Example:
{
    "application_details": {
        "conf": {
            "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME"
        },
        "application": "cos://mybucket.us-south/create_table_data_engine.py",
        "arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>", "<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
    }
}
Parameter values:
In the above example, the regional COS bucket from us-south is used. If you are using any other region, select the corresponding alias from the Data engine endpoints. Make sure that you select the standard aliases.
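The create_table_data_engine.py snippet waits a fixed 10 seconds before it checks the job status once, so a slower job goes unreported. The following is a minimal sketch of a polling loop that could replace the fixed wait. The helper name wait_for_job is hypothetical, the status completed comes from the snippet above, and treating failed as the other terminal status is an assumption. The status source is passed in as a function so that the loop can be tried without calling the service.

```python
import time

def wait_for_job(fetch_status, timeout_s=300.0, interval_s=5.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal status or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        # "completed" is the status checked in the snippet; "failed" is assumed.
        if status in ("completed", "failed"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout_s} seconds")
        sleep(interval_s)

# Stand-in status source for illustration; the intermediate values are made up.
fake_statuses = iter(["queued", "running", "completed"])
print(wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None))
```

In the snippet, fetch_status would wrap the existing requests.get call on the sql_jobs/{job_id} URL and return the status field of the JSON response.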
Reading data from table by passing full list of Data Engine parameters
You can read the data from the metastore table by using a SQL query.
Read the data from the table by using Spark SQL in the following application, called select_query_data_engine.py:
from pyspark.sql import SparkSession
import time

def init_spark():
    spark = SparkSession.builder.appName("dataengine-table-select-test").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def select_query_data_engine(spark, sc):
    tablesDF = spark.sql("SHOW TABLES")
    tablesDF.show()
    statesDF = spark.sql("SELECT * from COUNTRIESCAPITALS")
    statesDF.show()

def main():
    spark, sc = init_spark()
    select_query_data_engine(spark, sc)

if __name__ == '__main__':
    main()
Note that for the SELECT command to work, you must pass the IBM Cloud Object Storage identifiers as one of the standard Data Engine aliases. This example uses us-south. If you do not pass the expected ones, you might see the following error: Configuration parse exception: Access KEY is empty. Please provide valid access key.
select_query_data_engine_payload.json:
{
    "application_details": {
        "conf": {
            "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME",
            "spark.hive.metastore.truststore.password": "changeit",
            "spark.hive.execution.engine": "spark",
            "spark.hive.metastore.client.plain.password": "APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE",
            "spark.hive.metastore.uris": "thrift://catalog.us.dataengine.cloud.ibm.com:9083",
            "spark.hive.metastore.client.auth.mode": "PLAIN",
            "spark.hive.metastore.use.SSL": "true",
            "spark.hive.stats.autogather": "false",
            "spark.hive.metastore.client.plain.username": "<CHANGEME-CRN-DATA-ENGINE-INSTANCE>",
            "spark.hive.metastore.truststore.path": "/opt/ibm/jdk/lib/security/cacerts",
            "spark.sql.catalogImplementation": "hive",
            "spark.sql.hive.metastore.jars": "/opt/ibm/connectors/data-engine/hms-client/*",
            "spark.sql.hive.metastore.version": "3.0",
            "spark.sql.warehouse.dir": "file:///tmp",
            "spark.hadoop.metastore.catalog.default": "spark"
        },
        "application": "cos://mybucket.us-south/select_query_data_engine.py"
    }
}
The spark.hive.metastore.truststore.path value shown applies to Spark 3.4.
In the above example, the regional COS bucket from us-south is used. If you are using any other region, select the corresponding alias from the Data engine endpoints. Also, the metastore URL is provided as thrift://catalog.us.dataengine.cloud.ibm.com:9083. For more information about other applicable thrift endpoints, see Thrift endpoint.
Parameter value:
- CRN-DATA-ENGINE-INSTANCE: Specify the CRN of the Data Engine instance.
Make sure that you select the standard aliases.
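Because a missing or placeholder credential surfaces only at run time (for example as the Access KEY is empty error), it can help to check the conf section before you submit the payload. The following is a minimal sketch of such a check. The helper name check_conf is hypothetical, and the set of keys treated as required is an assumption drawn from the payload on this page, not an authoritative list.

```python
# Keys from the payload on this page that this sketch treats as required (an assumption).
REQUIRED_METASTORE_KEYS = (
    "spark.hive.metastore.uris",
    "spark.hive.metastore.client.auth.mode",
    "spark.hive.metastore.client.plain.username",
    "spark.hive.metastore.client.plain.password",
    "spark.hive.metastore.use.SSL",
    "spark.sql.catalogImplementation",
)
COS_PREFIX = "spark.hadoop.fs.cos."
COS_SUFFIXES = ("endpoint", "access.key", "secret.key")

def check_conf(conf):
    """Return a list of human-readable problems found in a payload's conf dict."""
    problems = []
    for key in REQUIRED_METASTORE_KEYS:
        if not conf.get(key):
            problems.append(f"missing or empty: {key}")
    # Collect the aliases used in spark.hadoop.fs.cos.<alias>.<suffix> keys.
    aliases = set()
    for key in conf:
        if key.startswith(COS_PREFIX):
            rest = key[len(COS_PREFIX):]
            for suffix in COS_SUFFIXES:
                if rest.endswith("." + suffix):
                    aliases.add(rest[: -len(suffix) - 1])
    if not aliases:
        problems.append("no spark.hadoop.fs.cos.<alias>.* keys found")
    # Every alias needs all three settings, with real values instead of placeholders.
    for alias in sorted(aliases):
        for suffix in COS_SUFFIXES:
            key = f"{COS_PREFIX}{alias}.{suffix}"
            value = conf.get(key)
            if not value or value == "CHANGEME":
                problems.append(f"placeholder or empty: {key}")
    return problems

example_conf = {
    "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
    "spark.hive.metastore.uris": "thrift://catalog.us.dataengine.cloud.ibm.com:9083",
}
for problem in check_conf(example_conf):
    print(problem)
```

An empty list means that the sketch found nothing to report; it does not confirm that the credentials themselves are valid.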
Reading data from table by using the simpler convenience API
If you want to quickly test the Hive metastore by setting up the IBM Cloud Data Engine connection in your application code, you can use the convenience API shown in the following PySpark example.
In this example, there is no need to pass any IBM Cloud Data Engine Hive metastore parameters to your application. The call to SparkSessionWithDataengine.enableDataengine initializes the connections to IBM Cloud Data Engine without the additional IBM Cloud Data Engine Hive metastore parameters.
dataengine-job-convenience_api.py:
from dataengine import SparkSessionWithDataengine
import sys

def dataengine_table_test(spark, sc):
    # List the tables in the Data Engine catalog, then query one of them.
    tablesDF = spark.sql("SHOW TABLES")
    tablesDF.show()
    statesDF = spark.sql("SELECT * from COUNTRIESCAPITALS")
    statesDF.show()

def main():
    # Two arguments are expected: the Data Engine instance CRN and an API key.
    if len(sys.argv) < 3:
        sys.exit(1)
    crn = sys.argv[1]
    apikey = sys.argv[2]
    session_builder = SparkSessionWithDataengine.enableDataengine(crn, apikey, "public", "/opt/ibm/connectors/data-engine/hms-client")
    spark = session_builder.appName("Spark DataEngine integration test").getOrCreate()
    sc = spark.sparkContext
    dataengine_table_test(spark, sc)

if __name__ == '__main__':
    main()
The following is the payload for the convenience API application, dataengine-job-convenience_api.py:
Example:
{
    "application_details": {
        "conf": {
            "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
            "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME"
        },
        "application": "cos://mybucket.us-south/dataengine-job-convenience_api.py",
        "arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>", "<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
    }
}
Parameter values:
In the above example, the regional COS bucket from us-south is used. If you are using any other region, select the corresponding alias from the Data engine endpoints. Make sure that you select the standard aliases.
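The convenience API application reads the CRN and the API key from its command-line arguments, in that order. The following is a minimal sketch of an argument check that reports what is missing instead of exiting silently. The helper name parse_args and the sample values are hypothetical, and the test that the first argument starts with crn: is only a light sanity check, not a full CRN validation.

```python
import argparse

def parse_args(argv):
    """Return (crn, apikey) from the application arguments, or exit with a usage message."""
    parser = argparse.ArgumentParser(prog="dataengine-job-convenience_api.py")
    parser.add_argument("crn", help="CRN of the Data Engine instance")
    parser.add_argument("apikey", help="API key with access to the Data Engine instance")
    args = parser.parse_args(argv)
    # Catch swapped arguments early: the CRN is expected first.
    if not args.crn.startswith("crn:"):
        parser.error("the first argument does not look like a CRN (expected it to start with 'crn:')")
    return args.crn, args.apikey

# Hypothetical sample values for illustration only.
crn, apikey = parse_args(["crn:v1:example-instance", "example-api-key"])
print(crn)
```

In the application, the call would be parse_args(sys.argv[1:]), which matches the order of the arguments list in the payload.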
Cloud Object Storage endpoints
Your Cloud Object Storage instance has one of the supported endpoints. Data Engine supports all public and private Object Storage endpoints. To save space, you can use the alias that is shown instead of the full endpoint name.
Aliases to tethering endpoints (specific endpoints within cross region domains, for example, dal-us-geo) are considered legacy. They continue to work until further notice but are planned to be deprecated sometime in the future.
To be prepared, update your applications to use the alias of the corresponding cross region endpoint (for example, us-geo).
Data Engine always uses the internal endpoint to interact with Object Storage, even if an external endpoint was specified in the query. The result location for a query always indicates the external endpoint name. When you interact with Data Engine programmatically through the API, you can use the internal endpoint name to read results instead of the external endpoint name that is returned by the API.
The following tables list some examples of currently supported Data Engine endpoints.
Cross region endpoint name | Alias |
---|---|
s3.us.cloud-object-storage.appdomain.cloud | us-geo |
s3.eu.cloud-object-storage.appdomain.cloud | eu-geo |
s3.ap.cloud-object-storage.appdomain.cloud | ap-geo |

Regional endpoint name | Alias |
---|---|
s3.eu-de.cloud-object-storage.appdomain.cloud | eu-de |
s3.eu-gb.cloud-object-storage.appdomain.cloud | eu-gb |
s3.us-south.cloud-object-storage.appdomain.cloud | us-south |
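The alias tables can be turned into a small lookup that resolves an alias to its endpoint name and builds a table location in the cos://alias/bucket/path form used in the CREATE TABLE example. The following is a minimal sketch. The names ENDPOINT_BY_ALIAS, endpoint_for, and table_location are hypothetical, and the mapping holds only the example aliases listed on this page, not every supported endpoint.

```python
# Only the example aliases listed in the tables on this page.
ENDPOINT_BY_ALIAS = {
    "us-geo": "s3.us.cloud-object-storage.appdomain.cloud",
    "eu-geo": "s3.eu.cloud-object-storage.appdomain.cloud",
    "ap-geo": "s3.ap.cloud-object-storage.appdomain.cloud",
    "eu-de": "s3.eu-de.cloud-object-storage.appdomain.cloud",
    "eu-gb": "s3.eu-gb.cloud-object-storage.appdomain.cloud",
    "us-south": "s3.us-south.cloud-object-storage.appdomain.cloud",
}

def endpoint_for(alias):
    """Return the full endpoint name for an alias, or raise ValueError if it is not listed."""
    try:
        return ENDPOINT_BY_ALIAS[alias]
    except KeyError:
        known = ", ".join(sorted(ENDPOINT_BY_ALIAS))
        raise ValueError(f"unknown alias {alias!r}; listed aliases: {known}") from None

def table_location(alias, bucket, path):
    """Build a location in the cos://<alias>/<bucket>/<path> form."""
    endpoint_for(alias)  # reject aliases that are not in the mapping
    return f"cos://{alias}/{bucket}/{path.lstrip('/')}"

print(endpoint_for("us-south"))
print(table_location("us-south", "mybucket", "countriescapitals.parquet"))
```

Rejecting unlisted aliases is a deliberate choice in this sketch, so a legacy tethering alias such as dal-us-geo is flagged rather than silently passed through.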