外部メタストアとして IBM Cloud Data Engine を使用
非推奨
IBM Cloud Data Engine は、データレイク用の IBM Cloudの中央サービスです。 これは、 IBM Cloud Object Storage および Kafkaからのストリーム取り込み、データ準備、ETL、およびデータ照会を提供します。 また、Hiveメタストアと互換性のあるカタログ内の表およびビューも管理し、他のビッグデータ・エンジンおよびサービスがそれに接続できます。 IBM Cloud Data Engineの概要 を参照してください。
IBM Cloud Data Engine の各インスタンスには、IBM Cloud Object Storage 上にあるデータの表定義を登録および管理するために使用できるデータベース・カタログが含まれています。 カタログ構文は Hive Metastore 構文に対応しています。 IBM Cloud Data Engine を使用して、 IBM Analytics Engine Spark クラスターの外部でメタデータを外部化できます。
前提条件
前提条件は以下のとおりです。
- IBM Cloud Data Engine インスタンスの作成
- Cloud Object Storage へのデータの保管
- スキーマの作成
IBM Cloud Data Engine インスタンスの作成
標準プランを使用して IBM Cloud Data Engine インスタンスを作成します。 Data Engine を参照してください。
Data Engine インスタンスをプロビジョンした後、以下を実行します。
- インスタンスの CRN を書き留めます。
- インスタンスへのアクセス権限を持つアカウント・レベルの API キーまたはサービス ID レベルの API キーを作成します。
- このサービス ID には、 Data Engine インスタンスと IBM Cloud Object Storage バケットの両方に対するアクセス権限を付与する必要があります。
その後、必要に応じて、インスタンス・レベルまたはアプリケーション・レベルでデフォルトのメタストア構成を使用するように IBM Analytics Engine インスタンスを構成できます。
IBM Cloud Data Engine は、さまざまなエンドポイント (ロケーション) のインスタンスの作成をサポートします。 1 つのインスタンス内に、データを保管するためのさまざまな IBM Cloud Object Storage バケットが作成されます。 データ・バケットは、さまざまなエンドポイント (リージョン) に対して作成できます。 データ・エンジン・インスタンス (Thrift) とデータ・バケットのエンドポイントが異なります。 システムでサポートされている正しいエンドポイントを選択していることを確認してください。
• インスタンスの作成時にリージョンに適用できるエンドポイント (Thrift) について詳しくは、 Thrift エンドポイント を参照してください。
• 現在サポートされているデータ・エンジン・エンドポイントについて詳しくは、 データ・エンジン・エンドポイント を参照してください。
Cloud Object Storage
データを生成してクラウド・オブジェクト・ストレージに保管します。 以下の通常の PySpark アプリケーション (この例では generate-and-store-data.py
という名前) を実行します。このアプリケーションは、 IBM Cloud Object Storage上のいずれかの場所に Parquet データを保管します。
例:
次のように入力します。
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("dataengine-generate-store-parquet-data").getOrCreate()
sc = spark.sparkContext
return spark,sc
def generate_and_store_data(spark,sc):
data =[("India","New Delhi"),("France","Paris"),("Lithuania","Vilnius"),("Sweden","Stockholm"),("Switzerland","Bern")]
columns=["Country","Capital"]
df=spark.createDataFrame(data,columns)
df.write.mode("overwrite").parquet("cos://mybucket.mycosservice/countriescapitals.parquet")
def main():
spark,sc = init_spark()
generate_and_store_data(spark,sc)
if __name__ == '__main__':
main()
スキーマの作成
データ・エンジンでメタストア・テーブル・スキーマ定義を作成します。 Data Engine をメタストアとして使用している場合、標準の Spark SQL 構文を使用して表を作成することはできません。 表を作成するには、以下の 2 つの方法があります。
-
Data Engine ユーザー・インターフェースから、または標準の Data Engine API ( Data Engine サービス REST V3 API を参照) または Python SDK ( ibmcloudsql を参照)。
例:
次のように入力します。
CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-south/mybucket/countriescapitals.parquet
上記の例では、COS バケットの場所 (//us-south/mybucket/countriescapitals.parquet) は
us-south
(地域バケット) と見なされます。 他のリージョンを使用している場合は、 データ・エンジン・エンドポイント から対応する別名を選択します。
-
PySpark アプリケーション内からプログラマチックに行うには、
create_table_data_engine.py
という PySpark の以下のコード・スニペットを使用します。例:
次のように入力します。
import requests import time def create_data_engine_table(api_key,crn): headers = { 'Authorization': 'Basic Yng6Yng=', } data = { 'apikey': api_key, 'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', } response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data) token = response.json()['access_token'] headers_token = { 'Accept': 'application/json', 'Authorization': f"Bearer {token}", } params = { 'instance_crn': crn, } json_data = { 'statement': 'CREATE TABLE COUNTRIESCAPITALS (Country string,Capital string) USING PARQUET LOCATION cos://us-south/mybucket/countriescapitals.parquet', } response = requests.post('https://api.dataengine.cloud.ibm.com/v3/sql_jobs', params=params, headers=headers_token, json=json_data) job_id = response.json()['job_id'] time.sleep(10) response = requests.get(f'https://api.dataengine.cloud.ibm.com/v3/sql_jobs/{job_id}', params=params, headers=headers_token) if(response.json()['status']=='completed'): print(response.json())
上記の例では、COS バケットの場所 (
cos://us-south/mybucket/countriescapitals.parquet
) はus-south
(地域バケット) と見なされます。 他のリージョンを使用している場合は、 データ・エンジン・エンドポイント から対応する別名を選択します。上記のアプリケーションのペイロード
create_table_data_engine_payload.json
も、正確な標準 Data Engine 別名 (この場合は「us-south」) を使用して Data Engine 資格情報を提供する必要があります。例:
次のように入力します。
{ "application_details": { "conf": { "spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME", "spark.hadoop.fs.cos.us-south.access.key": "CHANGEME", "spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME" }, "application": "cos://mybucket.us-south/create_table_data_engine.py", "arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"] } }
パラメーター値:
上記の例では、
us-south
の Regional COS バケットが考慮されます。 他のリージョンを使用している場合は、 データ・エンジン・エンドポイント から対応する別名を選択します。標準別名を選択していることを確認してください。
Data Engine パラメーターの完全なリストを渡すことによる表からのデータの読み取り
SQL 照会を使用して、メタストア・テーブルからデータを読み取ることができます。
select_query_data_engine.py
という名前の以下のアプリケーションで Spark SQL を使用して、表からデータを読み取ります。
from pyspark.sql import SparkSession
import time
def init_spark():
spark = SparkSession.builder.appName("dataengine-table-select-test").getOrCreate()
sc = spark.sparkContext
return spark,sc
def select_query_data_engine(spark,sc):
tablesDF=spark.sql("SHOW TABLES")
tablesDF.show()
statesDF=spark.sql("SELECT * from COUNTRIESCAPITALS");
statesDF.show()
def main():
spark,sc = init_spark()
select_query_data_engine(spark,sc)
if __name__ == '__main__':
main()
SELECT コマンドを機能させるには、 IBM Cloud Object Storage ID を標準の Data Engine 別名の 1 つとして渡す必要があります。この例では、 us-south
を使用しています。 予期したものを渡さないと、 Configuration parse exception: Access KEY is empty. Please provide valid access key
というエラーが表示される場合があります。
select_query_data_engine_payload.json
:
{
"application_details": {
"conf": {
"spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
"spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
"spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME",
"spark.hive.metastore.truststore.password" : "changeit",
"spark.hive.execution.engine":"spark",
"spark.hive.metastore.client.plain.password":"APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE",
"spark.hive.metastore.uris":"thrift://catalog.us.dataengine.cloud.ibm.com:9083",
"spark.hive.metastore.client.auth.mode":"PLAIN",
"spark.hive.metastore.use.SSL":"true",
"spark.hive.stats.autogather":"false",
"spark.hive.metastore.client.plain.username":"<CHANGEME-CRN-DATA-ENGINE-INSTANCE>",
# for spark 3.3 and spark 3.4
"spark.hive.metastore.truststore.path":"/opt/ibm/jdk/lib/security/cacerts",
"spark.sql.catalogImplementation":"hive",
"spark.sql.hive.metastore.jars":"/opt/ibm/connectors/data-engine/hms-client/*",
"spark.sql.hive.metastore.version":"3.0",
"spark.sql.warehouse.dir":"file:///tmp",
"spark.sql.catalogImplementation":"hive",
"spark.hadoop.metastore.catalog.default":"spark"
},
"application": "cos://mybucket.us-south/select_query_data_engine.py"
}
}
上記の例では、 us-south
の Regional COS バケットが考慮されます。 他のリージョンを使用している場合は、 データ・エンジン・エンドポイント から対応する別名を選択します。 また、メタストア URL は、 thrift://catalog.us.dataengine.cloud.ibm.com:9083
として提供されます。 その他の適用可能なエンドポイント
(Thrift) について詳しくは、 Thrift エンドポイント を参照してください。
パラメーター値:
- CRN-DATA-ENGINE-INSTANCE: データ・エンジン・インスタンスの crn を指定します。
標準別名を選択していることを確認してください。
簡便な API を使用して表からデータを読み取る
アプリケーションで IBM Cloud Data Engine 接続 API を指定して Hive メタストアのクイック・テストを行う場合は、以下の PySpark の例に示す便利な API を使用できます。
この例では、 IBM Cloud Data Engine Hive メタストア・パラメーターをアプリケーションに渡す必要はありません。 SparkSessionWithDataengine.enableDataengine
を呼び出すと、追加の IBM Cloud Data Engine Hive メタストア・パラメーターなしで IBM Cloud Data Engine への接続が初期化されます。
dataengine-job-convenience_api.py:
from dataengine import SparkSessionWithDataengine
from pyspark.sql import SQLContext
import sys
from pyspark.sql import SparkSession
import time
def dataengine_table_test(spark,sc):
tablesDF=spark.sql("SHOW TABLES")
tablesDF.show()
statesDF=spark.sql("SELECT * from COUNTRIESCAPITALS");
statesDF.show()
def main():
if __name__ == '__main__':
if len (sys.argv) < 2:
exit(1)
else:
crn = sys.argv[1]
apikey = sys.argv[2]
session_builder = SparkSessionWithDataengine.enableDataengine(crn, apikey, "public", "/opt/ibm/connectors/data-engine/hms-client")
spark = session_builder.appName("Spark DataEngine integration test").getOrCreate()
sc = spark.sparkContext
dataengine_table_test (spark,sc)
if __name__ == '__main__':
main()
以下は、 dataengine-job-convenience_api.py
の利便性のためのペイロードです。
例:
次のように入力します。
{
"application_details": {
"conf": {
"spark.hadoop.fs.cos.us-south.endpoint": "CHANGEME",
"spark.hadoop.fs.cos.us-south.access.key": "CHANGEME",
"spark.hadoop.fs.cos.us-south.secret.key": "CHANGEME"
}
"application": "cos://mybucket.us-south/dataengine-job-convenience_api.py",
"arguments": ["<CHANGEME-CRN-DATA-ENGINE-INSTANCE>","<APIKEY-WITH-ACCESS-TO-DATA-ENGINE-INSTANCE>"]
}
}
パラメーター値: 上記の例では、 us-south
の Regional COS バケットが考慮されます。 他のリージョンを使用している場合は、 データ・エンジン・エンドポイント から対応する別名を選択します。
標準別名を選択していることを確認してください。
クラウド Object Storage エンドポイント
Cloud Object Storage インスタンスに、サポートされているエンドポイントの 1 つがあります。Data Engine は、すべてのパブリックとプライベートの Object Storage エンドポイントをサポートします。 スペースを節約するために、記載している別名を完全なエンドポイント名の代わりに使用することができます。
テザリング・エンドポイントの別名 (dal-us-geo
など、クロス・リージョン・ドメイン内の特定のエンドポイント) はレガシーと見なされます。 追って通知があるまでは機能しますが、いずれ非推奨になる予定です。 その準備として、対応するクロス・リージョン・エンドポイントの別名 (us-geo
など) が使用されるようにアプリケーションを更新してください。
Data Engine は、外部エンドポイントが照会で指定された場合でも、常に内部エンドポイントを使用して Object Storageと対話します。 照会の結果ロケーションは、常に、外部エンドポイント名を指しています。 API を使用してプログラムで Data Engine と対話する場合は、API から返される外部エンドポイント名の代わりに、内部エンドポイント名を使用して結果を読み取ることができます。
以下の表に、現在サポートされている Data Engine エンドポイントの例をいくつか示します。
クロス・リージョン・エンドポイント名 | 別名 |
---|---|
s3.us.cloud-object-storage.appdomain.cloud |
us-geo |
s3.eu.cloud-object-storage.appdomain.cloud |
eu-geo |
s3.ap.cloud-object-storage.appdomain.cloud |
ap-geo |
リージョン別エンドポイント名 | 別名 |
---|---|
s3.eu-de.cloud-object-storage.appdomain.cloud |
eu-de |
s3.eu-gb.cloud-object-storage.appdomain.cloud |
eu-gb |
`s3.us-south.cloud-object-storage.appdomain.cloud | us-south |