Understanding DataFilters in PySpark: A Comprehensive Guide to Filters in Spark Pipeline


When working with large datasets, especially those stored in formats like Parquet, it helps to understand how Spark applies filters at different stages of a query. The explain output of a file scan lists three kinds of filters: PartitionFilters, PushedFilters, and DataFilters. The last one is often confused with the other two or overlooked entirely. In this article, we’ll look at what DataFilter means and how it differs from the other types of filters.

What are Filters in Spark?

Filters are a crucial part of the Spark pipeline. They allow us to select specific data based on certain conditions, which are expressed using predicates. These predicates can be anything from simple equality checks (a == b) to more complex comparisons (e.g., a > 10).

Predicates describe what we’re looking for in our data. When Spark plans a query, it decides where each predicate can be evaluated most cheaply: against the directory layout, inside the file reader, or row by row after the data has been loaded. The earlier a predicate is applied, the less data has to be read.

Understanding PartitionFilters

One type of filter is the PartitionFilter. These filters operate on partition columns, which are the columns used to split a dataset into separate directories (for example colA=1/, colA=2/), each holding its own Parquet files. By applying a filter to these columns, Spark can exclude whole directories from further processing.

For example, suppose a Parquet dataset is partitioned by colA (think of it as a product ID) and we only want to process rows where colA is less than 10. The dataset behind the plans shown here is created in the Example section below.

spark.read.parquet("./datafilter.parquet").filter(col("colA") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colA#167) AND (colA#167 < 10))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#165,colC#166,colA#167] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [isnotnull(colA#167), (colA#167 < 10)], PushedFilters: [], ReadSchema: struct<colB:int,colC:int>

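In this case, the filter operates on the partition column colA, so it appears under PartitionFilters. Spark evaluates it against the partition values found in the directory names and skips every directory that cannot match, without opening the files inside.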
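
The pruning step can be pictured with a small model in plain Python. This is only a sketch of the idea, not Spark’s implementation: the directory names follow the colA=<value> layout of a dataset partitioned by colA, the listing itself is hypothetical, and the helper names partition_value and prune_partitions are made up for illustration.

```python
from typing import Callable, List


def partition_value(directory: str, column: str) -> int:
    """Extract the integer partition value from a name like 'colA=3'."""
    name, _, raw = directory.partition("=")
    if name != column:
        raise ValueError(f"{directory!r} is not a partition of {column!r}")
    return int(raw)


def prune_partitions(directories: List[str], column: str,
                     predicate: Callable[[int], bool]) -> List[str]:
    """Keep only directories whose partition value satisfies the predicate.

    No data file is opened: the decision uses the directory name alone.
    """
    return [d for d in directories
            if predicate(partition_value(d, column))]


# Hypothetical directory listing of a dataset partitioned by colA.
dirs = ["colA=1", "colA=2", "colA=3", "colA=12"]
kept = prune_partitions(dirs, "colA", lambda v: v < 10)
print(kept)  # ['colA=1', 'colA=2', 'colA=3']
```

The directory colA=12 is dropped before any file is read, which is the saving a PartitionFilter is meant to provide.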

Understanding PushedFilters

Another type of filter is the PushedFilter. These filters apply to non-partition columns. A PartitionFilter is resolved from the directory layout alone, whereas a PushedFilter is handed to the data source reader (here, the Parquet reader), which can use statistics stored in the files, such as per-row-group minimum and maximum values, to skip data that cannot match.

For example, let’s modify our previous scenario to include a filter on column colB.

spark.read.parquet("./datafilter.parquet").filter(col("colB") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colB#179) AND (colB#179 < 10))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), (colB#179 < 10)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), LessThan(colB,10)], ReadSchema: struct<colB:int,colC:int>

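In this case, the filter compares colB with a literal, so Spark can translate it to LessThan(colB,10) and pass it to the Parquet reader. The Filter operator at the top of the plan still re-checks the condition for every row the scan returns.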
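
One reason a pushed filter can save work is that Parquet files carry minimum and maximum statistics for each row group. The sketch below models that idea in plain Python. The RowGroup class and its numbers are invented for illustration, and the code is not how Spark or the Parquet library is actually written.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group and its column statistics."""
    min_value: int
    max_value: int
    values: List[int]


def may_contain_less_than(group: RowGroup, literal: int) -> bool:
    """A row group can hold a value < literal only if its minimum is < literal."""
    return group.min_value < literal


def scan_less_than(groups: List[RowGroup], literal: int) -> List[int]:
    """Skip row groups ruled out by statistics, then filter the remaining rows."""
    result: List[int] = []
    for group in groups:
        if not may_contain_less_than(group, literal):
            continue  # whole row group skipped without reading its values
        # Statistics only say "maybe", so each row is still checked.
        result.extend(v for v in group.values if v < literal)
    return result


groups = [
    RowGroup(min_value=2, max_value=9, values=[2, 9, 5]),
    RowGroup(min_value=8, max_value=40, values=[8, 40, 17]),
    RowGroup(min_value=20, max_value=24, values=[20, 24]),
]
matches = scan_less_than(groups, 10)
print(matches)  # [2, 9, 5, 8]
```

The third row group is never read, while the second one is read and then filtered row by row. Statistics can only rule data out, which is why a row-level check remains necessary even when a filter is pushed.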

Understanding Non PushedFilters

Finally, there’s a third type of filter known as Non PushedFilter or simply “non-pushed filter.” This category covers filters on non-partition columns that cannot be pushed down to the Parquet reader. They are typically conditions the reader cannot evaluate from a single column and a constant, so Spark evaluates them itself on the rows returned by the scan.

Using our previous example but now applying a condition on both columns colB and colC, we see:

spark.read.parquet("./datafilter.parquet").filter(col("colB") < col("colC")).explain

== Physical Plan ==
*(1) Filter ((isnotnull(colB#179) AND isnotnull(colC#180)) AND (colB#179 < colC#180))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), isnotnull(colC#180), (colB#179 < colC#180)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), IsNotNull(colC)], ReadSchema: struct<colB:int,colC:int>

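In this scenario, the comparison colB < colC shows up under DataFilters but not under PushedFilters, because a comparison between two columns cannot be expressed as a data source filter. Only the two IsNotNull checks are pushed down; the comparison itself is evaluated by the Filter operator on the rows the scan returns.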
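
The split visible in the plan above (three entries under DataFilters, two under PushedFilters, none under PartitionFilters) can be reproduced with a deliberately simplified model. The Predicate class and the classify function are hypothetical names, and the pushdown rule used here (only null checks and column-versus-literal comparisons are pushable) is an assumption of the sketch, not a full description of Spark’s rules.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass(frozen=True)
class Predicate:
    """Simplified predicate: `column op literal` or `column op other_column`."""
    column: str
    op: str                            # e.g. "<" or "is not null"
    literal: Optional[int] = None
    other_column: Optional[str] = None

    def columns(self) -> Set[str]:
        cols = {self.column}
        if self.other_column is not None:
            cols.add(self.other_column)
        return cols


def classify(predicates: List[Predicate], partition_columns: Set[str]
             ) -> Tuple[List[Predicate], List[Predicate], List[Predicate]]:
    """Return (partition_filters, data_filters, pushed_filters)."""
    partition_filters, data_filters, pushed_filters = [], [], []
    for p in predicates:
        if p.columns() <= partition_columns:
            partition_filters.append(p)
            continue
        if not p.columns().isdisjoint(partition_columns):
            # Mixing partition and non-partition columns is left out of this model.
            raise NotImplementedError("mixed predicates are not modelled")
        data_filters.append(p)
        # Assumption: a predicate is pushable only if it involves a single column.
        if p.other_column is None:
            pushed_filters.append(p)
    return partition_filters, data_filters, pushed_filters


predicates = [
    Predicate("colB", "is not null"),
    Predicate("colC", "is not null"),
    Predicate("colB", "<", other_column="colC"),
]
partition_f, data_f, pushed_f = classify(predicates, {"colA"})
print(len(partition_f), len(data_f), len(pushed_f))  # 0 3 2
```

In this model every pushed filter is also a data filter, which matches what the plan shows: the pushed list is a subset of the data filter list.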

DataFilters

Now that we’ve explored PartitionFilter, PushedFilter, and Non PushedFilter, let’s turn our attention to DataFilter.

DataFilter is not a synonym for PushedFilter, although the two are closely related. DataFilters lists every filter that applies to non-partition columns. PushedFilters lists the subset of those filters that Spark could translate into the data source’s filter format and hand to the reader. In the colB < colC example above, DataFilters contains three entries while PushedFilters contains only the two IsNotNull checks.

To clarify:

  • A PartitionFilter applies to partition columns and is used to skip whole partition directories.
  • A DataFilter applies to non-partition columns.
  • A PushedFilter is a DataFilter that can be pushed down to the file reader, for example a comparison between a column and a literal.
  • A Non PushedFilter is a DataFilter that cannot be pushed down, for example a comparison between two columns; Spark evaluates it after the scan.

Example

To illustrate these concepts, let’s create a simple example that demonstrates how we use filters to process a Parquet dataset. We start by importing necessary libraries and defining our sample data.

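import org.apache.spark.sql.functions.col
// Needed for .toDF outside spark-shell, where it is imported automatically.
import spark.implicits._

// Four sample rows: (colA, colB, colC).
val df = Seq(
  (1,2,3),
  (2,2,3),
  (3,20,300),
  (1,24,299),
).toDF("colA", "colB", "colC")

// Write the data partitioned by colA, replacing any previous output.
df.write.partitionBy("colA").mode("overwrite").parquet("datafilter.parquet")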
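
To see what partitioning by colA means for these four rows, here is a plain-Python sketch of the resulting layout. It does not call Spark. It only groups the sample rows the way the write groups them, into one colA=<value> directory per distinct value. The variable names are illustrative.

```python
from collections import defaultdict

# The same four sample rows: (colA, colB, colC).
rows = [
    (1, 2, 3),
    (2, 2, 3),
    (3, 20, 300),
    (1, 24, 299),
]

# One directory per distinct colA value; only colB and colC go into the files.
layout = defaultdict(list)
for col_a, col_b, col_c in rows:
    layout[f"colA={col_a}"].append((col_b, col_c))

for directory in sorted(layout):
    print(directory, layout[directory])
# colA=1 [(2, 3), (24, 299)]
# colA=2 [(2, 3)]
# colA=3 [(20, 300)]
```

The partition column lives in the directory name rather than in the data files, which is why ReadSchema in the plans lists only colB and colC.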

Now that we’ve defined our sample data and written it as a partitioned Parquet dataset, let’s examine how Spark processes it when applying different types of filters.

PartitionFilter Example

We’ll start by filtering on a partition column (colA), as shown in this example:

spark.read.parquet("./datafilter.parquet").filter(col("colA") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colA#167) AND (colA#167 < 10))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#165,colC#166,colA#167] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [isnotnull(colA#167), (colA#167 < 10)], PushedFilters: [], ReadSchema: struct<colB:int,colC:int>

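Here, the filter is evaluated against the partition values of colA. In the sample data those values are 1, 2, and 3, so every partition satisfies colA < 10 and none is skipped; a stricter condition such as colA < 2 would leave only one directory to scan.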

PushedFilter Example

Let’s modify our previous example by applying a filter on column colB. The result:

spark.read.parquet("./datafilter.parquet").filter(col("colB") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colB#179) AND (colB#179 < 10))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), (colB#179 < 10)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), LessThan(colB,10)], ReadSchema: struct<colB:int,colC:int>

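In this case, both conditions on colB appear under PushedFilters, so the Parquet reader receives them and can use the file statistics to skip data. In the sample data, the two rows with colB = 2 satisfy the condition.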

Non PushedFilter Example

Finally, let’s apply a more complex condition on columns colB and colC. The result:

spark.read.parquet("./datafilter.parquet").filter(col("colB") < col("colC")).explain

== Physical Plan ==
*(1) Filter ((isnotnull(colB#179) AND isnotnull(colC#180)) AND (colB#179 < colC#180))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), isnotnull(colC#180), (colB#179 < colC#180)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), IsNotNull(colC)], ReadSchema: struct<colB:int,colC:int>

Here, only the IsNotNull checks are pushed down. The comparison colB < colC is not pushed and is evaluated by the Filter operator after the scan. In the sample data, all four rows satisfy it.

Conclusion

In this article, we’ve explored DataFilter, one of the filter categories that Spark reports for file scans over formats like Parquet. PartitionFilters prune directories, DataFilters cover all conditions on non-partition columns, and PushedFilters are the part of the DataFilters that reaches the file reader. Reading these three fields in the explain output tells you how much work a filter saves before data is loaded.

How much each kind of filter helps depends on your dataset: partition filters pay off when queries filter on the partition column, and pushed filters pay off when the file statistics can rule out large portions of the data.


Last modified on 2025-01-02