December 19, 2024

Top 5 tips for scaling Apache Spark

Top 5 tips for scaling Apache Spark

Apache Spark, open-sourced in 2013, remains one of the most popular and powerful compute engines. Its popularity is in large part due to its deep configurability, supporting broad and complex data processing workloads across many of the world’s largest organizations. 

Operating a powerful tool like Spark, however, comes with its challenges. Many teams I’ve met have encountered difficulties handling issues such as downscaling, data skew, and out of memory errors. Such challenges are universal to Spark, even when running on managed compute platforms such as Amazon EMR, DataProc, and Databricks.

In this blog, we will review tips to avoid common pitfalls that you’ll likely encounter when operating Spark. We’ll recommend best practices using lessons learned from managing large-scale Spark pipelines for our customers at Onehouse.

Here’s a quick overview of what we will cover:

Tip #1: Setting up Memory and Storage for the Cluster

The foundational step in setting up your Spark cluster is to determine how much compute, memory, and storage you will assign to your driver and executors. Workloads can be bottlenecked by different factors, including memory, storage, and network latency. It's important to get your foundations right and overcome the bottlenecks for your specific workloads. Let’s briefly cover each of the key components.

CPU

CPUs (or cores) determine the level of parallelism in your Spark application. Each task in Spark occupies one core, so you should ensure that cores are proportional to the number of tasks and the parallelism of your workload. Aim for enough CPUs to match your workload’s parallelism – typically 2-5 cores per executor for general workloads – and consider using executors with fewer cores if you plan to shuffle data frequently. Typically, memory scales with the number of cores per machine.  

On-Heap Memory

By default, Spark only uses on-heap memory. On-heap memory is used for tasks such as processing, caching resilient distributed datasets (RDDs), and executing transformations. It’s important to provide enough memory to hold the data you are processing; otherwise, you can run into out of memory (OOM) errors or require additional shuffles which can be inefficient. On the other hand, using machines with too much memory can lead to long pauses when garbage collection is performed (more on garbage collection later). Generally, it’s best to avoid using large heap sizes (>32 GB), and instead split the data across multiple smaller executors.

Off-Heap

Use off-heap memory to bypass Java garbage collection, which can help reduce overhead and improve performance for memory-intensive workloads This is useful when you want to cache data that will be reused, such as with large, static datasets.

Enable off-heap memory with spark.memory.offHeap.enabled. Be careful, as memory is not unlimited – off-heap memory reduces the on-heap memory available for data processing. A general best practice is to set off-heap memory to use 10% of executor memory with spark.executor.memoryOverhead.

Storage

One of Spark’s strengths in handling big data is its ability to spill intermediate data to disk when memory is insufficient. By default, Spark uses the local disk of its worker nodes to store spilled data, but you can configure it to use high-performance block storage (such as AWS EBS or GCP Hyperdisk) for better performance. This feature allows for cost-efficient processing by avoiding the need to keep all data in memory. 

For instance, when handling large transformation workloads for a customer, we ran into OOM errors as the data exceeded available memory. Instead of scaling the cluster up, we opted to configure Spark to use AWS EBS to store Spark’s intermediate results on disk. This solution resolved the OOM issue and provided significant cost savings compared to provisioning larger memory instances.

However, it’s important to consider the tradeoffs when configuring memory and storage for the Spark cluster to balance performance and cost efficiency. Writing and reading data to/from disk is significantly slower than in-memory operations. Spilling to disk also incurs the overhead of serializing and deserializing the data, placing extra load on both CPU and disk I/O. 

Tip #2: Serialization & Data Structures

As a distributed execution engine, Spark frequently moves data between multiple machines, incurring higher compute costs as larger data volumes take longer to move across the network. Data is stored in memory for fast computation, but memory is expensive and limited on each machine. Given the cost of moving data over the network and storing the data in memory, it’s important to reduce your data footprint as much as possible.

Choose the Right Serialization

One way to shrink your data is through serialization. Serialization in Spark converts your data into a stream of bytes for movement across the network and storage in a memory buffer, before the data is deserialized for computation. By default, Spark uses Java serialization for moving data between nodes.

A popular optimization is switching to Kryo serialization, which is up to 10x faster and more compact than Java serialization. A key tradeoff is that Kryo requires you to register your classes (unlike Java serialization), which adds configuration overhead and can cause problems with backward compatibility. To enable Kryo serialization in Spark, set the following configuration:

spark.serializer=org.apache.spark.serializer.KryoSerializer

