November 7, 2024

How to Optimize Performance for Your Open Data Lakehouse

How to Optimize Performance for Your Open Data Lakehouse

The data lakehouse architecture, powered by open table formats such as Apache Hudi, Apache Iceberg, and Delta Lake, has enabled an open and cost-effective way to manage the growing data and analytical needs of organizations. It offers the reliability of running concurrent transactions on the same data store, increasing efficiency. The data lakehouse supports critical capabilities such as ACID transactions, schema evolution, time travel, and incremental/CDC-style queries, previously unavailable on the data lake. 

While adopting the lakehouse architecture provides these tangible benefits, it is important to recognize that this is only the first step in the journey. As more data is ingested into storage, whether it’s a cloud object store such as Amazon S3, GCS, or Azure Blob, or an on-premises system, critical considerations must be made regarding the optimal management of data files in the lakehouse. Running queries against hundreds of terabytes, or even petabytes, of data requires ongoing optimization to maintain performance. 

Although your queries may be running efficiently today, that may not always be the case in the future. Without proper optimizations, as data grows and more files accumulate in the storage, performance can degrade. Typically, you may start encountering these common issues:

  • Slow queries due to unorganized or small files: Without regular maintenance, compute engines may end up scanning numerous inefficiently-sized or unorganized files, leading to longer query times and higher costs. Additionally, running the same slow queries by multiple teams can contribute to higher query times and costs.
  • Shifting query patterns: Over time, business requirements may evolve, leading to new ways of accessing and analyzing data that weren’t initially optimized for.
  • New queries and workloads: Emerging analytical use cases may require different query optimizations to maintain performance.

Meeting these challenges requires that various optimization tasks be performed, including partitioning, compaction, clustering, data skipping, and cleaning. These techniques enable better data organization, helping to prune unnecessary data during queries, and ensuring that data can be accessed efficiently and remains manageable over time. The less data you need to scan during a query, the faster and more cost-efficient the query will be.

In the sections below, we’ll dive into these techniques in detail, providing insights into how they can be applied to optimize storage in a data lakehouse and improve query performance.

Partitioning

Partitioning is one of the most fundamental techniques for optimizing data access with large-scale data. It involves dividing your data into smaller, more manageable chunks, or partitions, based on specific columns (usually commonly queried fields) or criteria, such as date, geographic region, or category. Partitioning helps reduce the amount of data scanned during queries by limiting the data that needs to be read to only the relevant partitions.

Figure 1: The Logs table before and after partitioning

Query:

SELECT details FROM Logs
WHERE event_date = '2018-12-02'

For example, consider the Logs table, as illustrated above. Initially, the table consists of a collection of data files, each representing a mix of log entries. If we run a query to retrieve log entries for a specific date range, say December 2, 2018, the query engine would have to scan through all the files in the table to find the matching rows. This approach is inefficient, particularly when dealing with large volumes of data.

In this case, if we partition the table by event_date, we can organize the log data into groups where each partition contains logs for a specific date. For instance, Partition A holds logs from December 1, 2018, Partition B from December 2, 2018, and so on. This segmentation allows the query engine to skip irrelevant partitions entirely. In our example, if we’re only interested in logs from 2018-12-02, the engine can ignore Partitions A and C, which contain logs from December 1 and 3, respectively.

Other than the performance benefits, partitioning also provides a range of other benefits for the data architecture. One of those advantages is scalability. By distributing data across multiple partitions, the system can scale horizontally, allowing for easy expansion as data volumes grow. Partitioning also enhances availability. Since the data is distributed across partitions, the failure of a single partition doesn’t bring the entire system down; instead, only the specific partition affected might be temporarily unavailable, while the rest of the data remains accessible. 

File Sizing (Compaction)

Over time, as data is ingested into storage systems , numerous small files are generated, leading to what is commonly known as the "small file problem." These small files cause query engines to process many files, increasing I/O overhead and slowing down query performance, because there is a cost to open, read and close each and every file (depending on the query). 

File sizing, or compaction, is an important technique in a data lakehouse architecture, that ensures small and inefficiently sized files are consolidated into larger, more optimal ones. Without regular compaction, file counts can grow exponentially, severely degrading system performance as data volumes expand.

The goal of file-sizing algorithms is to merge these smaller files into larger files that meet the system’s target file size, reducing the number of files and improving query efficiency. By doing so, systems can maintain fast query speeds and optimize storage usage, ultimately providing better scalability and cost efficiency.

One of the most commonly used algorithms for compaction across open table formats is the bin packing algorithm. This algorithm is designed to combine small files into larger, near-optimal-sized files, ensuring that the merged files are close to the system’s defined target size, without exceeding it. The bin packing approach is simple yet effective, as it efficiently groups small files to minimize wasted space and reduce the overall file count.

