Hi ONE DATA Community,
Let’s suppose I am using a Database Connection Load processor and have subsequent filter operations defined in a ONE DATA query or in Data Filter processors.
Let’s furthermore assume that I am using a preconfigured connection to an Oracle DB in the processor.
Are these filters pushed down to the database by Spark/ONE DATA, or is the complete data loaded over the network into the Spark context and the filters applied there?
I just tried it on one of our customer’s instances with a filter and a row count: the count + filter defined in ONE DATA is 5-10 times slower than writing the count + filter directly into the Database Connection Load processor.
Not every operation is pushed down automatically. Some operations cannot be offloaded to the source DB at all.
For the Database Connection Load processor, the entire SQL SELECT statement is executed on the source DB as a first-class SQL query, and the results are transferred to ONE DATA. So if the count is part of the SQL, it is executed on the source DB.
If you build the same logic with our processors (load a table via DT or Connection Load with a simple SELECT * statement, add a Data Filter, add a Row Count processor), only the filters are pushed down to the source DB. The count is executed in ONE DATA, which means the data to be counted is loaded from the source just for the counting. The filtering, however, should happen on the source DB, so only the rows that pass the filter are transferred.
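In plain PySpark terms (outside ONE DATA), the two variants described above might look roughly like the following. This is only a sketch and not how ONE DATA implements its processors: the helper names, the `ORDERS` table and the connection details are hypothetical, and only the `VALIDFLAG` column is taken from this thread.

```python
def count_subquery(table: str, predicate: str, alias: str = "q") -> str:
    """Wrap COUNT(*) in a derived table so the source DB does the counting."""
    # No "AS" before the alias, because Oracle rejects it for table aliases.
    return f"(SELECT COUNT(*) AS CNT FROM {table} WHERE {predicate}) {alias}"


def count_on_source(spark, url: str, table: str, predicate: str, **jdbc_options) -> int:
    """Variant A: the whole statement, including COUNT, runs on the source DB."""
    df = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", count_subquery(table, predicate))
        .options(**jdbc_options)  # e.g. user, password, driver
        .load()
    )
    # Exactly one row with one column travels over the network.
    return int(df.collect()[0][0])


def count_in_spark(spark, url: str, table: str, predicate: str, **jdbc_options) -> int:
    """Variant B: Spark may push the filter down, but counts the rows itself."""
    df = (
        spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", table)
        .options(**jdbc_options)
        .load()
    )
    # Every row that passes the filter is transferred, then counted in Spark.
    return df.filter(predicate).count()
```

With an existing `SparkSession`, a call such as `count_on_source(spark, url, "ORDERS", "VALIDFLAG = 1", user=..., password=...)` would correspond to writing the count into the SQL of the load processor, while `count_in_spark` corresponds to the filter plus row count built from separate processors.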
You can look at the Spark execution plans in Full Debug mode, via the aforementioned explain query, or in the Spark UI. The physical plans show the pushed filters.
Please also note that this feature stems entirely from how Spark handles its data sources and query plan optimization. The support is limited and, to my knowledge, consists of filters and column pruning.
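To make "filters and column pruning" concrete, here is a small sketch of the shape of the statement that ends up at the source DB. It is an illustration under assumptions, not Spark's actual code: the function name is hypothetical, and the `SELECT 1` fallback for an empty column list reflects my understanding of the JDBC source rather than something confirmed in this thread.

```python
from typing import Sequence


def sketch_scan_sql(dbtable: str, columns: Sequence[str], pushed_predicates: Sequence[str]) -> str:
    """Approximate the SELECT that a JDBC scan sends to the source DB."""
    # Column pruning: only the columns the rest of the plan needs are requested.
    # Assumption: if no column is needed (e.g. for a pure count), a constant is selected.
    column_list = ", ".join(columns) if columns else "1"
    # Filter pushdown: each pushed predicate becomes one AND-ed condition.
    where = ""
    if pushed_predicates:
        where = " WHERE " + " AND ".join(f"({p})" for p in pushed_predicates)
    # Note what is missing: no COUNT, no GROUP BY, no JOIN.
    return f"SELECT {column_list} FROM {dbtable}{where}"


if __name__ == "__main__":
    print(sketch_scan_sql(
        "(SELECT * FROM ORDERS) foo",  # hypothetical load statement
        [],
        ["VALIDFLAG IS NOT NULL", "VALIDFLAG = 1"],
    ))
```

The point of the sketch is that aggregates never appear in the generated statement, so a row count defined outside the load SQL still requires one result row per matching source row.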
Thanks for your fast help! @peter.dahlberg :
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) Project
         +- (1) Scan JDBCRelation(( SELECT
/*+ parallel(16) */
) foo) [numPartitions=1] PushedFilters: [*IsNotNull(VALIDFLAG), *EqualTo(VALIDFLAG,1)],
It seems that at least the filter was pushed, but was the partial_count pushed as well? Can you tell from this plan?
@Flogge Thanks for your in-depth explanation. I will check this again once we have an update of ONE DATA and I can see the full query plans in debug mode.
The partial_count comes from Spark, so Spark needs to load the data and count it. Only the Scan JDBCRelation … part is applied directly to the database.
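If you want to check a plan like the one above without reading it by eye, a small helper can pull out the `PushedFilters` list. This is a minimal sketch that works on the plan text only; the function names are hypothetical, and the regular expression assumes that no filter value contains a `]`.

```python
import re
from typing import List


def _split_top_level(body: str) -> List[str]:
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for ch in body:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        parts.append(tail)
    return parts


def pushed_filters(plan_text: str) -> List[str]:
    """Return the entries of every 'PushedFilters: [...]' list in a plan."""
    found = []
    # Simplification: the list is assumed to end at the first ']'.
    for body in re.findall(r"PushedFilters:\s*\[([^\]]*)\]", plan_text):
        found.extend(_split_top_level(body))
    return found


def aggregates_in_spark(plan_text: str) -> bool:
    """True if the plan still contains an aggregate operator executed by Spark."""
    return "HashAggregate" in plan_text
```

For the plan posted in this thread, `pushed_filters` would list the two `VALIDFLAG` filters, while `aggregates_in_spark` would return `True`, which matches the reading that the filter is applied on the database and the count is not.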
At least for Parquet, Spark is also able to optimize counting, but it may well be that it doesn’t support that for JDBC.
You also made an important point: the filter pushdown stems entirely from query plan optimization. This means every filter that is pushed down must be known to the optimizer, which in particular excludes joins.
@peter.dahlberg yeah, for Parquet it’s a bit different. Still, Parquet operations are completely handled by Spark and its libraries. The Parquet file format supports some stuff out of the box, but these features are still utilized by Spark. There is no Parquet “service” acting on its own that gets predicates/functions pushed to it. It’s just a way of interaction.
For JDBC-based data retrieval, the “actual systems” are transparent to Spark; the work there is actually done by another system.
However, there is a lot of customization possible somewhere deep in the guts of Spark Catalyst and the Tree Resolver. I’ve scratched the surface there once; it was awesome and frightening at the same time. I’m quite sure that certain functions could be offloaded to the JDBC sources with a deep dive into the query engine(s). So far, I don’t know of any production-ready lib that enhances functionality here. Anyone in need of a pet project?