You may also explore newer serialization frameworks such as Apache Fury (incubating), which aims to offer further performance improvements.

Optimize Your Data Structures

An important way to minimize your data footprint is by choosing data structures with smaller memory overhead and fewer objects. Follow these tips to optimize your data structures:

  • Use primitive types instead of standard Java and Scala classes (like HashMap), which add memory overhead with metadata and object references.
  • Flatten nested structures to avoid heavy nesting, which may contain many small objects and pointers.
  • Replace strings with enums for fixed value sets and numeric values for identifiers.

In addition to reducing the memory footprint, optimizing your data structures can significantly reduce the cost of garbage collection (which we will cover in the next tip).

Tip #3: Garbage Collection

Each node in a Spark cluster runs a Java virtual machine (JVM), and therefore uses Garbage Collection (GC) to clean up unused objects from memory. GC is crucial for freeing up memory and preventing memory leaks, but without proper tuning, can delay or hang Spark jobs.

Choosing the Right GC Algorithm

Selecting the right GC algorithm is key to getting good performance. Here’s a comparison of common algorithms:

GC Algorithm How it Works Best For Drawbacks
Garbage First (G1GC) Divides the heap into regions and prioritizes garbage collection in regions with the most reclaimable memory Most workloads; balances throughput and predictable pauses Requires tuning for very large heaps (>32GB)
Parallel GC High-throughput, multi-threaded GC; focused on minimizing GC overhead relative to application execution time Batch-heavy workloads Longer pause times for large heaps
Concurrent Mark-Sweep (CMS) Concurrently collects garbage to reduce pause times Low-latency workloads Deprecated; issues with memory fragmentation
ParNew GC Multi-threaded young generation collector, often paired with CMS for low pauses Low-latency workloads requiring shorter young generation GC times May lead to frequent major collections; reliant on CMS, which is deprecated

If you require ultra-low GC pause times (such as for high-frequency trading), it’s worth exploring other specialized GC algorithms, such as Shenandoah and ZGC.

Case Study: Switching from G1GC to ParNew GC

Recently, the Onehouse team worked with a large customer processing workloads with many short-lived objects. Initially, we used G1GC, which balanced throughput and pause times across the heap. However, this led to higher GC latency and increased task execution times. 

The customer’s workload involved many short-lived objects, so we needed a faster and more predictable way to handle young generation garbage collection. Switching to ParNew GC allowed us to optimize young generation collections with lower pause times and reduced latency. While ParNew GC may increase the frequency of major collections, in this case, the improvements in young generation GC performance outweighed the additional overhead, leading to better performance overall.

GC tuning is an iterative process that depends heavily on workload characteristics. It’s important to monitor the performance using Spark UI and JVM metrics to refine your configurations over time.

Tip #4: Parallelism and Partitioning

One of Spark’s superpowers is processing data in parallel. To achieve this, Spark partitions data into smaller chunks, enabling each chunk to fit on a smaller machine and for the chunks to be processed concurrently. The key to efficient partitioning is to prevent data skew. When Spark partitions are skewed, you are more likely to encounter OOM errors and small files, which hurt performance.

The .coalesce() and .repartition() methods are effective tools that optimize and balance your partitions. Each of these functions slightly differently:

  • Use coalesce() to decrease the number of partitions without reshuffling. This is less expensive, and useful when data is already evenly distributed across partitions. Jobs can then run more efficiently without the overhead of reading many partitions.
  • Use repartition() to distribute data evenly across partitions with reshuffling. This is more expensive due to the shuffle, but can greatly improve performance of imbalanced partitions. This can be used to increase or decrease the number of partitions.

Another effective tool is dynamic partition pruning, which prunes unnecessary partitions based on runtime filters to reduce disk reads and improve query performance. This optimization can enhance performance if you are running queries with filtering. During query execution, Spark identifies relevant partitions and avoids reading unnecessary ones, reducing disk I/O and improving performance.

Enable dynamic partition pruning with the Spark config:

spark.sql.dynamicPartitionPruning.enabled=true

Tip #5: Optimizing Joins

Joining tables is one of the most common yet computationally intensive operations in Spark. Optimizing your joins can have a huge impact on your overall performance and compute costs.

Leverage Adaptive Query Execution (AQE)

The biggest optimization Spark offers for joins is Adaptive Query Execution (AQE). AQE automatically collects statistics and uses them to adapt query execution plans at runtime for optimal performance. Enable AQE with spark.sql.adaptive.enabled=true. AQE provides the following optimizations for joins:

  • Broadcast Joins: Broadcast joins put the DataFrame on every executor, which helps avoid shuffling data and speed up execution. This is particularly effective when joining a large DataFrame with a small one. AQE can automatically convert joins to broadcast joins if it detects that one side of the join is small enough to fit in memory. Enable this with:
