Is there a way to find out which number of partitions is optimal for my data? Any rules of thumb? Can I see it in the Spark UI?
Background:
I created a dataset by appending a small number of rows (~30–100) very frequently to my data (currently 1000 times, with more to come). The workflows (WFs) that use this dataset are extremely slow (or hit the spark.driver.maxResultSize limit), which is solved if I use the Manipulate Partitions processor. But since I also have to run these downstream workflows more than 1000 times, I would like the runtime to be as low as possible.
Currently, I am playing around with different numbers and looking at the WF execution time. But this method is not really reliable, since many other people are working on the same instance.
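For reference, a minimal sketch for a Spark shell (assuming you can run plain Spark code against the dataset; `df` stands for whatever DataFrame holds it). The number of partitions also shows up in the Spark UI as the number of tasks of the stage that processes this data.

```scala
// Sketch for a Spark shell; `df` is a placeholder for the dataset in question.
// Number of partitions of the underlying RDD -- in the Spark UI this corresponds
// to the number of tasks of the stage that reads/processes this data.
val numPartitions = df.rdd.getNumPartitions
println(s"current number of partitions: $numPartitions")

// Rows per partition, to spot many tiny partitions caused by frequent small appends
// (this only counts rows, it does not shuffle any data):
df.rdd.mapPartitions(it => Iterator(it.size)).collect().foreach(println)
```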
There are different parameters to take into consideration.
Before executing the job:
You need to make sure that each file you want to read is <= HdfsDataBlockSize (default 128 MB).
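A quick way to check this is a sketch against the Hadoop FileSystem API (the directory path is a placeholder, `spark` is the predefined session in a Spark shell):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: list the input files and compare each size to the HDFS block size.
// "/data/my_dataset" is a placeholder path.
val fs  = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dir = new Path("/data/my_dataset")
val blockSize = fs.getDefaultBlockSize(dir)   // typically 128 MB

fs.listStatus(dir).filter(_.isFile).foreach { status =>
  val fits = status.getLen <= blockSize
  println(f"${status.getPath}%s  ${status.getLen / (1024.0 * 1024)}%.1f MB  fitsInOneBlock=$fits")
}
```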
During the execution of the Spark job:
You need to know how many cores are available in your cluster and repartition based on that number.
On the Spark level, spark.sql.shuffle.partitions defaults to 200; this should ideally match the number of usable cores on the instance. It always depends on the data that you use and the operations that you do, so it is also fine if the number of partitions is higher than the number of available cores. You can test your WF with a couple of values and pick the one that makes your WF faster.
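In plain Spark terms (a sketch for a Spark shell, outside ONE DATA itself), the relevant values look roughly like this:

```scala
// Number of cores Spark uses by default for parallel operations on this instance:
val cores = spark.sparkContext.defaultParallelism

// Current shuffle-partition setting (defaults to 200):
println(spark.conf.get("spark.sql.shuffle.partitions"))

// Trying a different value for a test run, e.g. the core count or a small multiple of it:
spark.conf.set("spark.sql.shuffle.partitions", (2 * cores).toString)
```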
When writing the output of your WF, try to respect the first part of this answer (file size vs. block size).
A rough estimation of the number of partitions is:
If you have enough cores to read all your data in parallel (i.e. at least one core for every HdfsDataBlockSize of your data):
AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores)
NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
If you don’t have enough cores:
AveragePartitionSize ≈ HdfsDataBlockSize
NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
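To make the two cases concrete, here is a minimal sketch (the function name and the example sizes are made up for illustration; sizes are in bytes):

```scala
// Sketch of the rule of thumb above.
def estimatePartitions(totalDataSize: Long,
                       cores: Int,
                       hdfsBlockSize: Long = 128L * 1024 * 1024): Long = {
  // "enough cores" = at least one core for every HdfsDataBlockSize of data
  val enoughCores = cores.toLong * hdfsBlockSize >= totalDataSize
  val avgPartitionSize =
    if (enoughCores) math.max(1L, math.min(4L * 1024 * 1024, totalDataSize / cores)) // min(4 MB, TotalDataSize / #cores)
    else hdfsBlockSize                                                               // fall back to the block size
  math.max(1L, totalDataSize / avgPartitionSize)
}

// Example: 1 GB on 16 cores vs. 10 GB on 16 cores
println(estimatePartitions(1L * 1024 * 1024 * 1024, 16))   // enough cores   -> 256 partitions (~4 MB each)
println(estimatePartitions(10L * 1024 * 1024 * 1024, 16))  // too few cores  -> 80 partitions (~128 MB each)
```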
I hope that helps.
PS: Hopefully an OD expert can add more tips in the context of OD.
First of all: @anon1396911 your rules of thumb rock. Not much more to be added here.
On the OD part of the story: @kai.geukes, ONE DATA already has you somewhat covered. The “Manipulate Partitions” processor has a “dynamic” default for target partitions, i.e. the suggested value when opening the processor configuration is in fact the number of worker cores available on your instance. I’m not totally sure how the new feature of having multiple SparkContexts at your disposal might interfere with this defaulting, but for the time being I can suggest either keeping that default if you see it fit for your data size (rules of thumb above) or taking multiples of it.
Please also note that in order to increase the partition count, you will have to use “repartition” (boolean toggle in the processor). The same is recommended when your initial partitioning is skewed (e.g. after a join with unbalanced results). When there is a common key-based usage (e.g. always the same group-by operation in later workflows), it can also be beneficial for performance to select a key to partition by (only possible with repartition enabled). If you just want your partitions to be “evenly” populated, leave this configuration empty.
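In plain Spark terms, the variants correspond roughly to the following (a sketch; `df` and the column name `customer_id` are placeholders):

```scala
import org.apache.spark.sql.functions.col

// Decrease the partition count without a full shuffle:
val fewer = df.coalesce(8)

// Full shuffle into evenly populated partitions (repartition without a key):
val even = df.repartition(32)

// Full shuffle, hash-partitioned by a key that later workflows group/join on:
val byKey = df.repartition(32, col("customer_id"))
```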
FYI: Repartitioning is a rather costly operation (since it shuffles data around the cluster), so you should perform it before saving the data, not upon loading it.
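So the write step in plain Spark would look something like this (a sketch; path, format and partition count are placeholders):

```scala
// Repartition once, right before persisting, so downstream workflows read well-sized
// partitions instead of the many tiny ones produced by frequent small appends:
df.repartition(32)
  .write
  .mode("append")
  .parquet("/data/my_dataset")
```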