Apache Spark, which became an Apache project in 2013, remains one of the most popular and powerful compute engines. Its popularity is in large part due to its deep configurability, supporting broad and complex data processing workloads across many of the world’s largest organizations.
Operating a powerful tool like Spark, however, comes with its challenges. Many teams I’ve met have encountered difficulties handling issues such as downscaling, data skew, and out-of-memory errors. Such challenges are universal to Spark, even when running on managed compute platforms such as Amazon EMR, Google Cloud Dataproc, and Databricks.
In this blog, we will review tips to avoid common pitfalls that you’ll likely encounter when operating Spark. We’ll recommend best practices using lessons learned from managing large-scale Spark pipelines for our customers at Onehouse.
Here’s a quick overview of what we will cover: sizing compute, memory, and storage for your cluster; reducing your data footprint with serialization and efficient data structures; tuning garbage collection; partitioning your data effectively; optimizing joins; and configuring dynamic allocation.
The foundational step in setting up your Spark cluster is to determine how much compute, memory, and storage you will assign to your driver and executors. Workloads can be bottlenecked by different factors, including memory, storage, and network latency. It's important to get your foundations right and overcome the bottlenecks for your specific workloads. Let’s briefly cover each of the key components.
CPU
CPUs (or cores) determine the level of parallelism in your Spark application. Each task in Spark occupies one core, so you should ensure that cores are proportional to the number of tasks and the parallelism of your workload. Aim for enough CPUs to match your workload’s parallelism – typically 2-5 cores per executor for general workloads – and consider using executors with fewer cores if you plan to shuffle data frequently. Typically, memory scales with the number of cores per machine.
On-Heap Memory
By default, Spark only uses on-heap memory. On-heap memory is used for tasks such as processing, caching resilient distributed datasets (RDDs), and executing transformations. It’s important to provide enough memory to hold the data you are processing; otherwise, you can run into out of memory (OOM) errors or require additional shuffles which can be inefficient. On the other hand, using machines with too much memory can lead to long pauses when garbage collection is performed (more on garbage collection later). Generally, it’s best to avoid using large heap sizes (>32 GB), and instead split the data across multiple smaller executors.
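As a rough illustration of this sizing guidance, here is a minimal sketch of launch-time settings favoring several mid-sized executors over a few huge ones; the specific values (10 executors with 4 cores and 16 GB each) are assumptions for the example, not tuned recommendations.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sizing: several mid-sized executors rather than a few huge ones,
// keeping each heap well under the ~32 GB guideline.
val spark = SparkSession.builder()
  .appName("sizing-example")
  .config("spark.executor.instances", "10") // number of executors (static allocation)
  .config("spark.executor.cores", "4")      // tasks that can run concurrently per executor
  .config("spark.executor.memory", "16g")   // on-heap memory per executor
  .getOrCreate()
```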
Off-Heap Memory
Use off-heap memory to bypass Java garbage collection, which can help reduce overhead and improve performance for memory-intensive workloads. This is useful when you want to cache data that will be reused, such as with large, static datasets.
Enable off-heap memory with spark.memory.offHeap.enabled and set its size with spark.memory.offHeap.size. Be careful, as memory is not unlimited – off-heap allocations add to each executor’s total memory footprint, leaving less room for on-heap data processing. A general best practice is to start with off-heap memory at roughly 10% of executor memory, and to make sure the executor’s total request (on-heap, off-heap, and spark.executor.memoryOverhead) still fits on the node.
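As a minimal sketch of how these settings fit together (the 8 GB on-heap and 1 GB off-heap sizes are illustrative values, not recommendations for your workload):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values: an 8 GB on-heap executor with 1 GB of off-heap memory.
// These must be set at application launch, before executors start.
val spark = SparkSession.builder()
  .appName("offheap-example")
  .config("spark.executor.memory", "8g")          // on-heap executor memory
  .config("spark.memory.offHeap.enabled", "true") // let Spark use off-heap memory
  .config("spark.memory.offHeap.size", "1g")      // off-heap allocation per executor
  .getOrCreate()
```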
Storage
One of Spark’s strengths in handling big data is its ability to spill intermediate data to disk when memory is insufficient. By default, Spark uses the local disk of its worker nodes to store spilled data, but you can configure it to use high-performance block storage (such as AWS EBS or GCP Hyperdisk) for better performance. This feature allows for cost-efficient processing by avoiding the need to keep all data in memory.
For instance, when handling large transformation workloads for a customer, we ran into OOM errors as the data exceeded available memory. Instead of scaling the cluster up, we opted to configure Spark to use AWS EBS to store Spark’s intermediate results on disk. This solution resolved the OOM issue and provided significant cost savings compared to provisioning larger memory instances.
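The exact setup depends on your platform, but the usual knob is spark.local.dir, which tells Spark where to write shuffle and spill files. Below is a minimal sketch assuming a block-storage volume is already mounted at /mnt/spark-scratch (a placeholder path); on cluster managers like YARN, scratch directories are typically configured through the manager instead.

```scala
import org.apache.spark.sql.SparkSession

// Point Spark's scratch space at a mounted high-performance volume (e.g., EBS).
// /mnt/spark-scratch is a placeholder for wherever the volume is mounted.
val spark = SparkSession.builder()
  .appName("spill-to-disk-example")
  .config("spark.local.dir", "/mnt/spark-scratch")
  .getOrCreate()
```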
However, it’s important to consider the tradeoffs when configuring memory and storage for the Spark cluster to balance performance and cost efficiency. Writing and reading data to/from disk is significantly slower than in-memory operations. Spilling to disk also incurs the overhead of serializing and deserializing the data, placing extra load on both CPU and disk I/O.
As a distributed execution engine, Spark frequently moves data between multiple machines, incurring higher compute costs as larger data volumes take longer to move across the network. Data is stored in memory for fast computation, but memory is expensive and limited on each machine. Given the cost of moving data over the network and storing the data in memory, it’s important to reduce your data footprint as much as possible.
Choose the Right Serialization
One way to shrink your data is through serialization. Serialization in Spark converts your data into a stream of bytes for movement across the network and storage in a memory buffer, before the data is deserialized for computation. By default, Spark uses Java serialization for moving data between nodes.
A popular optimization is switching to Kryo serialization, which can be up to 10x faster and more compact than Java serialization. A key tradeoff is that Kryo works best when you register your classes up front (unlike Java serialization), which adds configuration overhead and can cause problems with backward compatibility. To enable Kryo serialization in Spark, set the following configuration:
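Here is a minimal Scala sketch; MyRecord is just a placeholder for your own classes, and registration is optional but recommended for the best performance:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder class standing in for your own domain objects.
case class MyRecord(id: Long, name: String)

// Switch the serializer to Kryo and register frequently serialized classes.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))

val spark = SparkSession.builder().config(conf).appName("kryo-example").getOrCreate()
```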
You may also explore newer serialization frameworks such as Apache Fury (incubating), which aims to offer further performance improvements.
Optimize Your Data Structures
An important way to minimize your data footprint is by choosing data structures with smaller memory overhead and fewer objects. In practice, this means preferring arrays of primitives over collections of boxed objects, avoiding deeply nested structures with many small objects and pointers, and using numeric IDs or enumerations instead of strings for keys.
In addition to reducing the memory footprint, optimizing your data structures can significantly reduce the cost of garbage collection (which we will cover in the next tip).
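As a small illustration of the idea (the class and field names are made up for the example), compare a record that leans on boxed values, strings, and linked collections with a leaner equivalent built from primitives and arrays:

```scala
// Heavier layout: boxed value, string key, and a linked collection of objects.
case class HeavyEvent(userId: String, score: java.lang.Double, tags: List[String])

// Leaner layout: primitive fields, a numeric key, and a primitive array.
case class LeanEvent(userId: Long, score: Double, tags: Array[Int])
```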
Each node in a Spark cluster runs a Java virtual machine (JVM), and therefore uses garbage collection (GC) to clean up unused objects from memory. GC is crucial for freeing up memory and preventing memory leaks, but without proper tuning it can delay or hang Spark jobs.
Choosing the Right GC Algorithm
Selecting the right GC algorithm is key to getting good performance. As a quick comparison of common algorithms: Parallel GC maximizes throughput but can incur longer pauses; CMS and its young-generation companion ParNew prioritize shorter pauses, which suits workloads dominated by short-lived objects; and G1GC balances throughput and pause times, making it a sensible default for large heaps.
If you require ultra-low GC pause times (such as for high-frequency trading), it’s worth exploring other specialized GC algorithms, such as Shenandoah and ZGC.
Case Study: Switching from G1GC to ParNew GC
Recently, the Onehouse team worked with a large customer processing workloads with many short-lived objects. Initially, we used G1GC, which balanced throughput and pause times across the heap. However, this led to higher GC latency and increased task execution times.
The customer’s workload involved many short-lived objects, so we needed a faster and more predictable way to handle young generation garbage collection. Switching to ParNew GC allowed us to optimize young generation collections with lower pause times and reduced latency. While ParNew GC may increase the frequency of major collections, in this case, the improvements in young generation GC performance outweighed the additional overhead, leading to better performance overall.
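GC algorithms are selected through JVM flags passed to executors (and, if needed, the driver) via spark.executor.extraJavaOptions. Here is a minimal sketch assuming you want G1GC with basic GC logging; the ParNew/CMS flags from the case study above would be passed the same way, though they require an older JVM (they were removed in newer JDKs).

```scala
import org.apache.spark.sql.SparkSession

// Pass GC-related JVM flags to executors; these must be set at application launch.
val spark = SparkSession.builder()
  .appName("gc-tuning-example")
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -verbose:gc")
  .getOrCreate()
```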
GC tuning is an iterative process that depends heavily on workload characteristics. It’s important to monitor the performance using Spark UI and JVM metrics to refine your configurations over time.
One of Spark’s superpowers is processing data in parallel. To achieve this, Spark partitions data into smaller chunks, so that each chunk fits on a smaller machine and the chunks can be processed concurrently. The key to efficient partitioning is to prevent data skew. When Spark partitions are skewed, you are more likely to encounter OOM errors and small files, which hurt performance.
The .coalesce() and .repartition() methods are effective tools for optimizing and balancing your partitions. Each functions slightly differently:
- .coalesce() reduces the number of partitions without triggering a full shuffle, making it a cheap way to merge many small partitions.
- .repartition() performs a full shuffle and can increase or decrease the partition count, redistributing data evenly across partitions (optionally by a column).
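A quick sketch of the difference in practice, using a toy DataFrame (the names and partition counts are arbitrary):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("partitioning-example").getOrCreate()

// A toy DataFrame standing in for a real dataset with many small partitions.
val events = spark.range(0, 1000000).toDF("event_id")

// coalesce() merges partitions without a full shuffle – cheap, but may leave skew.
val merged = events.coalesce(8)

// repartition() performs a full shuffle – more expensive, but balances the data,
// and can optionally partition by a column to co-locate related rows.
val balanced = events.repartition(64, col("event_id"))
```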
Another effective tool is dynamic partition pruning, which prunes unnecessary partitions based on filters determined at runtime. If your queries filter on partition columns, Spark identifies the relevant partitions during execution and skips the rest, reducing disk I/O and improving query performance.
Enable dynamic partition pruning with the Spark config:
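In Spark 3.x this optimization is on by default, so you typically only need to confirm it has not been disabled (assuming spark is your active SparkSession):

```scala
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```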
Joining tables is one of the most common yet computationally intensive operations in Spark. Optimizing your joins can have a huge impact on your overall performance and compute costs.
Leverage Adaptive Query Execution (AQE)
The biggest optimization Spark offers for joins is Adaptive Query Execution (AQE). AQE automatically collects statistics and uses them to adapt query execution plans at runtime for optimal performance. Enable AQE with spark.sql.adaptive.enabled=true. AQE provides the following optimizations for joins:
- Converting sort-merge joins into broadcast joins at runtime, when one side of the join turns out to be small enough to broadcast.
- Skew join optimization, which splits oversized partitions so that a few skewed keys don’t stall the whole stage; enable it with spark.sql.adaptive.skewJoin.enabled=true.
You can also leverage AQE for optimizations beyond joins, such as adaptive repartitioning and removal of unused sorts. Read more in the official AQE docs.
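Putting those settings together (assuming spark is your active SparkSession; both flags default to true in recent Spark 3.x releases):

```scala
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```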
In addition to AQE, follow these best practices to optimize joins:
- Use .cache() on the DataFrame to cache data that fits in memory.
- Use .persist(StorageLevel.DISK_ONLY) for larger datasets that require you to persist data on disk.
After applying these optimizations, it’s important to validate the impact using the Spark UI.
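A brief sketch of the two caching approaches above; the DataFrames are toy stand-ins for tables you join against repeatedly:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("join-caching-example").getOrCreate()

// Toy datasets standing in for dimension tables reused across joins.
val smallDim = spark.range(0, 10000).toDF("dim_id")
val largeDim = spark.range(0, 100000000L).toDF("dim_id")

// Cache in memory when the dataset comfortably fits.
val cachedDim = smallDim.cache()

// Persist to disk only when the dataset is too large to hold on-heap.
val persistedDim = largeDim.persist(StorageLevel.DISK_ONLY)
```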
The dynamic allocation feature in Spark can help you efficiently handle unpredictable workloads by dynamically scaling resources based on runtime demands. While it can optimize resource utilization, it may not be worth the complexity for static and predictable workloads.
To handle changing workloads, dynamic allocation assigns resources during runtime to enable better resource utilization based on the actual workloads. Dynamic allocation adds and removes executor nodes on-the-fly to efficiently handle spikes and drops in your workloads.
Downscaling relies on several configurations in tandem, which require tuning in order to work optimally. Below are key configurations:
- spark.dynamicAllocation.enabled=true: Enables dynamic allocation, which is disabled by default.
- spark.dynamicAllocation.executorIdleTimeout: Controls how long an executor can remain idle before it is removed. Increase this if executors are being removed too frequently, causing inefficient recomputation.
- spark.dynamicAllocation.cachedExecutorIdleTimeout: Controls how long an executor holding cached RDD blocks can remain idle before it is deallocated. Set this higher if your workload relies heavily on cached RDDs, to prevent premature deallocation and avoid unnecessarily recomputing the cached data.
- spark.dynamicAllocation.shuffleTracking.timeout: When shuffle tracking is enabled, controls how long executors holding shuffle data are retained after becoming idle. Increase this value if your workload involves long shuffle stages or intermediate operations, so executors are not deallocated before their shuffle output is consumed.
- spark.decommission.enabled=true: Enables graceful decommissioning, in which Spark shuts down executors by migrating cached RDD blocks and shuffle data to other active executors, helping avoid recomputation. Note that this is newer functionality (available in Spark 3.1 and later) and may have its quirks.
By properly tuning these dynamic allocation configurations, you can achieve significant performance gains and optimize resource utilization for your unpredictable workloads.
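As a minimal sketch of how these settings fit together at launch time (the executor counts and timeouts are illustrative placeholders to tune per workload, and shuffle tracking must be enabled for its timeout to apply):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative dynamic allocation setup; values are placeholders, not recommendations.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") // needed without an external shuffle service
  .config("spark.dynamicAllocation.shuffleTracking.timeout", "600s")
  .config("spark.decommission.enabled", "true")                      // graceful executor shutdown (Spark 3.1+)
  .getOrCreate()
```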
Spark can be challenging to operate, as workloads may bottleneck on several different factors including compute, memory, I/O, and even network bandwidth. The tips above provide a solid foundation for debugging and optimizing your Spark jobs. From there, you should continuously experiment to uncover the best configuration combinations for each of your workloads.
At Onehouse, we are always seeking to learn and deploy the latest optimizations in Spark to supercharge our customers’ workloads. If you’re using Spark to run your data lakehouse with Apache Hudi, Apache Iceberg, or Delta Lake, we would love to exchange learnings.
Reach out at gtm@onehouse.ai to set up a free consultation and learn how Onehouse can help you run your data pipelines reliably and efficiently.