spark.sql.adaptive.localShuffleReader.enabled=true # Set conversion threshold based on your executor memory spark.sql.adaptive.maxBroadcastJoinThreshold=<size>
  • Skewed Join Handling:  Data skew can cause performance bottlenecks when a few keys dominate partitions. AQE addresses skew by redistributing skewed keys more evenly or avoiding shuffles for those keys. Enable this with spark.sql.adaptive.skewJoin.enabled=true.

You can also leverage AQE for optimizations beyond joins, such as adaptive repartitioning and removal of unused sorts. Read more in the official AQE docs.

In addition to AQE, follow these best practices to optimize joins:

  • Keep the largest DataFrame at the left of your join. This can help reduce shuffles and improve efficiency.
  • Use the same partitioner (i.e. repartition using the same key) between DataFrames when joining to avoid re-shuffling data.
df1 = df1.repartition("join_key") df2 = df2.repartition("join_key") joined_df = df1.join(df2, "join_key")
  • Cache frequently used datasets in memory or disk to avoid recomputation during joins:some text
    • Use .cache() on the dataframe to cache data that fits in-memory
    • Use .persist(StorageLevel.DISK_ONLY) for larger datasets that require you to persist data on disk

After applying these optimizations, it’s important to validate the impact using Spark UI.

[Bonus] Tip #6: Dynamic Allocation

The dynamic allocation feature in Spark can help you efficiently handle unpredictable workloads by dynamically scaling resources based on runtime demands. While it can optimize resource utilization, it may not be worth the complexity for static and predictable workloads.

To handle changing workloads, dynamic allocation assigns resources during runtime to enable better resource utilization based on the actual workloads. Dynamic allocation adds and removes executor nodes on-the-fly to efficiently handle spikes and drops in your workloads.

Downscaling relies on several configurations in tandem, which require tuning in order to work optimally. Below are key configurations:

  • spark.dynamicAllocation.enabled=true: Enables dynamic allocation, which is disabled by default.
  • spark.dynamicAllocation.executorIdleTimeout: Controls how long an idle executor can remain before being removed. Increase this if executors are being removed too frequently, causing inefficient recomputation.
  • spark.dynamicAllocation.cachedIdleTimeout: Controls how long an executor with cached RDD blocks can remain idle before it is deallocated. Set this higher if your workload heavily uses cached RDDs to prevent premature deallocation and avoid unnecessarily recomputing the cached data.
  • spark.dynamicAllocation.shuffleActiveTimeout: Defines how long an idle executor without cached data or active tasks can remain before being removed. Increase this value if your workload involves long shuffle stages or intermediate operations, to ensure executors are not deallocated too soon.
  • spark.decommission.enabled=true: Decommissioning is a feature in Spark to gracefully shutdown executors by migrating cached RDDs and shuffled data to another active executor. This helps avoid recomputations. Note that this is a newer functionality of Spark (available in version 3.1 and newer), and may have its quirks.

By properly tuning these dynamic allocation configurations, you can achieve significant performance gains and optimize resource utilization for your unpredictable workloads.

Conclusion

Spark can be challenging to operate, as workloads may bottleneck on several different factors including compute, memory, I/O, and even network bandwidth. The tips above provide a solid foundation for debugging and optimizing your Spark jobs. From there, you should continuously experiment to uncover the best configuration combinations for each of your workloads.

At Onehouse, we are always seeking to learn and deploy the latest optimizations in Spark to supercharge our customers’ workloads. If you’re using Spark to run your data lakehouse with Apache Hudi, Apache Iceberg, or Delta Lake, we would love to exchange learnings.

Reach out at gtm@onehouse.ai to set up a free consultation and learn how Onehouse can help you run your data pipelines reliably and efficiently.

Authors
Profile Picture of Andy Walner, Product Manager
Andy Walner
Product Manager

Andy is a Product Manager at Onehouse, designing the next-generation data platform to power analytics & AI. Before Onehouse, Andy developed ads and MLOps products at Google, and served as the Founding Product Manager for an AI startup backed by Y Combinator. He previously graduated from University of Michigan with a degree in Computer Science & Engineering.

Read More:

Subscribe to the Blog

Be the first to read new posts

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
We are hiring diverse, world-class talent — join us in building the future