As shown in the image, small files (File 1: 40 MB, File 2: 80 MB, and File 3: 90 MB) are compacted into larger, optimally sized files, reaching the system's target file size of 120 MB. This process reduces the total number of files while ensuring that each file is fully utilized, leading to better query performance and lower storage overhead.

Figure 2: Compaction process that merges smaller-sized files into larger ones

A critical design feature of Apache Hudi’s architecture is its built-in ability to avoid the creation of small files. Hudi is uniquely designed to automatically write files to an optimal size, eliminating the need for manual file size management. During data ingestion, Hudi dynamically adjusts file sizes using configuration settings such as:

  • Max File Size: The upper limit for the size of any file.
  • Small File Size: The threshold below which a file is considered for compaction/file sizing.
  • Insert Split Size: The size of data splits during inserts.

Note: In Hudi, the process of merging smaller-sized files into larger ones - which is referred to as compaction in other lakehouse formats - is called file sizing. Hudi uses the word compaction to describe a different process: merging log files (stored in Avro) with existing base files (stored in Parquet) to create a new, updated base file (also stored in Parquet). New updates then go to a new changelog file, until the next Hudi compaction operation. 

As an example of the Hudi implementation of file sizing, in the image above, Hudi is using a Max File Size of 120 MB and a Small File Size of 100 MB. During ingestion, Hudi fills up files to meet the 120 MB limit, consolidating smaller files as needed to avoid leaving small files behind. This ensures that file sizes remain optimal, enhancing performance. This automatic file sizing process happens during every ingestion cycle, helping to maintain well-sized files and preventing the accumulation of small, inefficient files over time. Most importantly, none of this process needs user interference except for setting the desired threshold values.

Clustering

Clustering is an optimization technique used to reorganize and group data within files, helping improve query performance by minimizing the amount of data scanned. The core problem that clustering addresses is the misalignment between how data is written and how it is queried. Often, data is written based on arrival time, which doesn’t necessarily align with the event time or other critical attributes queried later. For example, in analytical workloads where queries frequently filter by specific columns like location or event date, data spread across many files forces the query engine to scan unnecessary files, which can have a huge impact on performance. Clustering techniques typically involve two main strategies: simple sorting and multi-dimensional clustering.

Sorting

The simplest form of clustering is sorting, where data is ordered by a specific column, such as city (shown in the image below) or another frequently queried field. Sorting ensures that data rows with similar values are grouped together in a data file - with each data file having a unique range of values for the specific column it is sorted on - improving data locality. This allows the query engine to quickly locate and scan the data rows relevant to the query, significantly reducing the number of files it needs to read. 

Figure 3: Sorting data by a particular field

However, while sorting is effective for queries with single predicates, it has limitations when queries involve multiple predicates. For instance, if the data is sorted by city, a query that filters by both city and trip_duration will still require scanning all files that match the city filter, even if the trip_duration filter excludes most of the records. This is where simple sorting falls short—it is not enough to optimize multi-dimensional queries that involve more than one field.

Multi-Dimensional Clustering

To address the limitations of simple sorting, more advanced clustering techniques such as multi-dimensional clustering come into play. Multi-dimensional clustering reorganizes data across multiple columns simultaneously, optimizing queries that filter on more than one dimension. One of the most popular approaches in multi-dimensional clustering is Z-ordering - a type of space-filling curve. Z-ordering orders data across multiple columns in a way that preserves spatial locality, which means that similar types of records end up in the same data file. It works well, for example, with queries that involve latitude and longitude data, ensuring that data from nearby locations is stored together, reducing the number of files that need to be read.

For instance, if you have a query filtering on both city and trip_duration, multi-dimensional clustering ensures that data is organized so that relevant records for both predicates are grouped together in the same files. This reduces the number of files the engine must scan, resulting in much faster query execution than simple sorting alone.

Other techniques, such as Hilbert curves, achieve similar goals, but are more effective for handling high-dimensional data and can provide better clustering when queries span multiple complex dimensions. Hilbert curves ensure that data points remain close together in multiple dimensions.

In terms of deploying these clustering algorithms, Apache Hudi offers different models. These are broadly classified into asynchronous and synchronous (inline). Inline clustering typically happens as part of the regular ingestion pipeline, which means the next round of ingestion cannot happen until the clustering is complete. Asynchronous clustering, on the other hand, allows Hudi to optimize data layout without blocking ongoing data ingestion. This makes Hudi particularly effective for workloads that require both frequent data ingestion and fast, efficient querying. There are three deployment modes for running asynchronous clustering:

  • Asynchronous execution within the same process
  • Asynchronous scheduling and execution by a separate process
  • Scheduling inline and executing asynchronously

Hudi allows you to cluster data based on different sorting or multi-dimensional techniques, such as Z-ordering and Hilbert curves, depending on the query patterns.

