JDBC pushdown
IBM Presto can push down the processing of join queries, in whole or in part, to the connected JDBC data source.
Join operation
Presto transforms join queries into an internal representation of the join operation called a PlanNode. By combining this join operation with query optimization techniques, Presto executes complex join queries efficiently.
Join pushdown
Presto allows a connector to delegate the join operation (PlanNode) to the underlying JDBC data source, provided the connector can process the join predicate at the data source level. This process, known as join pushdown, can significantly improve query performance.
Query optimization
Presto analyzes join queries to create an optimal join operation (PlanNode). This optimization process involves inferencing and restructuring the query to improve performance.
The following examples show how Presto inferencing changes the structure of a join query:
Presto inferencing | User-provided query | Result of query inferencing |
---|---|---|
Inferencing to remove a join condition | select * from postgresql.pg.mypg_table1 t1 join postgresql.pg.mypg_table2 t2 on t1.pgfirsttablecolumn=t2.pgsecondtablecolumn where t1.pgfirsttablecolumn=100 | select * from postgresql.pg.mypg_table1 t1, postgresql.pg.mypg_table2 t2 where t1.pgfirsttablecolumn=100 and t2.pgsecondtablecolumn=100 |
Inferencing to create an inner join from a right, left, or cross join | SELECT e.first_name_varchar, p.project_name_varchar, p.risk_level_tinyint FROM mysql.tm_lakehouse_engine_db_1.employee e CROSS JOIN mysql.tm_lakehouse_engine_db_1.projects p WHERE e.age_tinyint > p.risk_level_tinyint; | SELECT e.first_name_varchar, p.project_name_varchar, p.risk_level_tinyint FROM mysql.tm_lakehouse_engine_db_1.employee e, mysql.tm_lakehouse_engine_db_1.projects p WHERE e.age_tinyint > p.risk_level_tinyint; |
Left and right joins may be internally converted into inner joins and pushed down to a specific connector. Conversely, in some cases an inner join may not be pushed down to a specific connector because certain conditions are not met.
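As an illustrative sketch (reusing the PostgreSQL table names from the preceding table), a LEFT JOIN whose WHERE clause filters on a column of the right-hand table discards the NULL-extended rows, so Presto may convert it into an inner join that is then eligible for pushdown:

```sql
-- The filter t2.pgsecondtablecolumn = 100 can never match the
-- NULL-extended rows that a LEFT JOIN produces, so this query is
-- equivalent to an INNER JOIN and may be converted by Presto.
SELECT *
FROM postgresql.pg.mypg_table1 t1
LEFT JOIN postgresql.pg.mypg_table2 t2
  ON t1.pgfirsttablecolumn = t2.pgsecondtablecolumn
WHERE t2.pgsecondtablecolumn = 100;
```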
Key considerations for Join query pushdown in Presto JDBC data source
Presto validates the join operation (PlanNode) specifications to perform join pushdown. These specifications vary for each data source and connector. However, the following generic conditions must be met for a join to be pushed down in a JDBC connector:
- Connector Support: The JDBC connector must be able to process the join operation. Complex operations involving Presto functions or operators may prevent pushdown.
- Join Type and Conditions: The join should typically be an inner join with at least one common column between the tables. The join conditions should involve compatible data types and operators.
- Database Compatibility: The underlying database must support the specific join operation and data types involved. For more information, see Data types and operators that support join pushdown.
- Table Grouping: Tables from the same connector and meeting the required criteria can be grouped for pushdown.
- Configuration: Join pushdown is controlled using a combination of session-level properties.
To enable join pushdown, set the following session flags:
- SET SESSION optimizer_inner_join_pushdown_enabled = true;
  Enables pushdown to eligible data sources, but only for equi-joins (joins with equality conditions).
- SET SESSION optimizer_inequality_join_pushdown_enabled = true;
  Enables pushdown for non-equi joins (joins with inequality or range-based conditions) to eligible data sources. Must be set to true along with the preceding flag.
- SET SESSION <catalogName>.partial_predicate_push_down = true;
  Enables partial predicate pushdown at the catalog level, which allows applicable filter conditions to be pushed down to the data source along with join clauses. For example, SET SESSION postgresql.partial_predicate_push_down = true;. While this flag is not mandatory, it is recommended, because certain queries rely on it for pushdown to be effective.
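Putting the flags together, a minimal sketch of a session that enables pushdown before running a single-catalog join (using the postgresql catalog and the example tables from the verification section below) might look like this:

```sql
-- Enable join pushdown for equi-joins and non-equi joins in this session.
SET SESSION optimizer_inner_join_pushdown_enabled = true;
SET SESSION optimizer_inequality_join_pushdown_enabled = true;

-- Recommended: also push applicable filter predicates down to the
-- postgresql catalog (replace 'postgresql' with your catalog name).
SET SESSION postgresql.partial_predicate_push_down = true;

-- This single-catalog equi-join is now a candidate for pushdown.
SELECT o.order_id, c.c_customer_id
FROM postgresql.public.orders o
INNER JOIN postgresql.public.customer c
  ON c.c_customer_id = o.customer_id;
```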
When a join query uses an aggregate, a math operation, or a data type conversion, the operation is converted into Presto functions that are applied to the join operation. Any join query that creates such an intermediate Presto function cannot be handled by the connector, so pushdown is not performed. For example:
Example | Join condition |
---|---|
Join conditions that create Presto functions | abs(int_column)=int_column2; int_sum_column=int_value1_column1+int_value1_column2; cast(varchar_20_column as varchar(100))=varchar100_column |
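For instance, in the following hedged sketch (the tables and columns are hypothetical), the first join condition wraps a column in abs() and therefore blocks pushdown, while the second compares bare columns and stays eligible:

```sql
-- Not pushed down: abs() becomes an intermediate Presto function
-- inside the join condition.
SELECT t1.int_column, t2.int_column2
FROM postgresql.pg.table1 t1
JOIN postgresql.pg.table2 t2
  ON abs(t1.int_column) = t2.int_column2;

-- Eligible for pushdown: the join condition compares bare columns.
SELECT t1.int_column, t2.int_column2
FROM postgresql.pg.table1 t1
JOIN postgresql.pg.table2 t2
  ON t1.int_column = t2.int_column2;
```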
Verifying join pushdown
To check whether a join query has been pushed down to the underlying data source, examine the query's EXPLAIN plan:
- If the plan does not show a join operator, complete pushdown happened.
- If the plan shows fewer join operators than the original join query contains, partial pushdown happened.
- If the number of join operators is the same as in the original query, no pushdown happened.
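For instance, you can prefix the query with EXPLAIN to obtain the plan and then apply these checks, as in the following sketch:

```sql
-- Inspect the resulting plan for join operators to determine
-- whether (and how much of) the join was pushed down.
EXPLAIN
SELECT order_id, c_customer_id
FROM postgresql.public.orders o
INNER JOIN postgresql.public.customer c
  ON c.c_customer_id = o.customer_id;
```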
The following example illustrates how to verify join pushdown results:
Join query:
SELECT order_id, c_customer_id FROM postgresql.public.orders o INNER JOIN postgresql.public.customer c ON c.c_customer_id=o.customer_id;
Original Presto plan:
- Output[PlanNodeId 9][order_id, c_customer_id] => [order_id:integer, c_customer_id:char(16)]
- RemoteStreamingExchange[PlanNodeId 266][GATHER] => [order_id:integer, c_customer_id:char(16)]
- InnerJoin[PlanNodeId 4][("customer_id" = "c_customer_id")][$hashvalue, $hashvalue_11] => [order_id:integer, c_customer_id:char(16)]
Distribution: PARTITIONED
- RemoteStreamingExchange[PlanNodeId 264][REPARTITION][$hashvalue] => [customer_id:char(16), order_id:integer, $hashvalue:bigint]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- ScanProject[PlanNodeId 0,326][table = TableHandle {connectorId='postgresql', connectorHandle='JdbcTableHandle{connectorId=postgresql, schemaTableName=public.orders, catalogName=null, schemaName=public, tableName=orders, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, projectLocality = LOCAL] => [customer_id:char(16), order_id:integer, $hashvalue_10:bigint]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
$hashvalue_10 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(customer_id), BIGINT'0')) (3:6)
LAYOUT: {domains=ALL, additionalPredicate={}}
order_id := JdbcColumnHandle{connectorId=postgresql, columnName=order_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=int4, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (3:6)
customer_id := JdbcColumnHandle{connectorId=postgresql, columnName=customer_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=1, jdbcTypeName=bpchar, columnSize=16, decimalDigits=0, arrayDimensions=null}, columnType=char(16), nullable=true, comment=Optional.empty} (3:6)
- LocalExchange[PlanNodeId 297][HASH][$hashvalue_11] (c_customer_id) => [c_customer_id:char(16), $hashvalue_11:bigint]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- RemoteStreamingExchange[PlanNodeId 265][REPARTITION][$hashvalue_12] => [c_customer_id:char(16), $hashvalue_12:bigint]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- ScanProject[PlanNodeId 1,327][table = TableHandle {connectorId='postgresql', connectorHandle='JdbcTableHandle{connectorId=postgresql, schemaTableName=public.customer, catalogName=null, schemaName=public, tableName=customer, joinTables=Optional.empty}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}, projectLocality = LOCAL] => [c_customer_id:char(16), $hashvalue_13:bigint]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
$hashvalue_13 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(c_customer_id), BIGINT'0')) (4:12)
LAYOUT: {domains=ALL, additionalPredicate={}}
c_customer_id := JdbcColumnHandle{connectorId=postgresql, columnName=c_customer_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=1, jdbcTypeName=bpchar, columnSize=16, decimalDigits=0, arrayDimensions=null}, columnType=char(16), nullable=true, comment=Optional.empty} (4:12)
Pushdown Presto plan:
- Output[PlanNodeId 9][order_id, c_customer_id] => [order_id:integer, c_customer_id:char(16)]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- RemoteStreamingExchange[PlanNodeId 261][GATHER] => [order_id:integer, c_customer_id:char(16)]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- TableScan[PlanNodeId 287][TableHandle {connectorId='postgresql', connectorHandle='JdbcTableHandle{connectorId=postgresql, schemaTableName=public.orders, catalogName=null, schemaName=public, tableName=orders, joinTables=Optional[[JdbcTableHandle{connectorId=postgresql, schemaTableName=public.orders, catalogName=null, schemaName=public, tableName=orders, joinTables=Optional.empty}, JdbcTableHandle{connectorId=postgresql, schemaTableName=public.customer, catalogName=null, schemaName=public, tableName=customer, joinTables=Optional.empty}]]}', layout='Optional[{domains=ALL, additionalPredicate={}}]'}] => [customer_id:char(16), order_id:integer, c_customer_id:char(16)]
Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
LAYOUT: {domains=ALL, additionalPredicate={}}
c_customer_id := JdbcColumnHandle{connectorId=postgresql, columnName=c_customer_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=1, jdbcTypeName=bpchar, columnSize=16, decimalDigits=0, arrayDimensions=null}, columnType=char(16), nullable=true, comment=Optional.empty} (4:12)
customer_id := JdbcColumnHandle{connectorId=postgresql, columnName=customer_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=1, jdbcTypeName=bpchar, columnSize=16, decimalDigits=0, arrayDimensions=null}, columnType=char(16), nullable=true, comment=Optional.empty} (3:6)
order_id := JdbcColumnHandle{connectorId=postgresql, columnName=order_id, jdbcTypeHandle=JdbcTypeHandle{jdbcType=4, jdbcTypeName=int4, columnSize=10, decimalDigits=0, arrayDimensions=null}, columnType=integer, nullable=true, comment=Optional.empty} (3:6)
Conditions to enable JDBC join pushdown
JDBC join pushdown can make queries run faster, sometimes up to 10 times faster. However, it is important to use it carefully: if used incorrectly, it can slow down query processing.
The following are the key considerations for enabling JDBC join pushdown:
- Selective joins: Use pushdown when the join returns only a limited number of records from huge tables. The join should return a relatively small subset of records from large tables; ideal join conditions typically return less than 10% of the total records.
- Avoid cross joins: If the join criteria lead to a cross-join result, do not use the join pushdown capabilities. Cross joins (which produce the Cartesian product of two tables) can degrade performance.
- Database optimization: The database system should be optimized to handle join queries efficiently, especially non-equi joins. Presto can perform join pushdown for the equi-join condition (=) and for non-equi-join conditions (<, >, <=, >=, !=, <>). Almost every database handles equi-joins effectively, but not all databases are optimized for non-equi-join conditions.
- Aggregation: Only SELECT queries are pushed down. A query that performs aggregation (for example, SELECT count(*)) is not pushed down, which may affect performance; see the sketch after this list.
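The aggregation consideration can be sketched as follows, reusing the Db2 example tables from the statistics below:

```sql
-- Plain SELECT join: a candidate for pushdown.
SELECT A.ASSIGNMENT_ID, B.DEFAULT_INT
FROM DB2.DB2.ASSIGNMENTS A
JOIN DB2.DB2.STUDENT B
  ON A.ASSIGNMENT_ID = B.ASSIGNMENT_ID;

-- Aggregating join: the count(*) aggregation prevents the join
-- query from being pushed down as a whole.
SELECT count(*)
FROM DB2.DB2.ASSIGNMENTS A
JOIN DB2.DB2.STUDENT B
  ON A.ASSIGNMENT_ID = B.ASSIGNMENT_ID;
```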
Performance gains grow as the number of rows in the tables increases, provided the join result remains restricted to less than 10% of the total records. The following statistics illustrate this behavior:
Join query | Scenario | Performance improvement |
---|---|---|
SELECT A.ASSIGNMENT_ID, A.ROLE, A.IS_CURRENT, B.ASSIGNMENT_ID, B.DEFAULT_INT FROM DB2.DB2.ASSIGNMENTS A JOIN DB2.DB2.STUDENT B ON A.ASSIGNMENT_ID = B.ASSIGNMENT_ID; | Joining ASSIGNMENTS (10 million rows) with STUDENT (5000 rows); the join result is 50 rows. | 5x |
SELECT A.ASSIGNMENT_ID, A.ROLE, A.IS_CURRENT, B.ASSIGNMENT_ID, B.DEFAULT_INT FROM DB2.DB2.ASSIGNMENTS A JOIN DB2.DB2.STUDENT B ON A.ASSIGNMENT_ID = B.ASSIGNMENT_ID; | Joining ASSIGNMENTS (20 million rows) with STUDENT (5000 rows); the join result is 50 rows. | 10x |
The statistics in this table are approximate examples; actual values vary with the database and environment.
Data types and operators that support join pushdown
The following data types and operators support the join pushdown feature:
Data types | Operators |
---|---|
BIGINT | =, <, >, <=, >=, !=, <> |
BOOLEAN | =, !=, <> |
INTEGER | =, <, >, <=, >=, !=, <> |
TINYINT | =, <, >, <=, >=, !=, <> |
SMALLINT | =, <, >, <=, >=, !=, <> |
Floating-point | =, <, >, <=, >=, !=, <> |
REAL | =, <, >, <=, >=, !=, <> |
DOUBLE | =, <, >, <=, >=, !=, <> |
DECIMAL | =, <, >, <=, >=, !=, <> |
VARCHAR | =, <, >, <=, >=, !=, <> |
CHAR | =, <, >, <=, >=, !=, <> |
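As a hedged sketch, the following non-equi join compares TINYINT columns with the supported > operator (the MySQL tables are the ones used in the inferencing examples above); with both pushdown flags enabled, it is a pushdown candidate:

```sql
SET SESSION optimizer_inner_join_pushdown_enabled = true;
SET SESSION optimizer_inequality_join_pushdown_enabled = true;

-- Non-equi join on TINYINT columns using the supported > operator.
SELECT e.first_name_varchar, p.project_name_varchar
FROM mysql.tm_lakehouse_engine_db_1.employee e
JOIN mysql.tm_lakehouse_engine_db_1.projects p
  ON e.age_tinyint > p.risk_level_tinyint;
```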
Federated join pushdown
A federated join is a join query that contains tables from multiple catalogs.
In a federated join, Presto groups tables based on their JDBC catalog to optimize pushdown. However, pushdown is only possible if:
- JDBC data source: The tables are from a JDBC data source.
- Pushdown specifications: The join operation meets the specific requirements for pushdown, such as compatible data types and operators. For more information, see Key considerations for Join query pushdown in Presto JDBC data source.
If a table doesn't meet these criteria, it is excluded from the pushdown and is processed by Presto directly. The following table shows an example of a federated join:
Federated join |
---|
SELECT i1.customer_last_name, A.ASSIGNMENT_ID, B.FIRST_NAME, t2.custkey FROM DB2_CATALOG.DB2.ASSIGNMENTS_10_MILLION A JOIN DB2_CATALOG.DB2.CUSTOMER_10_MILLION B ON A.EMPLOYEE_ID = B.CUST_ID JOIN DB2_CATALOG.DB2.JOIN_TABLE_50_ROWS C ON A.ASSIGNMENT_ID = C.ASSIGNMENT_ID JOIN postgres_catalog.pg.customer t1 ON B.CUST_ID = t1.custkey JOIN postgres_catalog.pg.orders t2 ON t1.custkey = t2.orderkey JOIN postgres_catalog.pg.products t3 ON t3.productkey = t2.productkey JOIN iceberg_catalog.ice.customer i1 ON i1.customer = B.CUST_ID |
This query spans three catalogs: db2_catalog (a Db2 catalog) with three tables, postgres_catalog (a PostgreSQL catalog) with three tables, and iceberg_catalog (an Iceberg catalog) with one table. |
For the preceding federated join query, the join pushdown optimizer creates a join operation for the db2_catalog and postgres_catalog tables, as shown in the following table. Because iceberg_catalog is not a JDBC data source, its table is skipped and returned to Presto for processing.
Join operation | Representation |
---|---|
DB2_CATALOG join operation that the join pushdown optimizer creates | Select A.ASSIGNMENT_ID, B.FIRST_NAME From DB2_CATALOG.DB2.ASSIGNMENTS_10_MILLION A, DB2_CATALOG.DB2.CUSTOMER_10_MILLION B, DB2_CATALOG.DB2.JOIN_TABLE_50_ROWS C where A.EMPLOYEE_ID = B.CUST_ID and A.ASSIGNMENT_ID = C.ASSIGNMENT_ID; |
postgres_catalog join operation that the join pushdown optimizer creates | Select t2.custkey From postgres_catalog.pg.customer t1, postgres_catalog.pg.orders t2, postgres_catalog.pg.products t3 where t1.custkey = t2.orderkey and t3.productkey = t2.productkey; |
After join pushdown optimization, Presto processes the preceding federated query as follows:
select i1.customer_last_name, A.ASSIGNMENT_ID, B.FIRST_NAME, t2.custkey from (result of DB2_CATALOG join pushdown) join (result of postgres_catalog join pushdown) on B.CUST_ID = t1.custkey join iceberg_catalog.ice.customer i1 on i1.customer = B.CUST_ID
List of data sources that support join pushdown
The following data sources support the join pushdown feature:
- IBM Db2
- PostgreSQL
- Informix
- IBM Netezza
- MySQL
- Oracle
- SQL Server
- Teradata
- Amazon Redshift
- SingleStore
- Snowflake
- HANA
- Apache Phoenix
Benefits of join pushdown
The following are the benefits of the join pushdown feature:
- Improved overall query performance.
- Reduced network traffic between IBM Presto and the data source.
- Reduced load on the remote data source.
- Significant cost reduction, because the database is hit fewer times.
Limitations of join pushdown
The following are the limitations of the join pushdown feature:
- Dependency on Filter Pushdown: Join pushdown relies on existing filter pushdown capabilities of the underlying database. Any limitations or inefficiencies in filter pushdown impact join pushdown performance.
- Database Compatibility: Join pushdown is limited to queries that the underlying database can understand. Complex queries involving filters, projections, conditions, or special keywords may not be pushed down.
- Self-Join Limitations: The current implementation of self-joins is not optimal and depends on the table handle cache. By default, the table handle cache is turned off in Presto; if it is enabled, self-join pushdown fails.
Viewing JDBC Pushdown feature in UI
- Log in to the watsonx.data instance.
- Go to Configurations.
- Click the JDBC pushdown tile. The JDBC pushdown details page opens.
- Click the Documentation link to learn more about the JDBC pushdown feature in watsonx.data.