IBM Cloud Docs
Spark SQL 数据跳过

Spark SQL 数据跳过

数据跳过功能可根据与每个对象关联的汇总元数据跳过不相关的数据对象或文件,从而显著提高 SQL 查询的性能。

数据跳过功能使用开源 Xskipper 库,通过 Apache Spark 来创建、管理和部署数据跳过索引。 请参阅 Xskipper-可扩展数据跳过框架(Xskipper-An Extensible Data Skipping Framework)

有关如何使用 Xskipper 的更多详细信息,请参阅:

除了 Xskipper 中的开源功能外,还提供了以下功能:

地理空间数据跳过

还可以在使用 时空库中的 地理空间函数 查询地理空间数据集时使用数据跳过。

  • 要在包含纬度和经度列的数据集中通过数据跳过来简化操作,您可以收集纬度和经度列的最小/最大索引。
  • 通过使用内置 Xskipper 插件,可以在具有几何图形列 (UDT 列) 的数据集中使用数据跳过。

以下部分将展示如何使用地理空间插件。

设置地理空间插件

要使用插件,请使用“注册”模块来加载相关实现:

  • 对于 Scala:

    import com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory
    import com.ibm.xskipper.stmetaindex.index.STIndexFactory
    import com.ibm.xskipper.stmetaindex.translation.parquet.{STParquetMetaDataTranslator, STParquetMetadatastoreClauseTranslator}
    import io.xskipper._
    
    Registration.addIndexFactory(STIndexFactory)
    Registration.addMetadataFilterFactory(STMetaDataFilterFactory)
    Registration.addClauseTranslator(STParquetMetadatastoreClauseTranslator)
    Registration.addMetaDataTranslator(STParquetMetaDataTranslator)
    
  • 对于 Python:

    from xskipper import Xskipper
    from xskipper import Registration
    
    Registration.addMetadataFilterFactory(spark, 'com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory')
    Registration.addIndexFactory(spark, 'com.ibm.xskipper.stmetaindex.index.STIndexFactory')
    Registration.addMetaDataTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetaDataTranslator')
    Registration.addClauseTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetadatastoreClauseTranslator')
    

构建索引

要构建索引,可以使用 addCustomIndex API:

  • 对于 Scala:

    import com.ibm.xskipper.stmetaindex.implicits._
    
    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    
    xskipper
      .indexBuilder()
      // using the implicit method defined in the plugin implicits
      .addSTBoundingBoxLocationIndex("location")
      // equivalent
      //.addCustomIndex(STBoundingBoxLocationIndex("location"))
      .build(reader).show(false)
    
  • 对于 Python:

    xskipper = Xskipper(spark, dataset_path)
    
    # adding the index using the custom index API
    xskipper.indexBuilder() \
            .addCustomIndex("com.ibm.xskipper.stmetaindex.index.STBoundingBoxLocationIndex", ['location'], dict()) \
            .build(reader) \
            .show(10, False)
    

支持的功能

受支持的地理空间函数列表包括以下几项:

  • ST_Distance
  • ST_Intersects
  • ST_Contains
  • ST_Equals
  • ST_Crosses
  • ST_Touches
  • ST_Within
  • ST_Overlaps
  • ST_EnvelopesIntersect
  • ST_IntersectsInterior

加密索引

如果使用 Parquet 元数据存储,那么可以选择使用 Parquet Modular Encryption (PME) 来加密元数据。 要实现此目的,可以将元数据本身存储为 Parquet 数据集,然后使用 PME 对其进行加密。 此功能适用于所有输入格式,例如,以 CSV 格式存储的数据集可以使用 PME 来加密其元数据。

在以下部分中,除非另有说明,否则在提及页脚和列等内容时,它们都与元数据对象相关,而不是与索引数据集中的对象相关。

可通过以下方式对索引加密进行模块化和细化:

  • 每个索引都可以进行加密(使用每个索引的密钥粒度)或保留为明文
  • “页脚 + 对象名称”列:
    • 元数据对象的“页脚”列,它本身就是一个 Parquet 文件,其中包含:
      • 元数据对象的模式,其中显示所收集的所有索引的类型、参数和列名称。 例如,您可以了解 BloomFilter 是在列 city 上定义的,其误报概率为 0.1
      • 原始数据集或表名(如果存在 Hive 元存储表)的完整路径。
    • “对象名称”列存储所有索引对象的名称。
  • “页脚 + 元数据”列可以:
    • 都使用相同密钥进行加密。 这是缺省值。 在此情况下,包含元数据的 Parquet 对象的明文页脚配置处于加密页脚方式,并且“对象名称”列使用所选密钥进行加密。

    • 都为明文。 在此情况下,包含元数据的 Parquet 对象处于明文页脚方式,并且“对象名称”列未加密。

      如果至少有一个索引被标记为“已加密”,那么无论是否启用明文页脚方式,都必须配置页脚密钥。 如果设置了明文页脚,那么页脚密钥仅用于防篡改。 请注意,在这种情况下,“对象名称”列不可防篡改。

      如果配置了页脚密钥,那么必须至少已加密一个索引。

在使用索引加密之前,应检查 PME 上的文档,并确保您熟悉这些概念。

使用索引加密时,只要在任何 Xskipper API 中配置了 key,它就始终是标签 NEVER the key itself

要使用索引加密:

  1. 按照所有这些步骤来确保启用了 PME。 请参阅 PME(PME)

  2. 执行所有 常规 PME 配置,包括密钥管理配置。

  3. 为数据集创建加密元数据:

    1. 按照常规流程创建元数据。
    2. 配置一个页脚密钥。 如果要设置纯文本页脚 + 对象名列,请将 io.xskipper.parquet.encryption.plaintext.footer 设置为 true (请参阅下面的样本)。
    3. IndexBuilder 中,对于要加密的每个索引,添加要用于该索引的密钥的标签。

    要在查询期间使用元数据或刷新现有元数据,除了确保密钥可访问所需的 常规 PME 设置 (与读取加密数据集所需的配置完全相同) 之外,无需其他设置。

样本

以下样本显示了使用名为 k1 的密钥作为页脚 + 对象名密钥的元数据创建,以及使用名为 k2 的密钥作为加密 MinMax for temp 的密钥的元数据创建,同时还创建了 ValueList for city(保留为纯文本)。

  • 对于 Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmComf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • 对于 Python

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
    ("io.xskipper.parquet.mdlocation", md_base_location),
    ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k1") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

如果想要将“页脚 + 对象名称”保留为明文方式(如上所述),那么需要添加配置参数:

  • 对于 Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmComf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1",
      "io.xskipper.parquet.encryption.plaintext.footer" -> "true")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • 对于 Python

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
    ("io.xskipper.parquet.mdlocation", md_base_location),
    ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1"),
    ("io.xskipper.parquet.encryption.plaintext.footer", "true")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k1") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

支持旧元数据

Xskipper 无缝支持由 MetaIndexManager 创建的旧元数据。 旧元数据可用于跳过,因为下一次刷新操作会自动执行 Xskipper 元数据更新。

如果在列示索引或运行 describeIndex 操作时在索引前面看到 DEPRECATED_SUPPORTED,那么不推荐使用元数据版本,但仍支持该版本,并且跳过将起作用。 下一次刷新操作将自动更新元数据。