Data Skipping

Data skipping is a technique used to enhance query performance by eliminating the need to scan irrelevant data files. By doing so, data skipping minimizes the volume of data scanned, improving query execution times and reducing resource usage. These techniques have risen to prominence particularly in data lake storages, where the data size is usually high. 

Figure 4: Parquet footer metadata that shows the min/max values of fields and their count

A common example of data skipping can be seen with Parquet file formats, which store column-level statistics such as min and max values for each file. For instance, in the sales table shown above, each Parquet file records metadata about the minimum and maximum values for columns such as date or sales. When a query filters on a specific date range (e.g., date >= '2022-06-01'), the query engine uses these min/max values to skip files that fall outside the date range, avoiding the need to scan irrelevant data. Data skipping leverages these column statistics—such as min/max values, null counts, and record counts—to ensure that only relevant data is processed. This approach dramatically improves query performance by reducing unnecessary file scans, especially for large datasets. 

Similarly, Bloom filters provide another robust way to skip data in lakehouses. A Bloom filter is a probabilistic data structure that allows you to quickly determine whether a specific value might be present in a dataset. By using multiple hash functions to map elements into a fixed-size array of bits, Bloom filters can efficiently identify files or row groups that are “definitely not relevant” to a query. Although they can return false positives (indicating a value might be present when it isn’t), they never produce false negatives. This makes them highly efficient for reducing unnecessary file scans in large datasets.

While using column-level statistics from individual Parquet files and Bloom filters helps to skip irrelevant files, performing these operations for every file—opening each file, reading the footers, and closing it—can be costly at scale. To avoid this overhead, Apache Hudi leverages the metadata table. The Hudi metadata table is a multi-modal indexing subsystem that stores various types of indices, enabling the query engine to efficiently look up relevant data files based on query predicates, without needing to read the column stats or Bloom filters from each individual file. By employing these indexes, Hudi can prune irrelevant files and speed up queries - at times, by orders of magnitude.

Cleaning

In data lakehouse systems, cleaning is a critical process for maintaining performance and managing storage costs. As data is continuously written, updated, and deleted, older file versions and metadata tend to accumulate over time. This can lead to significant storage bloat and long file listing time, which negatively impact query performance. The query engine must sift through a growing amount of metadata and irrelevant data, which increases the time and resources required to execute queries. As in traditional database systems, cleaning addresses this by periodically removing outdated data versions and files that are no longer needed, and updating metadata to match, keeping the dataset lean and optimized for performance.

In Apache Hudi, cleaning is a critical table service used to reclaim space by removing older file versions, which are maintained for features like time travel and rollback. Hudi operates using Multi-Version Concurrency Control (MVCC), enabling snapshot isolation between readers and writers. While this allows for the storage of multiple file versions, helping with query rollback and historical data access, keeping too many versions can significantly increase storage costs.

To manage this balance between retaining history and minimizing storage bloat, Hudi employs an automatic cleaning service. By default, cleaning is triggered immediately after each commit to remove older file slices, ensuring that the growth of both metadata and data remain bounded. This automatic cleaning process helps keep the table lean, preventing the accumulation of obsolete files.

For more flexibility, users can adjust the cleaning frequency by using the hoodie.clean.max.commits configuration setting, allowing the cleaning process to run after a specified number of commits rather than after every commit. This ensures that the cleaning process can be tailored based on workload requirements and the desired tradeoff between history retention and storage costs.

Hudi offers different cleaning policies, such as:

  • Version-based Cleaning: Retains a set number of recent file versions while deleting older ones.
  • Commit-based Cleaning: Retains a certain number of commits (say, last 10).
  • Time-based Cleaning: Removes files that have exceeded a specified age (in hours), ensuring only recent data is retained.

Conclusion

Optimizing performance in a data lakehouse architecture is essential for managing growing datasets and ensuring efficient query execution. By organizing data in a way that aligns with query patterns, reducing the overhead caused by small files, and leveraging metadata to minimize unnecessary data scans, Apache Hudi provides fast, cost-effective queries. At the same time, regular maintenance processes ensure that outdated data is purged, preventing storage bloat. Together, these optimization techniques, working in tandem, ensure that as your data grows, your data architecture remains scalable and efficient, allowing you to meet evolving analytical needs without compromising on performance. 

If you’re looking for a solution to intelligently optimize performance for your Hudi tables and automate the tedious tasks of infrastructure management, check out Table Optimizer from Onehouse.ai.

Authors
No items found.

Read More:

Onehouse Achieves PCI Compliance Certification
Introducing Onehouse LakeView
Introducing Onehouse Table Optimizer
How to use Apache Hudi with Databricks

Subscribe to the Blog

Be the first to read new posts

Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.
We are hiring diverse, world-class talent — join us in building the future