The concept of the data lakehouse was pioneered in 2016 by a team at Uber trying to solve the problem of storing large volumes of high-throughput upsert data. That project eventually became Apache Hudi, described at the time as a "transactional data lake." As time went on, other organizations created projects to decouple expensive data warehouse compute from storage, leveraging cloud object storage to store data as files. These projects became Apache Iceberg (born out of Netflix) and the Linux Foundation's Delta Lake (born out of Databricks), and the industry eventually converged on the term "data lakehouse."
While all three of these projects succeeded in reducing the cost of running large-scale workloads off the data warehouse and unlocked previously unseen data scale, engineers soon found that they needed to pay close attention to performance tuning to maintain a lakehouse deployment at scale.
Some of the team behind Apache Hudi eventually went on to found Onehouse, where we are building a universal and truly interoperable data lakehouse platform. At Onehouse, through both our customers and the many open source lakehouse projects we support, we've seen how important table performance is. This blog dives deep into many of the mechanisms that engineering teams can employ to boost write and read performance on their lakehouse tables, serving as a guide for future engineering projects. I will focus on these features as they relate to deployments built on Apache Hudi, but many of these capabilities exist in the other lakehouse table formats, and the techniques described here can be adapted for Iceberg and Delta Lake. For a detailed feature comparison between the formats, check out my colleague Kyle Weller's blog here.
To many of us data engineers this may seem like an obvious topic, but it's important to level set on why performant tables matter as we develop our data systems. The motivation for performance, and the lengths to which engineering teams go to ensure it, vary from organization to organization, and frankly even vary widely by use case, but the overarching principles remain the same. We all want to write data in a way that is fast to query and use in our business applications, without it costing us a fortune to ingest and store. This is why performance tuning for the lakehouse matters so much.
The lakehouse and open table formats can offer significant cost savings by decoupling storage and compute, with nearly infinitely scalable storage built on cloud hyperscaler storage services. However, they also carry the risk of negatively impacting performance if the data is not properly optimized. Here are a couple of drastic examples of unoptimized tables that resulted in significant cost and performance overages, with some even leading to system degradation:
In this section, we will walk through the common ways that lakehouse tables achieve performance gains and how engineers can leverage these techniques to their advantage. Note that many of these capabilities are applied to the underlying Parquet files that the lakehouse table formats use, so gains made in one table format are reflected in all three formats when using an interoperability layer such as Apache XTable.
When creating lakehouse tables, all three formats offer some version of two table types: Copy on Write (COW) or Merge on Read (MOR). Each of these table types offers different performance characteristics and should be used for different types of workloads.
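As a minimal sketch of what this looks like in practice (the table name, path, and DataFrame below are hypothetical placeholders, and assume a Spark datasource write), the Hudi table type is selected with a single writer config:

table_type_configs = {
    "hoodie.table.name": "sales_events",  # hypothetical table name
    "hoodie.datasource.write.table.type": "MERGE_ON_READ"  # or "COPY_ON_WRITE"
}

# df.write.format("hudi").options(**table_type_configs).mode("append").save("s3://my-bucket/lakehouse/sales_events")

In general, MOR favors write-heavy, frequently updated workloads by landing changes in log files, while COW keeps reads simple by rewriting base Parquet files on each update.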
If you are using Iceberg or Delta Lake, you will also have similar capabilities to choose from. Iceberg has more recently added support for COW and MOR tables, and Delta’s deletion vectors offer similar functionality to MOR tables on delete operations.
Partitioning is when you separate your data into different locations based on a particular key. This key splits your data into different folders (partitions) based on the value of the key. Below is a diagram showing this for a table of retail sales data partitioned by store_id.
As you can probably infer from the diagram, partitions provide a series of benefits that reader and writer clusters can leverage. Here are just a few.
While these are great cases for employing partitioning, you might be asking yourself, “How would I be able to guarantee some performance benefit from partitioning?” Let’s talk about how partitioning plays into this.
Most significant data projects have multiple layers in their pipelines. Data is landed raw, then a series of downstream tables are created from this raw data through pipelines. When crafting the partitions for the tables, examine the pipelines that read from the table. Within those pipelines, there will be queries which read from the table and feed the pipeline. Here, you want to make sure that the filtering conditions in those queries match the partitioning scheme that you have.
Example:
SELECT sku,
       price,
       item_category,
       count(transaction_id)
FROM transactions.clothing_sales
WHERE item_color = 'blue'
GROUP BY sku, price, item_category
Here, we can see that queries against the table filter on item_color, so we should make sure that the partitioning strategy for this table is also based on item_color. As a result, this query will not have to open any of the files for items that are "red", "green", or any other color besides "blue".
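As a hedged sketch of how this maps to Hudi writer configs (assuming item_color is a column on the table and a standard Spark datasource write), the partition key could be set like this:

partitioning_configs = {
    "hoodie.datasource.write.partitionpath.field": "item_color",  # partition on the column the queries filter by
    "hoodie.datasource.write.hive_style_partitioning": "true"     # writes folders such as item_color=blue, item_color=red
}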
From the previous section on partitioning, it might seem as though the strategy should simply be to split partitions as finely as possible, dividing your data into thousands or even tens of thousands of partitions. However, this causes far more problems than it solves (and frankly doesn't solve that many). This section focuses on striking the right balance with coarse-grained partitioning.
At a high level, coarse-grained partitioning means that the total number of partitions should stay bounded and that partition keys should have relatively low cardinality (number of unique values). The reasoning for this is twofold.
Typically a coarse-grained partition should contain at least 1 GB of data, and partitions can contain significantly more. The Databricks recommendation is to only partition tables larger than 1 TB and to keep each partition larger than 1 GB.
In both Hudi (versions prior to 1.0.0; newer versions solve this problem) and Delta Lake, each partition is written as a folder in the data lake, and once created, a partition is not changeable. New partitions can be added, and each appears in the table as a new partition directory. Suppose you have a table partitioned by an item category_code. If the code representing winter jackets is updated, say from "1XY" (old code) to "WNTJ" (new code), the older partition's name will not change; instead, a new partition named "WNTJ" will be created for newly arriving data, and the table is effectively corrupted because half of the winter jacket data lives in "1XY" and the other half lives in "WNTJ." As you can see, you want to choose partition keys whose values will not change.
Hudi 1.0.0 introduces a new way of thinking about partitioning. In this new vision, partitions are expressed as an abstraction over indexes (specifically the new expression index), meaning that Hudi 1.0 partitions are treated as a coarse-grained index on a column value. From there, the partition stats index (built into the new partitioning mechanism) enables data skipping and faster reads.
For those using Iceberg and Delta Lake, partitioning is still a useful strategy, especially at scale. Iceberg even offers advanced capabilities such as partition evolution, which lets you change a table's partitioning scheme over time without rewriting the existing data.
Indexes were first added to the data lakehouse in 2020 in Apache Hudi. Since then, Hudi has used them as an integral tool in its performance acceleration arsenal, most recently adding secondary indexes in the Hudi 1.0.0 release. This multi-modal indexing system is powered by the Hudi metadata table, so we'll spend a bit of time on the metadata table before diving into Hudi's indexing capabilities.
The Hudi metadata table is a MOR table stored within the .hoodie/ folder that can be used to accelerate your Hudi table's performance. It stores the following information:
Here is how you would enable the metadata table in your jobs:
Writer Configs
writer_metadata_configs = {
    "hoodie.metadata.enable": "true",  # enables the metadata table
    "hoodie.metadata.index.column.stats.enable": "true",  # (optional) enables column stats
    "hoodie.metadata.index.column.stats.columns": ["column_a", "column_b", "column_c"],  # columns to track for column stats
    "hoodie.metadata.index.partition.stats.enable": "true"  # (optional) enables tracking partition stats
}
Reader Configs
reader_metadata_configs = {
    "hoodie.metadata.enable": "true",  # tells the reader to use the metadata table
    "hoodie.enable.data.skipping": "true"  # enables data skipping using column stats
}
Because of the advanced metadata capabilities we just discussed, Hudi can also build indexes, a capability the other formats do not offer to the same extent. Iceberg and Delta do not employ the structured metadata system that Hudi does (although Delta does offer a data skipping capability, which is also built into Hudi). These indexes can be used to accelerate both read and write performance; certain indexes are better for accelerating reads, while others are better for writes. In this section, we will look at the different Hudi indexing options and how to set them up in your lakehouse deployment.
In 2022, Hudi introduced the Bloom index, which employs the Bloom filter data structure to speed up lookups across the dataset. This index creates a hash-based lookup on the specified index key, which speeds up checks for the existence of a key within the table. Because Bloom filters guarantee no false negatives, any record that has been inserted is guaranteed to produce a hit when it is looked up.
When to use: The Bloom index is applied across the entire dataset, with the Bloom filters and min/max key ranges stored in the Parquet file footers. Because the footers of affected files are re-hashed when updates are performed, this index becomes expensive on a large table (lots of Parquet files) with random updates spread across the table. Use the Bloom index when you have a known set of late-arriving data and want to leverage its significant data-skipping advantages to limit the files scanned on insert.
Configs for enabling Bloom Index
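A minimal sketch of the writer configs (exact settings will depend on your workload):

bloom_index_configs = {
    "hoodie.index.type": "BLOOM",  # use the Bloom index for upsert lookups
    "hoodie.metadata.index.bloom.filter.enable": "true"  # (optional) track bloom filters in the metadata table
}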
Hudi also has a record-level index, which allows users to perform fast writes and lookups on individual records. The record index is stored in the record_index partition of the metadata table. It hashes the record key defined on the table (globally unique within the table) into shards, enabling very fast lookups of single records.
When to use: Use the record index when you have large tables where a single record or a small batch of records needs to be searched and/or updated. This is relevant for use cases such as finding individual entries, or making updates that are context-aware to a single entry in the table. The record-level index drastically speeds up the search for these records, reducing both query and write times.
Configs to enable Record Index
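A minimal sketch of the writer configs:

record_index_configs = {
    "hoodie.metadata.record.index.enable": "true",  # builds the record_index partition in the metadata table
    "hoodie.index.type": "RECORD_INDEX"             # tells the writer to use the record index when tagging updates
}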
With the most recent Hudi release (1.0.0), Hudi announced the first support for indexes on secondary columns (non-record-key columns). This means users can define indexes on columns other than the Hudi record key and thereby speed up non-record-key lookups on the table.
When to use: This is a great choice for speeding up queries with predicates on columns other than the record key. To create this index, you can use the SQL syntax released with Hudi 1.0.
CREATE INDEX secondary_index ON hudi_indexed_table (index_column);
Table services are an important part of maintaining healthy and performant lakehouse tables across all three table formats. In this section we will go through a basic overview of the different services offered and some of the improvements you can leverage when running them. Each of these table services may go by slightly different names in the different table formats, but many of the core concepts remain similar. For this section, we will primarily use the Hudi naming conventions and refer to the Iceberg and Delta variations as needed.
All lakehouse table formats require cleaning of old file versions. When new writes ("commits") are performed on the table, new file versions are often generated, and each commit is represented in the timeline. Naturally, we cannot keep these older file versions around indefinitely, as that would greatly increase the storage cost of the table. Thus, the cleaner service deletes older versions of the files, keeping only the number of versions specified in the configurations.
When running the cleaner service, it is important to consider a couple of things. First, you must decide how much table history you need to retain. It is not uncommon to perform time-travel or incremental queries on your tables, where you may need to query versions of the table that are hours or days old. You may also want to keep prior versions of the table in case bad data arrives from an upstream pipeline. You can configure this as either a number of hours retained or a number of commits retained. The recommendation is to keep this at the lowest safe number such that you can be confident you have the history needed for your use case.
The second consideration is how often to run the cleaner service. The cleaner (as with all table services) uses compute resources to execute and does not have to run after every commit, so it may be prudent to run it only at set commit intervals. The right interval is best determined by your use case and the amount of data being processed. Below is a sample cleaner job run via Onehouse's Table Optimizer, which runs the cleaner every 5 commits and retains 2 days' worth of commit history.
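If you are configuring this directly in Hudi instead, a roughly equivalent sketch of the cleaner configs might look like the following (the retention numbers are illustrative, not a recommendation):

cleaner_configs = {
    "hoodie.clean.automatic": "true",                # schedule cleaning from the writer
    "hoodie.clean.max.commits": "5",                 # only trigger cleaning every 5 commits
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
    "hoodie.cleaner.hours.retained": "48"            # retain roughly 2 days of file history
}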
For Iceberg and Delta tables, similar functionality exists to "clean" older file versions, reclaim storage space, and reduce listing overhead. Iceberg offers a series of maintenance operations that accomplish this, including expiring snapshots, deleting orphaned files, and removing old metadata files. In Delta Lake, operations such as vacuuming and log retention remove files marked for deletion ("tombstoned" files) as new files are added to replace them.
Inside each of the table formats, you will also find the ability to sort and pack the Parquet files using space-filling techniques; Hudi calls this operation "clustering." Data is written into the Parquet files sorted according to the selected keys, which means that if a query has predicates matching those keys, only a small subset of files needs to be scanned, following data-skipping principles similar to those outlined in the Bloom index section of this blog.
Hudi, Delta, and Iceberg all support multiple space-filling curves that help efficiently sort the data inside these Parquet files. There are three main sorting options: linear, Z-order, and Hilbert curves. Each sorts the data differently according to the characteristics described below.
Z-order vs Hilbert Curve Diagrams - credits to Shiyan Xu’s Hudi e-book
When selecting which of these sorting methods to use, there are some general rules of thumb that can be followed in order to maintain the correct balance between writer workload and query performance acceleration. My colleague Po Hong ran a series of tests and created the following illustrative scenario.
Table_1 is using a linear sort order on columns (A,B,C), while table_2 uses a Z-order on the same columns (A,B,C). Now consider the following three queries:
Q1: Select A, count(*) From table Group By A;
Q2: Select B, count(*) From table Group By B;
Q3: Select C, count(*) From table Group By C;
The queries performed as follows: Q1 ran roughly the same with linear and Z-order, while Q2 and Q3 ran drastically faster with Z-order.
From his testing, he arrived at the following pattern: if the query predicate matches the order of the clustering key (i.e., the predicate is of the form A, A+B, or A+B+C), then linear sorting performs well. For other permutations of the query predicates (B, B+C, A+C, etc.), the space-filling curves (Z-order and Hilbert) drastically outperform linear clustering. In addition, he found that selecting high-cardinality clustering keys adds further performance improvements, as it maximizes clustering parallelism and reduces the data scanned per query predicate.
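To make this concrete, here is a hedged sketch of what inline clustering with a space-filling curve can look like in Hudi writer configs (the column names and commit interval are illustrative placeholders):

clustering_configs = {
    "hoodie.clustering.inline": "true",                       # run clustering from the writer
    "hoodie.clustering.inline.max.commits": "4",              # schedule clustering every 4 commits
    "hoodie.clustering.plan.strategy.sort.columns": "A,B,C",  # clustering/sort keys
    "hoodie.layout.optimize.strategy": "z-order"              # or "hilbert" / "linear"
}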
Onehouse Table Optimizer makes setting up and modifying clustering simple and intuitive, with just a few clicks of the mouse as shown in the screenshot below. It also makes it easy for users to set the frequency for executing clustering in an incremental fashion, where tables will be clustered on the data written, maximizing compute efficiencies.
Iceberg and Delta Lake also offer Z-order space-filling curves for the Parquet files written in those formats, which need to be expressed in the writer configs when writing the tables.
The table formats attach different meanings to the term "compaction," but regardless of the format, these services all help address a key problem faced by data lakehouse operators: file sizing. As data is ingested into data lake systems, many small files are often generated. These small files create I/O overhead for reader and writer systems, which can bottleneck pipelines. In fact, in their 2024 re:Invent announcement of S3 Tables, Amazon Web Services referred to managing small files as "the most common problem" they saw customers facing when operating large-scale data lakehouse deployments. Compaction services aim to solve this problem (in Hudi, "compaction" refers to something different, which we will also cover in this section).
In Iceberg, file sizing is handled via the compaction commands defined in table maintenance. Hudi has automatic file sizing built into its writer operations; a few writer configs should be set to enable this capability.
hudi_file_sizing_configs = {
    "hoodie.parquet.max.file.size": 125829120,   # sets the max file size to the optimal 120 MB
    "hoodie.parquet.small.file.limit": 104857600  # files under 100 MB are treated as small files to be filled on later writes
}
This combination of configs tells the Hudi writer to keep files between 100 and 120 MB in size, maintaining the optimal file size of roughly 120 MB for each Parquet file.
Hudi uses the term "compaction" for another process. If you recall our earlier discussion of table types, in MOR tables changes are initially written to log files rather than to the base Parquet files. At set intervals, these log files need to be merged into the base files so that readers do not have to parse a long set of log files when reading from MOR tables. This merging is done by Hudi's compaction service. Users must set the frequency of compaction, balancing the write efficiency of appending to log files against the read efficiency of querying larger Parquet base files. Onehouse Table Optimizer offers an easy, hands-free approach to configuring this service for optimal performance. Shown below is a screenshot of setting up the compaction service via Table Optimizer.
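For teams configuring this directly in Hudi, a minimal sketch of inline compaction configs for a MOR table could look like the following (the delta-commit interval is illustrative):

compaction_configs = {
    "hoodie.compact.inline": "true",                  # merge log files into base files from the writer
    "hoodie.compact.inline.max.delta.commits": "10"   # schedule compaction after every 10 delta commits
}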
Each of the table services described above (clustering, cleaning, and compaction) can be executed either inline or asynchronously. With inline execution, the writer runs the configured table services sequentially after the write commit. This is easy to configure, but it extends the total write time before new data is available to be processed, slowing down end-to-end write operations.
This is where asynchronous table services, using Hudi's multi-writer concurrency control, come into play. In this method, the main writer continues to process data while secondary writers execute the table service operations on the table. A lock provider ensures that there are no write conflicts between the different writers. A diagram (shown below) and a detailed explanation of this process can be found in my colleague Lin Liu's blog.
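As a hedged sketch of the writer-side configs involved, assuming a ZooKeeper-based lock provider (the endpoint, lock key, and paths below are placeholders):

async_services_configs = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    "hoodie.write.lock.zookeeper.url": "zk-host",         # placeholder ZooKeeper endpoint
    "hoodie.write.lock.zookeeper.port": "2181",
    "hoodie.write.lock.zookeeper.lock_key": "my_table",   # placeholder lock key
    "hoodie.write.lock.zookeeper.base_path": "/hudi_locks",
    "hoodie.clean.async": "true",                         # run cleaning asynchronously
    "hoodie.compact.inline": "false"                      # leave compaction to a separate process
}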
As you can see, asynchronous and standalone table services allow for faster end-to-end write operations and data freshness on the table, but take significant additional work in setup. This is where Onehouse Table Optimizer comes into play. By automating the compute resources and lock provider orchestration, users are able to achieve high performance asynchronous table services without heavy lifting.
As we have seen over the course of this blog, the options to optimize lakehouse table performance on the format and storage layer are significant. At Onehouse, we recognize this and are working to make this experience easier to manage for our customers through our data platform, standalone capabilities found in our Table Optimizer, and through observability offered in the free and open Onehouse LakeView application. As more lakehouse formats such as Apache Hudi, Apache Iceberg, and Delta Lake are adopted, we hope that these deployments continue to ease the burden of scale and cost for data platform organizations around the world. You can try the Onehouse Universal Data Lakehouse for an easy way to get started with all of the needed optimizations here.
I have not included all of the optimizations used by our team in this blog. If you would like to see any other optimizations included or feel that I should add capabilities to this blog, please feel free to reach out to me at chandra@onehouse.ai. I’m always excited to have a friendly conversation about the exciting capabilities of this space and to grow my own technical knowledge.