Querying Snowflake Open Catalog using Spark engine

About this task

You can query remote Iceberg tables that are registered in Snowflake Open Catalog by using the IBM® watsonx.data Spark engine through zero-copy data federation. Within watsonx.data Spark, object names are treated as case-insensitive by default, so you do not need quoted identifiers to access schemas and tables that were created in Snowflake.
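The identifier behavior can be modeled in a few lines of Python. This is an illustration of the rule described above, not watsonx.data or Snowflake code, and the function names are hypothetical. Snowflake stores unquoted identifiers in uppercase and keeps the exact case of double-quoted identifiers. The sketch models the case-insensitive matching on the Spark side with `str.casefold`.

```python
def snowflake_stored_name(identifier: str) -> str:
    """Return the name Snowflake stores for an identifier written in SQL.

    Unquoted identifiers are folded to uppercase; double-quoted identifiers
    keep their exact case, with the quotes removed and "" unescaped to ".
    """
    if len(identifier) >= 2 and identifier[0] == '"' and identifier[-1] == '"':
        return identifier[1:-1].replace('""', '"')
    return identifier.upper()


def case_insensitive_match(requested: str, stored: str) -> bool:
    """Model the case-insensitive name matching described for watsonx.data Spark."""
    return requested.casefold() == stored.casefold()


stored = snowflake_stored_name("customer_orders")         # created unquoted
print(stored)                                             # CUSTOMER_ORDERS
print(case_insensitive_match("customer_orders", stored))  # True
print(snowflake_stored_name('"customerOrders"'))          # customerOrders
```

A table created unquoted in Snowflake is stored as `CUSTOMER_ORDERS`. A lowercase reference from Spark still matches it under case-insensitive resolution.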

For general information about Snowflake Open Catalog integration, see Integrating Snowflake Open Catalog in watsonx.data.

Before you begin

Ensure that the following prerequisites are met before proceeding.

Snowflake requirements:

  • An active Snowflake Open Catalog account
  • Access to a Snowflake Query Workspace
  • A service connection with a valid client ID and client secret for authentication
  • The REST Catalog endpoint associated with your Snowflake Open Catalog account

watsonx.data requirements:

  • A provisioned watsonx.data Spark engine

Storage requirements:

  • A Google Cloud Storage (GCS) bucket located in the same region as the Snowflake Open Catalog account to ensure optimal performance and compatibility

Procedure

  1. Run the following statement in a Snowflake workspace to create a catalog integration:
CREATE OR REPLACE CATALOG INTEGRATION <catalog_integration_name>
  CATALOG_SOURCE=POLARIS
  TABLE_FORMAT=ICEBERG
  REST_CONFIG = (
    CATALOG_URI = '<catalog_uri>'
    CATALOG_NAME = '<open_catalog_name>'
  )
  REST_AUTHENTICATION = (
    TYPE = OAUTH
    OAUTH_CLIENT_ID = '<client_id>'
    OAUTH_CLIENT_SECRET = '<client_secret>'
    OAUTH_ALLOWED_SCOPES = ('PRINCIPAL_ROLE:ALL')
  )
  ENABLED=TRUE;
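You may not want the client ID and secret stored in a saved worksheet or script. In that case, you can render the statement from values kept outside your source files and run the printed result in the Snowflake workspace. The following is a minimal Python sketch. The function names and the environment variable names (`OPEN_CATALOG_URI`, `OPEN_CATALOG_NAME`, `OPEN_CATALOG_CLIENT_ID`, `OPEN_CATALOG_CLIENT_SECRET`) are hypothetical. String values are escaped by doubling single quotes, as in Snowflake string literals.

```python
import os


def sql_literal(value: str) -> str:
    """Quote a value as a Snowflake string literal, doubling any single quotes."""
    return "'" + value.replace("'", "''") + "'"


def catalog_integration_sql(name: str, catalog_uri: str, catalog_name: str,
                            client_id: str, client_secret: str,
                            principal_role: str = "ALL") -> str:
    """Render the CREATE CATALOG INTEGRATION statement used in this step."""
    return (
        f"CREATE OR REPLACE CATALOG INTEGRATION {name}\n"
        "  CATALOG_SOURCE=POLARIS\n"
        "  TABLE_FORMAT=ICEBERG\n"
        "  REST_CONFIG = (\n"
        f"    CATALOG_URI = {sql_literal(catalog_uri)}\n"
        f"    CATALOG_NAME = {sql_literal(catalog_name)}\n"
        "  )\n"
        "  REST_AUTHENTICATION = (\n"
        "    TYPE = OAUTH\n"
        f"    OAUTH_CLIENT_ID = {sql_literal(client_id)}\n"
        f"    OAUTH_CLIENT_SECRET = {sql_literal(client_secret)}\n"
        f"    OAUTH_ALLOWED_SCOPES = ({sql_literal('PRINCIPAL_ROLE:' + principal_role)})\n"
        "  )\n"
        "  ENABLED=TRUE;"
    )


if __name__ == "__main__":
    # Hypothetical environment variable names; placeholders are printed if unset.
    env = os.environ.get
    print(catalog_integration_sql(
        name="<catalog_integration_name>",
        catalog_uri=env("OPEN_CATALOG_URI", "<catalog_uri>"),
        catalog_name=env("OPEN_CATALOG_NAME", "<open_catalog_name>"),
        client_id=env("OPEN_CATALOG_CLIENT_ID", "<client_id>"),
        client_secret=env("OPEN_CATALOG_CLIENT_SECRET", "<client_secret>"),
    ))
```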
  2. Create an external volume:

    1. Sign in to the Snowflake UI.
    2. Go to Catalog > External Data > External Volumes.
    3. Click Create.
    4. Select Google Cloud Storage as the cloud provider and click Next.
    5. In the Google Cloud console, create a custom role that has the permissions required to access the GCS bucket. Then return to the Snowflake UI and click Next.
    6. Enter the GCS bucket details and click Next:
      • Name: a descriptive name for the external volume
      • Storage Location: gcs://<gcs_bucket_name>/<path>
    7. After the connection is verified, click Create.
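The Storage Location value uses the `gcs://` prefix shown above. You can check its form locally before you enter it in the wizard. The following is a minimal sketch with a hypothetical function name. The bucket-name check is a simplified version of the GCS naming rules: 3 to 63 characters; lowercase letters, digits, hyphens, underscores, and dots; starting and ending with a letter or digit. It does not cover every GCS restriction.

```python
import re
from urllib.parse import urlsplit

# Simplified GCS bucket-name rule (3-63 characters); longer dotted names and
# other GCS restrictions are not covered by this sketch.
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$")


def check_storage_location(location: str) -> tuple[str, str]:
    """Validate a gcs://<bucket>/<path> location and return (bucket, path)."""
    parts = urlsplit(location)
    if parts.scheme != "gcs":
        raise ValueError(f"expected the gcs:// prefix, got {parts.scheme!r}")
    if not BUCKET_RE.match(parts.netloc):
        raise ValueError(f"invalid bucket name: {parts.netloc!r}")
    return parts.netloc, parts.path.strip("/")


print(check_storage_location("gcs://my-bucket/iceberg/data"))  # ('my-bucket', 'iceberg/data')
```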
  3. Create a catalog-linked database:

CREATE OR REPLACE DATABASE <database_name>
  LINKED_CATALOG = (
    CATALOG = '<catalog_integration_name>'
  )
  EXTERNAL_VOLUME = '<external_volume_name>';
  4. Create a schema if it does not exist:
CREATE SCHEMA IF NOT EXISTS <database_name>.<schema_name>;


  5. Create an Iceberg table:
CREATE OR REPLACE ICEBERG TABLE <database_name>.<schema_name>.<table_name> (
  first_name STRING,
  last_name STRING,
  amount INT,
  create_date DATE
);
  6. Insert data into the Iceberg table:
INSERT INTO <database_name>.<schema_name>.<table_name>
(first_name, last_name, amount, create_date)
VALUES
  ('John', 'Doe', 100, '2024-01-10'),
  ('Jane', 'Smith', 250, '2024-02-15'),
  ('Alice', 'Johnson', 300, '2024-03-20');
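Each VALUES row must supply exactly one value per listed column, or Snowflake rejects the INSERT. To catch a mismatch before you run the statement, you can generate it with a check. The following is a minimal Python sketch. The helper names are hypothetical, the column names in the usage are illustrative, and dates are rendered as ISO strings, which Snowflake converts when inserting into a DATE column.

```python
from datetime import date
from typing import Sequence, Union

Value = Union[str, int, float, date]


def sql_value(value: Value) -> str:
    """Render one Python value as a Snowflake SQL literal."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    if isinstance(value, date):
        return f"'{value.isoformat()}'"
    return str(value)


def insert_sql(table: str, columns: Sequence[str],
               rows: Sequence[Sequence[Value]]) -> str:
    """Build an INSERT ... VALUES statement, rejecting rows of the wrong width."""
    for i, row in enumerate(rows):
        if len(row) != len(columns):
            raise ValueError(
                f"row {i} has {len(row)} values but {len(columns)} columns are listed"
            )
    values = ",\n".join(
        "  (" + ", ".join(sql_value(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table}\n({', '.join(columns)})\nVALUES\n{values};"


columns = ["first_name", "last_name", "amount", "create_date"]
rows = [
    ("John", "Doe", 100, date(2024, 1, 10)),
    ("Jane", "Smith", 250, date(2024, 2, 15)),
    ("Alice", "Johnson", 300, date(2024, 3, 20)),
]
print(insert_sql("<database_name>.<schema_name>.<table_name>", columns, rows))
```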
  7. Verify the data in Snowflake:
SELECT * FROM <database_name>.<schema_name>.<table_name>;
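In Snowflake, the table is addressed as `<database>.<schema>.<table>`. In a catalog-linked database, each Snowflake schema corresponds to a namespace in the linked Open Catalog catalog. From Spark, the same table is addressed through the Spark catalog name defined in the submission properties instead of the Snowflake database name; the template uses `spark_sf_catalog`. The following is a minimal sketch of that mapping. The function name and the sample names `SALES_DB`, `SALES`, and `CUSTOMERS` are hypothetical, and the simple split on dots does not handle quoted names that contain dots.

```python
def spark_table_name(snowflake_name: str, spark_catalog: str = "spark_sf_catalog") -> str:
    """Map <database>.<schema>.<table> in Snowflake to the Spark three-part name.

    The Snowflake database is replaced by the Spark catalog name; the schema
    (Open Catalog namespace) and the table name carry over unchanged.
    """
    parts = snowflake_name.split(".")  # assumes no dots inside the names
    if len(parts) != 3 or not all(parts):
        raise ValueError("expected <database>.<schema>.<table>")
    _database, schema, table = parts
    return f"{spark_catalog}.{schema}.{table}"


print(spark_table_name("SALES_DB.SALES.CUSTOMERS"))  # spark_sf_catalog.SALES.CUSTOMERS
```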
  8. Access the table from watsonx.data by using the Spark engine. This step uses a shell script that submits the job and a Python script that runs the query.

Shell script:

  • Submits the Spark application to the watsonx.data Spark engine through the REST API
  • Configures the Spark runtime properties
  • References the Python application stored in object storage

Shell script (generic template)

#!/bin/bash
# Query a Snowflake Open Catalog Iceberg table from the watsonx.data Spark engine

app_endpoint="<cpd_cluster_uri>"
engine_id="<engine_id_inside_wxd>"
instance_id="<wxd_instance_id>"
cos_bucket_location="<bucket_location>"
cos_file_path="<file_name>"

app_location="${cos_bucket_location}${cos_file_path}"

app_name="spark_gcs_sf1_$(date +%s)"

confVal='{
  "spark.hadoop.fs.cos.<bucketname>.endpoint":"<endpoint_to_the_bucket>",
  "spark.hadoop.fs.cos.<bucketname>.access.key":"<bucket_access_key>",
  "spark.hadoop.fs.cos.<bucketname>.secret.key":"<bucket_secret_key>",
  "spark.hadoop.google.cloud.auth.service.account.enable": "true",
  "spark.hadoop.google.cloud.auth.service.account.json.keyfile": "<path_to_the_json_key_file_mounted>",
  "spark.sql.parquet.enableVectorizedReader": "false",
  "spark.sql.iceberg.vectorization.enabled": "false",
  "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
  "spark.kryo.registrationRequired": "false",
  "spark.kryo.unsafe": "false",
  "spark.sql.adaptive.enabled": "false",
  "spark.sql.adaptive.coalescePartitions.enabled": "false",
  "spark.sql.catalog.spark_sf_catalog.io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
  "spark.eventLog.enabled": "true",
  "spark.sql.catalog.spark_sf_catalog.uri": "<snowflake_catalog_uri>",
  "spark.sql.catalog.spark_sf_catalog.warehouse": "<snowflake_catalog_name>",
  "spark.sql.catalog.spark_sf_catalog.scope": "PRINCIPAL_ROLE:<principal_role_of_the_catalog>",
  "spark.sql.catalog.spark_sf_catalog.type": "rest",
  "spark.sql.catalog.spark_sf_catalog": "org.apache.iceberg.spark.SparkCatalog",
  "spark.sql.defaultCatalog": "spark_sf_catalog",
  "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "spark.sql.catalog.spark_sf_catalog.credential": "<client_id>:<client_secret>"
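}'

# Submit the application to the Spark engine.
# ASSUMPTION: the endpoint path and headers below follow the watsonx.data
# Spark REST API; verify them against the API reference for your
# watsonx.data release before use.
token="<zen_api_key>"   # hypothetical placeholder for your authorization token

payload="{\"application_details\": {\"application\": \"${app_location}\", \"conf\": ${confVal}}}"

# -k skips TLS certificate verification; remove it if the cluster certificate is trusted
curl -k -X POST "${app_endpoint}/lakehouse/api/v2/spark_engines/${engine_id}/applications" \
  -H "Authorization: ZenApiKey ${token}" \
  -H "AuthInstanceID: ${instance_id}" \
  -H "Content-Type: application/json" \
  -d "${payload}"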
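Every catalog property in the template shares the prefix `spark.sql.catalog.spark_sf_catalog`, and `spark.sql.defaultCatalog` must name a catalog configured that way. If you rename the catalog, all of those keys must change together. The following minimal Python sketch derives them from a single name. The function name is hypothetical, and the values are the template's placeholders.

```python
import json


def iceberg_rest_catalog_conf(catalog: str, uri: str, warehouse: str,
                              principal_role: str, client_id: str,
                              client_secret: str) -> dict[str, str]:
    """Build the Iceberg REST catalog properties used in the submission template."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        f"{prefix}.warehouse": warehouse,  # Open Catalog catalog name
        f"{prefix}.scope": f"PRINCIPAL_ROLE:{principal_role}",
        f"{prefix}.credential": f"{client_id}:{client_secret}",
        f"{prefix}.io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
        "spark.sql.defaultCatalog": catalog,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


conf = iceberg_rest_catalog_conf(
    catalog="spark_sf_catalog",
    uri="<snowflake_catalog_uri>",
    warehouse="<snowflake_catalog_name>",
    principal_role="<principal_role_of_the_catalog>",
    client_id="<client_id>",
    client_secret="<client_secret>",
)
print(json.dumps(conf, indent=2))
```

You could merge this dictionary with the storage and tuning properties from the template before serializing it as the `conf` value.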

Python script:

  • Initializes the Spark session
  • Connects to the Iceberg REST catalog configured at submission time
  • Queries the table, displays the rows, and logs the row count

Python script (generic template)

from pyspark.sql import SparkSession
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constants
# Spark catalog name: must match the spark.sql.catalog.<name> properties
# set in the job submission script (spark_sf_catalog in the template)
CATALOG_NAME = "spark_sf_catalog"
SCHEMA_NAME = "<schema_name>"  # Open Catalog namespace (Snowflake schema)
TABLE_NAME = "<table_name>"


def init_spark() -> SparkSession:
    """
    Get or create the Spark session.

    The Iceberg REST catalog and GCS settings are not set here; they are
    supplied as Spark properties when the application is submitted.

    Returns:
        SparkSession: Configured Spark session
    """
    logger.info("Initializing Spark session...")

    spark = SparkSession.builder \
        .appName("Snowflake-GCS-Integration") \
        .getOrCreate()

    logger.info("Spark session initialized successfully")
    return spark


def query_table(spark: SparkSession) -> None:
    """
    Query the Snowflake table through Spark.

    Args:
        spark: Active Spark session
    """
    logger.info(f"Querying table: {CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}")

    # Query the table
    df = spark.sql(f"SELECT * FROM {CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}")

    # Show results
    logger.info("Query results:")
    df.show()

    # Get row count
    count = df.count()
    logger.info(f"Total rows: {count}")


def main():
    """
    Main execution function.
    """
    spark = None
    try:
        # Initialize Spark session
        spark = init_spark()

        # Query the table
        query_table(spark)

        logger.info("Job completed successfully")

    except Exception as e:
        logger.error(f"Job failed with error: {e}")
        raise
    finally:
        # Stop the session only if it was created
        if spark is not None:
            spark.stop()
            logger.info("Spark session stopped")


if __name__ == "__main__":
    main()
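The script interpolates the catalog, schema, and table names directly into the SQL text. This works for simple names. A name that contains a character such as a hyphen or a space must be quoted with backticks in Spark SQL, and any literal backtick inside it is doubled. The following is a minimal sketch of hypothetical helpers that could build the name used in `query_table`. The sample schema name `sales-eu` is illustrative.

```python
def quote_spark_ident(name: str) -> str:
    """Quote one Spark SQL identifier with backticks, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def qualified_name(*parts: str) -> str:
    """Join quoted identifier parts into a multipart name such as catalog.schema.table."""
    return ".".join(quote_spark_ident(p) for p in parts)


print(qualified_name("spark_sf_catalog", "sales-eu", "orders"))
# `spark_sf_catalog`.`sales-eu`.`orders`
```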

Results

You can now query Iceberg tables from Snowflake Open Catalog using Spark. The queries execute directly on the data in the external storage location without copying data into watsonx.data.

Example output

The Python application displays output similar to the following:

+---------------+--------------+------------+-----------------+
|FIRST_NAME     |LAST_NAME     |AMOUNT      |CREATE_DATE      |
+---------------+--------------+------------+-----------------+
|John           |Doe           |100         |2024-01-10       |
|Jane           |Smith         |250         |2024-02-15       |
|Alice          |Johnson       |300         |2024-03-20       |
+---------------+--------------+------------+-----------------+
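Spark does not guarantee row order for `SELECT *` without `ORDER BY`. For that reason, checking the returned rows against the inserted rows is safer as an order-insensitive comparison. The following is a minimal sketch that uses the example rows. The function name is hypothetical. In the Spark job, `actual` could come from `[tuple(r) for r in df.collect()]`, which is an assumption about how you would wire it in. Numeric values returned as `Decimal` still compare equal to the integers below.

```python
from collections import Counter
from datetime import date

# Rows inserted in Snowflake in this example.
expected = [
    ("John", "Doe", 100, date(2024, 1, 10)),
    ("Jane", "Smith", 250, date(2024, 2, 15)),
    ("Alice", "Johnson", 300, date(2024, 3, 20)),
]


def same_rows(expected_rows, actual_rows) -> bool:
    """Compare two collections of rows as multisets (order-insensitive)."""
    return Counter(map(tuple, expected_rows)) == Counter(map(tuple, actual_rows))


# The same rows in a different order, as Spark might return them.
actual = [expected[2], expected[0], expected[1]]
print(same_rows(expected, actual))  # True
```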

Outcome

The setup enables direct, zero-copy querying of Snowflake Iceberg tables from watsonx.data, eliminating the need for data replication.
