How Can Advanced Logical Plan Optimization Mitigate Data Skewness in Spark?

When working with massive datasets in Spark, processing billions of rows distributed across multiple cluster nodes, handling data skewness becomes a complex task. Skewed data can lead to scalability, performance, and resource optimization challenges, making it crucial to find effective solutions.

In our previous blog, we explored the concept of data skewness in Spark and how it can impact the performance of EL+T systems. In this blog, we will walk through the Spark execution process and how skewness can be mitigated within it, with a specific focus on logical plan optimization.

Also read – Is ELT The Ultimate Replacement For ETL? (Part I)

Execution in the Spark Engine

Whenever Spark encounters an Action, it creates a Job, which is broken into Stages based on the types of transformations involved. Each Stage comprises multiple Tasks, with each Task representing a unit of work executed on a single executor within the cluster.

Each Task typically processes a single data partition, allowing different partitions to be processed concurrently. The workflow advances to the next Stage only after all Tasks in the current Stage are completed.
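To make these terms concrete, here is a minimal sketch (the carOwnerDf DataFrame is an assumption for illustration): a wide transformation such as groupBy introduces a shuffle boundary and therefore a new Stage, while only the final Action triggers the Job.

// groupBy is a wide transformation: it adds a shuffle boundary and therefore a new Stage
val counts = carOwnerDf.groupBy("Referring_Service_center_ID").count() // still lazy
// show() is an Action: only now does Spark create a Job and schedule its Stages and Tasks
counts.show()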

However, data skewness, where some partitions have significantly more data than others, can lead to imbalanced workloads and slower processing times. To address this issue, two solutions emerge:

Solution 1: Introduces a new transformation to filter out null or empty values in the referring Service Center ID column, reducing skewness. However, this approach can be complex and time-consuming for users, demanding additional resources and expertise.

Solution 2: Uses precomputed statistics or on-the-fly calculations to identify transformations that mitigate data skewness. It involves creating a new column in which null or empty values of the referring Service Center ID are replaced with random values, and updating Spark’s Unresolved Logical Plan accordingly.

Updating the Spark Logical Plan

The Catalyst optimizer is a pivotal component in Apache Spark, dedicated to optimizing SQL, DataFrame, and Dataset operations. It forms the backbone of Spark SQL, combining a rule-based optimization engine with a cost-based optimizer.

These components collaborate to enhance queries and produce an optimized physical plan for execution on the cluster.

At its core, the Catalyst optimizer works with an unresolved logical plan, which serves as a tree-like structure representing the initial query before resolving references to tables, columns, and functions. This unresolved logical plan undergoes a series of transformation rules, resulting in an improved version.

The improved version of the logical plan is then handed over to the analyzer, which resolves all references and validates the query’s integrity. Catalyst then generates a detailed physical plan, outlining the execution strategy and the tasks required to run the query on the cluster.

The physical plan is subsequently executed on the cluster using Spark’s distributed computing engine.
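To inspect these stages for a concrete query (a sketch; it assumes the car_owner data used later in this post has been registered as a temporary view), Spark’s extended explain output prints the parsed (unresolved), analyzed, optimized logical, and physical plans:

// Prints the parsed, analyzed, optimized logical, and physical plans for the query
sparkSession.sql(
  "SELECT FIRST_VALUE(State) OVER (PARTITION BY Referring_Service_center_ID " +
  "ORDER BY service_date DESC) AS State FROM car_owner"
).explain(true)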

Updating the Unresolved Logical Plan 

After interpreting a user’s query or DataFrame transformation, the Catalyst optimizer crafts an Unresolved Logical Plan in the form of a tree structure. At this stage the query has only been checked for syntactic correctness; whether the tables, columns, and functions it refers to actually exist is verified later by the analyzer.

This tree-like structure symbolizes Spark’s comprehension of the user’s intent, pinpointing the targeted tables, chosen columns, and any specified filters or joins.

However, it’s not yet a fully-formed physical execution plan.

This Unresolved Logical Plan serves as a crucial intermediary, facilitating further optimizations and transformations in subsequent stages before arriving at a physical execution plan. 

Spark applies optimization techniques such as predicate pushdown, join reordering, and projection pruning at later stages of the pipeline; those optimizations are not performed on the Unresolved Logical Plan itself.

In the Unresolved Logical Plan, we instead update the AST based on statistics and rules; for example, a skewed column can be replaced with a non-skewed one.

We’ll employ this logical plan to optimize Window functions and eliminate partition skewness.

Window expression: A window expression tree in Apache Spark represents a set of window functions applied to a DataFrame or a query. It is a logical diagram that shows the relationship between the various components of a window expression.

Window expressions have the following components (see the sketch after this list):

  1. windowFunction: Expression – can be a simple arithmetic operation, a function call, or a combination of these. It is used extensively in data transformations and aggregations.
  2. windowSpec: WindowSpecDefinition – It defines the partitioning, ordering, and frame boundaries for a window function that will be applied to the DataFrame. A WindowSpecDefinition is created using these components:
    • partitionSpec: Seq[Expression] – It defines the way that input rows are partitioned.
    • orderSpec: Seq[SortOrder] – It defines the ordering of rows in a partition.
    • frameSpecification: WindowFrame – It defines the window frame in a partition.
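As a rough illustration (a sketch only; the carOwnerDf DataFrame and its columns are assumptions matching the example query below), the same three pieces appear when a window is built through the DataFrame API:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first}

// partitionSpec -> partitionBy, orderSpec -> orderBy; frameSpecification falls back to the default frame
val byServiceCenter = Window
  .partitionBy("Referring_Service_center_ID")
  .orderBy(col("service_date").desc)

// windowFunction -> first("State"), applied over the window specification
val withState = carOwnerDf.withColumn("State", first("State").over(byServiceCenter))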

Unresolved logical plan for window expression SQL

Query – “SELECT FIRST_VALUE(State) OVER (PARTITION BY Referring_Service_center_ID order by service_date DESC ) AS State FROM car_owner”

Project [FIRST_VALUE(State#1L) windowspecdefinition(Referring_Service_center_ID#0)]

+- UnresolvedRelation `car_owner`

In this plan, the Project operator represents the projection of the FIRST_VALUE(State) window function, and windowspecdefinition captures the window specification for the FIRST_VALUE function, which partitions the data by the Referring_Service_center_ID column.

Updating the window expression: In Apache Spark, updating the unresolved logical plan involves applying a series of rule-based transformations to the plan in order to generate a more optimized one. Below are the steps to update the logical plan:

Step 1 – Parse the user’s query and generate an unresolved logical plan using Spark’s Catalyst framework.

import org.apache.spark.sql.catalyst.parser.ParserInterface;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
ParserInterface parserInterface = sparkSession.sqlContext().sessionState().sqlParser();
LogicalPlan logicalPlan = parserInterface.parsePlan(sql);

Step 2 – Updating the partition column of the window function. In this step, the transformExpressions method of the logical plan is used to apply a user-defined function, recursively, to the expressions of the plan.

The transformExpressions method takes a function that maps expressions to expressions and returns a new logical plan with the mapped expressions. The function is applied recursively within each expression tree of the current operator (transformAllExpressions extends this to child operators as well), and the structure of the plan, including any child plans and operators, is preserved.

As mentioned above, the WindowSpecDefinition contains the window partition columns, so we only need to modify the WindowSpecDefinition expression and leave the rest unchanged.

The most common approach is to use pattern matching on WindowSpecDefinition to find and replace subtrees with a specific structure.

Here is an example:
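Below is a minimal sketch of such a rewrite, written in Scala since Catalyst’s tree API is Scala-native; the column names are the ones used in this post, with updated_Referring_Service_center_ID as the pre-computed replacement:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def replaceSkewedPartitionColumn(plan: LogicalPlan): LogicalPlan =
  plan.transformExpressions {
    // Match every WindowSpecDefinition in the expression tree and rewrite only its partitionSpec
    case spec: WindowSpecDefinition =>
      val newPartitionSpec = spec.partitionSpec.map {
        case attr: UnresolvedAttribute if attr.name.equalsIgnoreCase("Referring_Service_center_ID") =>
          UnresolvedAttribute("updated_Referring_Service_center_ID")
        case other => other
      }
      // Ordering and frame specification are left untouched
      spec.copy(partitionSpec = newPartitionSpec)
  }

For this query the window expression sits in the root Project, so transformExpressions reaches it; transformAllExpressions could be used to cover windows inside nested operators as well. The function would be applied to the logicalPlan produced in Step 1, before handing the plan to Step 3 below.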

Here we have replaced the partition column Referring_Service_center_ID with updated_Referring_Service_center_ID.

Note – This new column (updated_Referring_Service_center_ID) is projected before the unresolved logical plan is generated, and all null values are replaced with a random value between 0 and 100.
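One way to produce that column (a sketch only; carOwnerDf, the string type of the ID column, and the bucket count of 100 are assumptions based on the note above) is a plain DataFrame projection applied before the query is parsed:

import org.apache.spark.sql.functions.{col, floor, rand, when}

// Spread null or empty IDs across random buckets 0-99; keep all other IDs as they are
val prepared = carOwnerDf.withColumn(
  "updated_Referring_Service_center_ID",
  when(col("Referring_Service_center_ID").isNull || col("Referring_Service_center_ID") === "",
    floor(rand() * 100).cast("string"))
    .otherwise(col("Referring_Service_center_ID")))

// Re-register the view so the rewritten query can still refer to car_owner
prepared.createOrReplaceTempView("car_owner")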

Step 3 – Convert the updated Unresolved Logical Plan into a Dataset&lt;Row&gt; using Dataset.ofRows

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.QueryPlanningTracker;
QueryPlanningTracker queryPlanningTracker = new QueryPlanningTracker();
Dataset<Row> rowDataset = Dataset.ofRows(sparkSession, logicalPlan, queryPlanningTracker);

Step 4 – Fix the null-value transformation

Because we replaced the null values with random values, the window function is now evaluated separately for each random value and returns a different result for each of them, whereas the original query returned a single value for the entire null partition.

To fix this, first extract the correct value by filtering the original data on Referring_Service_center_ID being null, and then replace the value in the corresponding column of the result.

Select last_value(State) from car_owner where Referring_Service_center_ID is null;
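Sketched end to end (assumptions: the rewritten query also projects the original Referring_Service_center_ID column so the formerly-null rows can still be identified, State is a string column, and rowDataset is the result from Step 3):

import org.apache.spark.sql.functions.{col, lit, when}

// The single value the original query would have produced for the null partition
val nullPartitionState = sparkSession
  .sql("SELECT last_value(State) FROM car_owner WHERE Referring_Service_center_ID IS NULL")
  .first()
  .getString(0)

// Overwrite the per-random-bucket results of the formerly-null rows with that single value
val fixed = rowDataset.withColumn(
  "State",
  when(col("Referring_Service_center_ID").isNull, lit(nullPartitionState))
    .otherwise(col("State")))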

Conclusion

Data skewness can significantly impact the performance and cost-effectiveness of Spark-based EL+T systems. By applying rule-based Unresolved Logical Plan Optimization and innovative skewness mitigation strategies, organizations can improve resource utilization, enhance scalability, and ensure efficient data processing without burdening end users with the need for specialized knowledge.

Logical plan optimization empowers Spark to handle larger datasets and complex computations while minimizing the environmental impact. It offers a more efficient way to harness the capabilities of big data without requiring in-depth knowledge of the system’s inner workings.

Incorporating these advanced techniques into your Spark workflows can lead to more efficient, cost-effective, and environmentally friendly data processing.

Also read – What’s Next For ELT? (Part II)
