Coalesce With Care


Here’s a quick Spark SQL puzzle for you: what do you think could be a problem with the following Spark code (assume the Spark session is perfectly configured)?

sparkSession.sql("select * from my_website_visits where post_id=317456")
.write.parquet("s3://reports/visits_report")

Hint 1: The input data (my_website_visits) is very large.

Hint 2: We filter out most of the data before writing.

I’m sure you’ve figured it out by now: if the input data is large and Spark is well configured, the query runs with many tasks, which means the write is also performed by many tasks.

This means the output will most likely be a large number of very small Parquet files.
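If you want to check this before running the job, you can look at the number of partitions of the filtered DataFrame; each non-empty partition is written by its own task and typically produces its own file. A minimal sketch, reusing the table name and filter from the example above:

val filtered = sparkSession.sql("select * from my_website_visits where post_id=317456")

// Roughly an upper bound on the number of output files for this write
println(filtered.rdd.getNumPartitions)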


Small files are a known problem in the big data world. It takes an unnecessarily large amount of resources to write this data, but more importantly, it takes a lot of resources to read it (more I/O, more memory, more runtime).

This is how it looks in the Spark UI.

Spark UI

In this case, we have 165 tasks, which means we can end up with up to 165 output files. How can we improve this? Instead of writing from many tasks, let’s write from a single one.

Writing from a single task

How do we do that in Spark?

Coalesce vs. Repartition

In Spark, there are two common transformations for changing the number of partitions: coalesce and repartition. They are very similar, but not identical.

In Spark SQL, repartition triggers a shuffle, while coalesce does not. And as we know, a shuffle can be costly (note: this is true for DataFrames/Datasets; with RDDs the behavior is slightly different).
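You can also see this difference in the physical plan with explain() (a minimal sketch; the exact operator names vary a bit between Spark versions, but repartition should produce an Exchange node while coalesce should not):

val df = sparkSession.sql("select * from my_website_visits where post_id=317456")

// repartition shuffles the data: the printed plan contains an Exchange operator
df.repartition(1).explain()

// coalesce does not shuffle: the plan contains a Coalesce operator and no Exchange
df.coalesce(1).explain()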

So let’s try coalesce.

sparkSession.sql("select * from my_website_visits where post_id=317456")
.coalesce(1).write.parquet("s3://reports/visits_report")

That took 3.3 minutes to run, while the original program took only 12 seconds.

Now let’s try it with repartition:

sparkSession.sql("select * from my_website_visits where post_id=317456")
.repartition(1).write.parquet("s3://reports/visits_report")

It took just 8 seconds to run. How can that be?! Repartition adds a shuffle, so it should be more expensive.

Let’s take a look at the Spark UI. This is what it looks like when using coalesce:

Spark UI when using coalesce

And this is when using repartition:

Spark UI when using repartition

The reason should now be clear. Coalesce reduces the number of tasks for the entire stage, including the part that comes before the coalesce call. This means that reading and filtering the input were done with just one task, as opposed to 165 tasks in the original program.

Repartition, on the other hand, adds a shuffle, and that does add to the runtime, but since the first stage is still done with 165 tasks, the total runtime is much better than with coalesce. Does this mean coalesce is evil? Of course not. Let’s look at an example where coalesce is actually the better option.

Coalesce With Limit

sparkSession.sql("select * from my_website_visits").limit(10)
.write.parquet("s3://reports/visits_report")

This ran for 2.1 minutes, but when using coalesce:

sparkSession.sql("select * from my_website_visits")
.coalesce(1).limit(10).write.parquet("s3://reports/visits_report")

It only ran for 3 seconds (!)

As you can see, coalesce helped a lot here. To understand why, we need to understand how the limit operator works. Limit actually splits the plan into two stages with a shuffle between them. In the first stage, a LocalLimit operation is performed on each of the partitions. The limited data from each partition is then combined into a single partition, where another limit operation, called GlobalLimit, is performed on that data.
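You can inspect this yourself with explain() (a minimal sketch; note that when the limit is the final operation of a collect-style query, some Spark versions print a single CollectLimit operator that folds the two steps together, while the write job above shows the separate LocalLimit and GlobalLimit operators in the SQL tab):

sparkSession.sql("select * from my_website_visits")
  .limit(10)
  .explain()
// Look for the limit operators: the local limit runs on every partition, and the
// global limit runs after the data has been brought into a single partition.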

This is how it looks in the SQL tab of the Spark UI:

SQL tab in Spark UI

Note that the local limit and the global limit are in separate stages. Now, if the data were ordered in some way, it would make sense to apply the local limit on each partition before applying the global limit. But since there is no ordering here at all, this is clearly a wasteful operation; we could just take those 10 records from a single partition, and logically it would make no difference while being much faster.

This is where coalesce(1) helps, in two ways.

First, as we’ve seen, it sets the number of tasks to 1 for the entire stage. Since the data is already in a single partition before the limit, the extra stage and the shuffle that limit adds are no longer needed.

But there is another, more important reason why coalesce(1) helps here. Because coalesce(1) reduces the number of tasks to 1 for the entire stage (as opposed to repartition, which splits the stage), the local limit operation is performed on just one partition instead of on many partitions. This helps performance a lot.

Looking at the Spark UI when using coalesce, you can clearly see that the local and global limits are executed in the same stage.

Spark UI when using coalesce

What about repartition in this case?

sparkSession.sql("select * from my_website_visits").repartition(1).limit(10)
.write.parquet("s3://reports/visits_report")

It took 2.7 minutes, even slower than the original query. This is how it appears in the SQL tab of the Spark UI:

SQL tab in Spark UI

We can see that in this case, the local and global limits are also performed in the same stage within a single task, just like with coalesce. So why is it slower here?

Unlike coalesce, repartition does not change the number of tasks for the entire stage; instead, it creates a new stage with the new number of partitions. This means that in our case it actually takes all the data from all the partitions and shuffles it into one big partition before the limit is applied. That, of course, has a huge impact on performance.
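The same difference shows up in the physical plans of the two variants (a minimal sketch; operator names and optimizations vary between Spark versions):

val visits = sparkSession.sql("select * from my_website_visits")

// coalesce(1): a Coalesce node and no Exchange, so the limit runs in the same
// single-task stage and only a handful of rows ever need to be materialized
visits.coalesce(1).limit(10).explain()

// repartition(1): an Exchange node sits below the limit, meaning every row is
// shuffled into one partition before the 10 rows are picked
visits.repartition(1).limit(10).explain()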

Conclusion

As we have seen, coalesce and repartition can each help or harm the performance of our applications; we just need to be careful when using them.
