January 9, 2025

Accelerating Lakehouse Table Performance - The Complete Guide


Introduction

The concept of the data lakehouse was pioneered in 2016 by a team at Uber that was trying to solve the problem of storing high volumes of upsert data. This project eventually became Apache Hudi, then described as a “transactional data lake.” As time went on, other organizations created projects to decouple expensive data warehouse compute and storage, storing data as files in cloud object storage. These projects became Apache Iceberg (born out of Netflix) and the Linux Foundation's Delta Lake (born out of Databricks), and the industry eventually converged on the term “data lakehouse.”

While all three of these projects successfully reduced the cost of running large-scale workloads that previously ran on the data warehouse and unlocked previously unseen data scale, engineers soon found that they needed to pay close attention to performance tuning to maintain a scaled lakehouse deployment.

Some of the team behind Apache Hudi eventually went on to found Onehouse, where we are building a universal and truly interoperable data lakehouse platform. At Onehouse, both from our customers and from the many open source lakehouse projects that we support, we’ve seen the importance of effective table performance. This blog will dive deep into many of the mechanisms that engineering teams can employ to boost write and read performance on their lakehouse tables, serving as a guide for future engineering projects. I will focus on the features as they relate to deployments built on Apache Hudi, but many of these capabilities can be found in the other lakehouse table formats, and the techniques described here can be adapted for Iceberg and Delta Lake. For a detailed feature comparison between the formats, check out my colleague Kyle Weller’s blog here.

Why Lakehouse Performance Matters So Much

While this may seem like an obvious topic to many of us data engineers, it’s important to level set on why performant tables matter as we develop our data systems. The motivation for performance, and the lengths to which engineering teams go to achieve it, may vary from organization to organization (and even vastly by use case), but the overarching principles remain the same. We all want to write data in a way that makes it fast to query and use in our business applications, without costing us a fortune to ingest and store. This is why performance tuning for the lakehouse matters so much.

The lakehouse and open table formats can offer significant cost savings by decoupling storage and compute, with nearly infinitely scalable storage on cloud hyperscaler storage services. However, they also carry the risk of poor performance if the data is not properly optimized. Here are a couple of drastic examples of unoptimized tables that resulted in significant cost and performance overruns, with some even leading to system degradation:

  1. A large data science workload queried lakehouse tables for feature engineering pipelines. These pipelines suffered significant delays and failed with Out of Memory errors. Running effective optimizations on these tables enabled data skipping and allowed the pipelines to proceed as needed.
  2. Large-scale (1 TB/hour) event stream data needed to be processed into lakehouse tables. However, end-to-end write times on unoptimized tables were too slow, resulting in stale data and an untenable buildup of unprocessed data in the event stream topics.

How to Optimize Data Lakehouse Performance

In this section, we will talk about the common ways that lakehouse tables achieve performance gains and how engineers can leverage these techniques. Note that many of these capabilities are applied to the underlying Parquet files that the lakehouse table formats use, so gains made in one table format are reflected in all three formats when using an interoperability layer such as Apache XTable.

Selecting the Correct Table Type

When creating lakehouse tables, all three formats offer some version of two table types: Copy on Write (COW) and Merge on Read (MOR). Each of these table types has different performance characteristics and suits different types of workloads.

COW (Copy on Write)

How it works: When data is added to a table, new file slices are created for each file group that has incoming data (for inserts, new file groups are created). Thus inserts create new Parquet files, while updates cause Parquet files to be rewritten as a new file slice in the file group that contained the updated records.

This means that reads on these tables are highly efficient, and write performance is good for workloads with relatively few updates (either in total volume or as a % of records).

When to use it:
  • Read optimized workloads
  • Tables that are mostly inserts (> 90% of operations)
  • Simple batch jobs
  • When fast OLAP queries are a requirement
  • When low operational burden is a priority

MOR (Merge on Read)

How it works: MOR tables process updates differently than COW tables. When updates are written, MOR tables create a series of log files, which are more lightweight than rewriting the base Parquet file. Inserts are still handled as new file groups and written as base files.

On the read side, however, MOR tables add a constraint. To get the most up-to-date view of the data, queries must read both the base files (Parquet) and the log files, so these queries may take longer than on a COW table. MOR tables also offer read optimized queries, where the engine reads only the base files, offering faster reads at the cost of data freshness.

When to use it:
  • Write optimized workloads
  • Processing updates and deletes efficiently
  • Streaming workloads (an ideal fit)
  • Change Data Capture tables
  • Batch + streaming tables
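To make the trade-off concrete, here is a toy, in-memory model of a single file group. It is a conceptual sketch only, assuming a file group can be modeled as a dict from record key to value; it is not how Hudi stores or merges data, and every name in it is hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class FileGroup:
    """Toy file group: a base file plus (for MOR) a list of log files."""
    base: dict                                # record_key -> value; stands in for the Parquet base file
    logs: list = field(default_factory=list)  # MOR log files, oldest first


def cow_update(fg: FileGroup, updates: dict) -> tuple[FileGroup, int]:
    """COW: rewrite the whole base file as a new file slice containing the updates."""
    new_base = {**fg.base, **updates}
    return FileGroup(base=new_base), len(new_base)  # rows written = the full file


def mor_update(fg: FileGroup, updates: dict) -> tuple[FileGroup, int]:
    """MOR: append the updates as a new log file; the base file is untouched."""
    return FileGroup(base=fg.base, logs=fg.logs + [dict(updates)]), len(updates)


def snapshot_read(fg: FileGroup) -> dict:
    """Snapshot (_rt style) read: merge the log files over the base file, in order."""
    view = dict(fg.base)
    for log in fg.logs:
        view.update(log)
    return view


def read_optimized_read(fg: FileGroup) -> dict:
    """Read optimized (_ro style) read: base file only, so recent updates are missing."""
    return dict(fg.base)


def compact(fg: FileGroup) -> FileGroup:
    """Fold the log files into a new base file."""
    return FileGroup(base=snapshot_read(fg))


fg = FileGroup(base={f"key{i}": i for i in range(1_000)})
cow_fg, cow_rows = cow_update(fg, {"key1": -1})
mor_fg, mor_rows = mor_update(fg, {"key1": -1})
print(cow_rows, mor_rows)                   # 1000 1
print(snapshot_read(mor_fg)["key1"])        # -1
print(read_optimized_read(mor_fg)["key1"])  # 1
```

A one-row update costs a full 1,000-row rewrite under COW but a single logged row under MOR, while the MOR read optimized view lags until compaction folds the log back into the base file.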

If you are using Iceberg or Delta Lake, you will also have similar capabilities to choose from. Iceberg has more recently added support for COW and MOR tables, and Delta’s deletion vectors offer similar functionality to MOR tables on delete operations.

In Summary:

  • CoW tables rewrite the underlying Parquet files when writing updates or deletions, while MoR tables store these changes as log files, which makes writing more efficient in the short term.
  • Use MoR tables if you have a high number or % of updates or deletes in your data.
  • Configure the compaction service (we’ll talk more about this later) to match your query speed needs for the read optimized (_ro) and real-time (_rt) tables of MoR tables.

Optimizing Partitioning Strategies

What is Partitioning? 

Partitioning is when you separate your data into different locations based on a particular key. This key splits your data into different folders (partitions) based on the value of the key. Below is a diagram showing this for a table of retail sales data that is partitioned by store_id.

As you can probably infer from the picture, partitioning provides a number of benefits that reader and writer clusters can leverage. Here are a few:

  1. Horizontal scaling: Partitioning allows clusters to horizontally scale with the data volume. By partitioning the data into smaller groups, we can isolate each partition onto nodes and scale compute accordingly.
  2. Data skipping and partition pruning: Partitioning improves query performance because queries only need to look at files inside the partitions relevant to the query's predicates. Queries can avoid scanning large numbers of files across the whole table and instead scan smaller data volumes.
  3. Easier manageability and life cycle management: You can isolate data based on the rules defined in the partition. For example, you can isolate “hot” data to specific partitions and manage lifecycle rules and time-to-live policies on a partition level.
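A minimal sketch of the layout described above, assuming Hive-style `store_id=<value>/` folder names (a common convention, used here for illustration) and a few made-up sales rows. The helper name `partition_rows` is hypothetical.

```python
from collections import defaultdict


def partition_rows(rows: list[dict], key: str) -> dict[str, list[dict]]:
    """Group rows into Hive-style partition folders such as 'store_id=12/'."""
    layout: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        layout[f"{key}={row[key]}/"].append(row)
    return dict(layout)


sales = [
    {"store_id": 1, "sku": "A100", "price": 20.0},
    {"store_id": 2, "sku": "B200", "price": 35.5},
    {"store_id": 1, "sku": "C300", "price": 12.0},
]
layout = partition_rows(sales, "store_id")
for path, rows in sorted(layout.items()):
    print(path, len(rows))
# store_id=1/ 2
# store_id=2/ 1
```

Because each folder is independent, different folders can be handed to different nodes (the horizontal scaling benefit), and a lifecycle rule can drop or archive a whole folder at once.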

While these are great cases for employing partitioning, you might be asking yourself, “How would I be able to guarantee some performance benefit from partitioning?” Let’s talk about how partitioning plays into this.

Partitions Based on Query Shapes

Most significant data projects have multiple layers in their pipelines. Data is landed raw, then a series of downstream tables are created from this raw data through pipelines. When crafting the partitions for the tables, examine the pipelines that read from the table. Within those pipelines, there will be queries which read from the table and feed the pipeline. Here, you want to make sure that the filtering conditions in those queries match the partitioning scheme that you have.

Example:

SELECT sku,
       price,
       count(transaction_id),
       item_category
FROM transactions.clothing_sales
WHERE item_color = 'blue'
GROUP BY sku, price, item_category

Here, we can see that the queries from the table are filtered on item_color. We should make sure that the partitioning strategy for this table is also based on item_color. As a result, this query will not have to open any of the files for items that are “red”, “green”, or any other color besides “blue”.
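The benefit of matching the partition key to the query filter can be sketched by counting the files a reader would open. This assumes a toy layout of `item_color=<value>/` folders with hypothetical file names; real engines prune using the table's partition metadata rather than matching string paths.

```python
def files_to_scan(layout: dict[str, list[str]], partition_col: str,
                  filter_col: str, filter_value: str) -> list[str]:
    """Return the files a reader must open for `WHERE filter_col = filter_value`."""
    if filter_col == partition_col:
        # Partition pruning: only the matching folder is read.
        return list(layout.get(f"{partition_col}={filter_value}/", []))
    # Filter on a non-partition column: every file must be scanned.
    return [f for files in layout.values() for f in files]


layout = {
    "item_color=blue/": ["part-0001.parquet", "part-0002.parquet"],
    "item_color=red/": ["part-0003.parquet"],
    "item_color=green/": ["part-0004.parquet", "part-0005.parquet"],
}
print(len(files_to_scan(layout, "item_color", "item_color", "blue")))  # 2
print(len(files_to_scan(layout, "item_color", "sku", "A100")))         # 5
```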

Use coarse grained partitioning 

From our previous section on partitioning, it might seem as though the strategy should be to make partitions as small as possible, splitting your data into thousands or even tens of thousands of partitions. However, this causes far more problems than it solves (and frankly doesn’t solve that many problems). This section focuses on striking the right balance with coarse grained partitioning.

At a high level, coarse grained partitioning means that the number of partitions should stay bounded and partition keys should have relatively low cardinality (number of unique values). The reasoning for this is twofold.

  1. Fine grained partitioning results in many small files. This happens because each write produces at least one file in every partition it touches, regardless of how much data lands there. Hence, the more partitions you have, the more files are generated, and with millions of operations on a table, you can end up with partitions that hold thousands of small files, each representing just a few rows of data. Small files can bottleneck listing and file opening operations at query time, slowing down performance on the tables.
  2. Queries often have to list all partitions and their underlying files, resulting in long listing operations when there are thousands of partitions. This also slows down query performance, as engines have to list many files even for simple queries.

Typically, a coarse grained partition should contain at least 1 GB of data, though partitions can contain significantly more. Databricks, for example, recommends partitioning only tables larger than 1 TB and keeping each partition larger than 1 GB.
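As a back-of-the-envelope check, the two thresholds above can be applied to a candidate partition key's cardinality. The helper below is a hypothetical sketch that assumes data is spread evenly across partitions, which real tables rarely are.

```python
TB = 1024 ** 4
GB = 1024 ** 3


def check_partitioning(table_bytes: int, num_partitions: int) -> list[str]:
    """Flag choices that break the '>1 TB table, >=1 GB per partition' rules of thumb."""
    warnings = []
    if table_bytes < TB:
        warnings.append("table is under 1 TB; consider not partitioning")
    avg_partition = table_bytes / num_partitions  # assumes an even spread
    if avg_partition < GB:
        warnings.append(f"average partition is {avg_partition / GB:.2f} GB; use a coarser key")
    return warnings


# 5 TB table partitioned by day over ~3 years (1,095 partitions): ~4.7 GB each
print(check_partitioning(5 * TB, 1_095))   # []
# The same table partitioned by a 50,000-value customer_id: ~0.10 GB each
print(check_partitioning(5 * TB, 50_000))  # ['average partition is 0.10 GB; use a coarser key']
```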

Note: Use immutable partition keys (Hudi versions before 1.0.0)

In both Hudi (versions before 1.0.0; newer versions of Hudi solve this problem) and Delta Lake, each partition is written as a folder in the data lake. Once these partitions are created, they cannot be changed. New partitions can be added, and each is added to the table as a new partition directory. Suppose you have a table that is partitioned by an item category_code. If the code representing winter jackets is updated, say from “1XY” (old code) to “WNTJ” (new code), the older partition’s name will not change. Instead, a new partition named “WNTJ” will be created for newly arriving data, and the data will effectively be corrupted: part of the winter jacket data will live in “1XY” and the rest in “WNTJ.” This is why you should choose partition keys whose values will not change.
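The failure mode can be reproduced with a small simulation. It assumes the writer derives the folder purely from the partition value on each incoming record and never moves existing files, as described above; the `write` helper and SKUs are hypothetical.

```python
from collections import defaultdict

partitions: dict[str, list[str]] = defaultdict(list)


def write(records: list[dict]) -> None:
    """Append each record to the folder named after its current category_code."""
    for rec in records:
        partitions[f"category_code={rec['category_code']}/"].append(rec["sku"])


write([{"sku": "jacket-1", "category_code": "1XY"},
       {"sku": "jacket-2", "category_code": "1XY"}])
# The business renames the winter-jacket code; only new data carries the new value.
write([{"sku": "jacket-3", "category_code": "WNTJ"}])

print(dict(partitions))
# {'category_code=1XY/': ['jacket-1', 'jacket-2'], 'category_code=WNTJ/': ['jacket-3']}
```

A query filtering on `category_code = 'WNTJ'` now silently misses the first two jackets.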

Partitioning for Hudi 1.0.X

Hudi 1.0.0 introduced a new way of thinking about partitioning, in which partitions are expressed as an abstraction over indexes (specifically the new expression index). In other words, Hudi 1.0 treats partitions as a coarse grained index on a column value, and the partition stats index (built into the new partitioning mechanism) enables data skipping and faster reads.

For those using Iceberg and Delta Lake, partitioning is still a useful strategy, especially at scale. Iceberg even has advanced partitioning capabilities such as partition evolution, which lets you change a table's partitioning scheme without rewriting the data that has already been written.

In summary:

  • Partitioning enhances performance and scalability when data is segregated across multiple nodes
  • Partition design should align with query shapes — this is what allows data skipping in queries, as only a small subset of partitions need to be queried
    • Look for columns in the WHERE clauses or FILTERs in your queries
  • Use immutable partition keys to prevent data corruption

Data Lakehouse Indexes

Indexes were first added to the data lakehouse in 2020 in Apache Hudi. Since then, Hudi has used them as an integral tool in its performance acceleration arsenal, most recently adding secondary indexes in the Hudi 1.0.0 release. This multi-modal indexing system is powered by the Hudi metadata table, so we’ll spend a bit of time on the metadata table before diving into Hudi’s indexing capabilities.

The Hudi Metadata Table

The Hudi metadata table is a MoR table within the .hoodie/ folder which can be used to accelerate your Hudi table performance. It stores the following information:

file_listings
  • Stored in the “files” partition
  • Stores file names, sizes, active states, and table partitions
  • Readers and writers no longer have to perform expensive file lookups, status checks, or file listing operations
column_statistics
  • Stored in the “column_stats” partition
  • Contains stats for select columns such as min, max, # of values, null count, data size, and more
  • Enables data skipping during queries, as the statistics can be used to narrow down the set of files to query
partition_statistics
  • Stored in the “partition_stats” partition
  • Aggregates metrics for each table partition, allows for partition pruning, where entire partitions are skipped on query side for read performance benefits
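Column statistics let a reader skip files without opening them. The sketch below assumes each file's min/max for one column is already available (as the `column_stats` partition would provide) and keeps only files whose range can contain the predicate; the file names and statistics are made up.

```python
from typing import NamedTuple


class ColumnStats(NamedTuple):
    file_name: str
    min_value: int
    max_value: int
    null_count: int


def prune_files(stats: list[ColumnStats], low: int, high: int) -> list[str]:
    """Keep files whose [min, max] range overlaps the predicate range [low, high]."""
    return [s.file_name for s in stats if s.max_value >= low and s.min_value <= high]


order_amount_stats = [
    ColumnStats("f1.parquet", 0, 99, 0),
    ColumnStats("f2.parquet", 100, 499, 3),
    ColumnStats("f3.parquet", 500, 999, 0),
]
# WHERE order_amount BETWEEN 150 AND 200
print(prune_files(order_amount_stats, 150, 200))  # ['f2.parquet']
```

The `partition_stats` partition applies the same overlap test one level up, to whole partitions.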

Here is how you would enable the metadata table in your jobs:

Writer Configs

writer_metadata_configs = {
    "hoodie.metadata.enable": "true",  # enables the metadata table
    "hoodie.metadata.index.column.stats.enable": "true",  # (optional) enables column stats
    "hoodie.metadata.index.column.stats.columns": "column_a,column_b,column_c",  # comma-separated columns to track in column stats
    "hoodie.metadata.index.partition.stats.enable": "true",  # (optional) enables partition stats
}

Reader Configs

reader_metadata_configs = {
    "hoodie.metadata.enable":"true", # tells reader to use metadata table
    "hoodie.enable.data.skipping": "true" # enables data skipping using column_stats
}

Hudi Indexes

Because of the advanced metadata capabilities we just discussed, Hudi can also generate indexes, a capability no other table format offers to the same degree. Iceberg and Delta do not have this to the same extent, as they don’t employ the structured metadata system that Hudi does (although Delta does offer a data skipping capability, which is also built into Hudi). These indexes can accelerate both read and write performance: some are better for accelerating reads (reader side indexes), while others are better for writes (writer side indexes). In this section, we will look at the different Hudi indexes and how to set them up in your lakehouse deployment.

Bloom Index

In 2022, Hudi introduced the Bloom Index, which uses the Bloom filter data structure to speed up lookups across the dataset. This index builds a hash-based filter on the specified index key, which speeds up checks for whether a key exists in the table. Bloom filters guarantee no false negatives: any key that was inserted into the filter will always be reported as possibly present, so the index never rules out a file that actually contains the key. They can return false positives, however, so candidate files still need to be checked.

When to use: Hudi stores a Bloom filter, along with the minimum and maximum key values, in each Parquet file footer. Because these footers are regenerated whenever a file is rewritten with updates, the index becomes harder to use on a large table (many Parquet files) with random updates spread across the table. Use the Bloom index when later-arriving data targets a known subset of the table and you want to leverage its significant data skipping advantages to limit the files scanned on insert.
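A from-scratch Bloom filter shows where the "no false negatives" guarantee comes from: inserting a key sets k bits, and a lookup can only answer "definitely absent" if one of those bits is still unset. This is a generic sketch using `hashlib`, not Hudi's Bloom filter implementation; the bit-array size and hash count are arbitrary choices.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key: str) -> list[int]:
        # Derive k bit positions by salting a SHA-256 hash with the hash index.
        return [
            int.from_bytes(hashlib.sha256(f"{i}:{key}".encode()).digest()[:8], "big") % self.num_bits
            for i in range(self.num_hashes)
        ]

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        # False means "definitely not in this file"; True means "maybe, go check".
        return all(self.bits[pos] for pos in self._positions(key))


# One filter per data file, built from the record keys written to that file.
file_filter = BloomFilter()
for key in ("driver-1", "driver-2", "driver-3"):
    file_filter.add(key)

print(file_filter.might_contain("driver-2"))  # True
```

During an upsert, files whose filter returns False can be skipped; files that return True still have to be checked, because a Bloom filter can produce false positives.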

Configs for enabling Bloom Index

Python:
bloom_index_configs = {"hoodie.metadata.index.bloom.filter.enable": "true", "hoodie.index.type": "BLOOM"}
SQL:
CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver);

Record Index

Hudi also has a record level index, which allows users to perform fast writes and lookups on individual records. The Hudi record index is stored in the record_index partition of the metadata table. It hashes the table's record key (which is globally unique within the table) and shards the index by that hash, enabling very fast lookups of single records.

When to use: Use the record index for large tables where a single record or a small batch of records needs to be looked up and/or updated, for example when finding single entries or making targeted updates to individual entries. The record level index drastically speeds up the search for these records, reducing both query and write times.
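Conceptually, a record level index is a key-to-location map split into shards by a hash of the record key, so a lookup touches one shard instead of every file. The sketch assumes a fixed shard count and made-up file group IDs; it mirrors the idea, not the on-disk layout of the `record_index` partition.

```python
import zlib
from typing import Optional


class RecordIndex:
    def __init__(self, num_shards: int = 4) -> None:
        self.num_shards = num_shards
        self.shards: list[dict[str, str]] = [{} for _ in range(num_shards)]

    def _shard_for(self, record_key: str) -> int:
        # crc32 is stable across processes (unlike Python's salted str hash),
        # so a key always maps to the same shard.
        return zlib.crc32(record_key.encode()) % self.num_shards

    def put(self, record_key: str, file_id: str) -> None:
        self.shards[self._shard_for(record_key)][record_key] = file_id

    def locate(self, record_key: str) -> Optional[str]:
        """Return the file group holding the key, or None for a brand-new insert."""
        return self.shards[self._shard_for(record_key)].get(record_key)


index = RecordIndex()
index.put("order-42", "file-group-007")
index.put("order-43", "file-group-002")
print(index.locate("order-42"))  # file-group-007
print(index.locate("order-99"))  # None
```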

Configs to enable Record Index

Python:
record_index_configs = {"hoodie.metadata.record.index.enable": "true", "hoodie.index.type": "RECORD_INDEX"}
SQL:
CREATE INDEX record_index ON hudi_indexed_table (record_key_column);

Secondary Indexes (require Hudi 1.0.0)

With the 1.0.0 release, Hudi introduced its first indexes on secondary (non-record key) columns. This means users can index columns other than the Hudi record key and speed up lookups on those columns.

When to use: This is a great choice for speeding up queries that have predicates that are not the record key. In order to create this index, you can use the SQL syntax released with Hudi 1.0.

CREATE INDEX secondary_index ON hudi_indexed_table (index_column);
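Conceptually, a secondary index maps each value of a non-key column to the record keys that hold it; a record-key index can then resolve those keys to files. The sketch below is a hypothetical in-memory version for a `city` column, not Hudi's storage format.

```python
from collections import defaultdict


def build_secondary_index(rows: list[dict], column: str, key_column: str) -> dict[str, set[str]]:
    """Map each value of `column` to the set of record keys that contain it."""
    index: dict[str, set[str]] = defaultdict(set)
    for row in rows:
        index[row[column]].add(row[key_column])
    return dict(index)


rides = [
    {"uuid": "r1", "city": "san_francisco"},
    {"uuid": "r2", "city": "chennai"},
    {"uuid": "r3", "city": "san_francisco"},
]
city_index = build_secondary_index(rides, "city", "uuid")
# WHERE city = 'san_francisco' becomes a direct lookup instead of a full scan.
print(sorted(city_index["san_francisco"]))  # ['r1', 'r3']
```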

In summary:

  • The Hudi metadata table is a powerful tool that can be used for accelerating table performance
  • Enable the metadata table and data skipping to speed up queries on expected predicates
  • Leverage Bloom Index or Record Index for fast lookups
    • Record Index: Single record lookups
    • Bloom Index: lookups across larger batches of keys
    • Additional indexes are also available (HBase, Expression, Simple, etc.) for more use cases
  • Hudi 1.0 adds secondary indexes, enabling indexing on non-record-key columns, similar to a database

Table Services

Table services are an important part of maintaining healthy and performant lakehouse tables — across all three table formats. In this section we will be going through a basic overview of the different services offered and talking about some of the improvements that you can leverage when running these services. Each of these table services may be referred to via slightly different nomenclature in the different table formats, but you’ll find that many of the core concepts remain similar. For the context of this section, we will use primarily the Hudi naming convention and will refer to variations for Iceberg and Delta as needed.

Cleaning

All lakehouse table formats require cleaning of old file versions. When new writes (“commits”) are performed on the table, new file versions (file slices, in Hudi) are often generated for these writes, and each commit is recorded in the timeline. Naturally, we cannot keep creating new file versions indefinitely, as that would greatly increase the storage costs of the table. The cleaner service therefore deletes older versions of files, keeping only the number of versions specified in the configuration.

When running the cleaner service, it is important to track a couple of things. First, you must decide how much of the table's history you need to keep. It is not uncommon to perform time-travel or incremental queries on your tables, where you may need to query versions of the table that are hours or days old. You may also want to keep prior versions of the table in case bad data arrives from your upstream pipeline. You can configure retention as either a number of hours or a number of commits. The recommendation is to keep this at the lowest safe number at which you can be confident you have the history needed for your use case.

The second consideration is how often to run the cleaner service. The cleaner service (as with all table services) uses compute resources and does not have to run after every commit. Hence, it may be prudent to run the service only at set commit intervals, determined by your use case and the amount of data being processed. I have included below a sample cleaner job run via Onehouse’s Table Optimizer that runs the cleaner service every 5 commits and retains 2 days’ worth of commit history.
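The two knobs, how much history to retain and how often to clean, can be sketched as a simple policy. This toy model assumes every commit writes one new version of a single file group and the cleaner keeps versions from the last N commits. That is similar in spirit to Hudi's `KEEP_LATEST_COMMITS` policy but is not its implementation; `run_commits` is a hypothetical name.

```python
def run_commits(num_commits: int, clean_every: int, commits_retained: int) -> tuple[list[int], int]:
    """Return the surviving file versions and the number of cleaner runs."""
    versions: list[int] = []
    cleaner_runs = 0
    for commit in range(1, num_commits + 1):
        versions.append(commit)  # each commit writes a new file version
        if commit % clean_every == 0:  # the cleaner only runs every N commits
            cutoff = commit - commits_retained  # versions at or below this are expired
            versions = [v for v in versions if v > cutoff]
            cleaner_runs += 1
    return versions, cleaner_runs


versions, runs = run_commits(num_commits=12, clean_every=5, commits_retained=3)
print(versions, runs)  # [8, 9, 10, 11, 12] 2
```

Between cleaner runs, storage temporarily holds more versions than the retention setting requires (five here instead of three). That is the cost of saving compute by cleaning less often.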

For Iceberg and Delta tables, similar functionality exists to “clean” older file versions, reclaiming storage space and reducing listing overhead. Iceberg has a series of maintenance operations that accomplish this, including expiring snapshots, deleting orphan files, and removing old metadata files. In Delta Lake, operations such as vacuuming and log retention remove files marked for deletion (“tombstone files”) after newer files have replaced them.

Clustering

Each of the table formats also lets you sort data into Parquet files using advanced space filling techniques. Hudi calls this operation “clustering”: data is written into the Parquet files sorted according to the selected keys. As a result, if a query has predicates that match the sort keys, only a small subset of files needs to be scanned, following data skipping principles similar to those outlined in the Bloom index section of this blog.

Hudi, Delta, and Iceberg all support space filling curves that help to efficiently sort the data inside these Parquet files. There are three main sorting options: linear, Z-order, and Hilbert curves. Each sorts the data differently, as described below.

  • Linear: With this clustering (sorting) strategy, the data files in each partition of a Hudi table (assuming it is partitioned) are sorted by one or more columns, and the order of these columns plays a critical role.
  • Z-order: Z-ordering is a data layout optimization technique that enhances query performance by sorting data across multiple columns. It maps multidimensional data into a one-dimensional space while preserving data locality, meaning that data points close together in the original multidimensional space remain close in the one-dimensional space.
  • Hilbert curves: Hilbert curves are similar to Z-order in that they are a space filling technique. They use a different space filling mechanism under the hood, but also map multidimensional space into a single dimension while preserving the relationships between the values of the specified keys.

Z-order vs Hilbert Curve Diagrams - credits to Shiyan Xu’s Hudi e-book
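To see how the two curves differ, the sketch below computes a Z-order value (by interleaving the bits of two columns) and a Hilbert index (using the well-known iterative xy-to-d conversion) for every point on a 4 x 4 grid. It then measures the largest jump between consecutive positions on each curve. This illustrates the curves themselves and assumes two small non-negative integer columns; it is not how Hudi, Delta, or Iceberg compute their sort keys.

```python
def z_order(x: int, y: int, bits: int) -> int:
    """Interleave bits: bit i of x goes to position 2i, bit i of y to 2i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def hilbert(n: int, x: int, y: int) -> int:
    """Distance along a Hilbert curve filling an n x n grid (n a power of two)."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if x & s else 0
        ry = 1 if y & s else 0
        d += s * s * ((3 * rx) ^ ry)
        if ry == 0:  # rotate/flip the quadrant so the sub-curve lines up
            if rx == 1:
                x, y = n - 1 - x, n - 1 - y
            x, y = y, x
        s //= 2
    return d


def max_step(order: list[tuple[int, int]]) -> int:
    """Largest Manhattan distance between neighbours in the sort order."""
    return max(abs(a[0] - b[0]) + abs(a[1] - b[1]) for a, b in zip(order, order[1:]))


n = 4
points = [(x, y) for x in range(n) for y in range(n)]
z_sorted = sorted(points, key=lambda p: z_order(p[0], p[1], bits=2))
h_sorted = sorted(points, key=lambda p: hilbert(n, p[0], p[1]))
print(max_step(z_sorted), max_step(h_sorted))  # 4 1
```

The Hilbert curve only ever moves to an adjacent cell, while Z-order occasionally jumps across the grid, which is one reason Hilbert ordering tends to keep neighbouring values in fewer files.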

How to set up clustering and when to use each mechanism

When selecting which of these sorting methods to use, there are some general rules of thumb that can be followed in order to maintain the correct balance between writer workload and query performance acceleration. My colleague Po Hong ran a series of tests and created the following illustrative scenario. 

Table_1 uses a linear sort order on columns (A, B, C), while Table_2 uses a Z-order on the same columns (A, B, C). Now consider the following three queries:

Q1:  Select A, count(*) From table Group By A;

Q2:  Select B, count(*) From table Group By B;

Q3:  Select C, count(*) From table Group By C;

The queries performed as follows: Q1 performed roughly the same with linear and Z-order sorting, but Q2 and Q3 were drastically faster with Z-order.

From his testing, he observed the following pattern: if the query predicate matches the order of the clustering key (i.e., the predicate is on A, A+B, or A+B+C), linear sorting performs well. For other combinations of query predicates (B, B+C, A+C, etc.), the space filling curves (Z-order and Hilbert) drastically outperform linear clustering. In addition, he found that selecting high cardinality clustering keys further improves performance, as it maximizes clustering parallelism and reduces the data scanned per query predicate.
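The pattern can be illustrated with an idealized toy model. Take 256 rows covering every combination of two 4-bit columns A and B, sort them either linearly by (A, B) or by Z-order, and cut the result into 16 files of 16 rows. Then count the files whose min/max range could match a single-column filter. This is a sketch under those assumptions, not a reproduction of Po Hong's benchmark.

```python
def z_order(a: int, b: int, bits: int = 4) -> int:
    """Interleave the bits of a and b into one sort key."""
    z = 0
    for i in range(bits):
        z |= ((a >> i) & 1) << (2 * i)
        z |= ((b >> i) & 1) << (2 * i + 1)
    return z


def files_touched(rows: list[tuple[int, int]], rows_per_file: int, col: int, value: int) -> int:
    """Count files whose [min, max] for column `col` could contain `value`."""
    count = 0
    for start in range(0, len(rows), rows_per_file):
        chunk = [r[col] for r in rows[start:start + rows_per_file]]
        if min(chunk) <= value <= max(chunk):
            count += 1
    return count


rows = [(a, b) for a in range(16) for b in range(16)]
linear = sorted(rows)  # sort by A, then B
zordered = sorted(rows, key=lambda r: z_order(*r))

for name, layout in (("linear", linear), ("z-order", zordered)):
    # files touched for A = 3, then for B = 3
    print(name, files_touched(layout, 16, 0, 3), files_touched(layout, 16, 1, 3))
# linear 1 16
# z-order 4 4
```

In this toy model, linear sorting wins on the leading column (1 file vs 4) but must read every file for a filter on the second column, while Z-order reads 4 files either way.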

Onehouse Table Optimizer makes setting up and modifying clustering simple and intuitive, with just a few clicks of the mouse as shown in the screenshot below. It also makes it easy for users to set the frequency for executing clustering in an incremental fashion, where tables will be clustered on the data written, maximizing compute efficiencies.

Iceberg and Delta Lake also offer Z-order space filling curves to fill the parquet files written in those formats, which will need to be expressed in the writer configs when writing the tables.

Compaction and File Sizing

The different table formats use the term “compaction” differently, but regardless of the format, compaction helps address a key problem faced by data lakehouse operators: file sizing. As data is ingested into data lake systems, users often find that many small files are generated. These small files create I/O overhead for readers and writers, which can bottleneck pipelines. In fact, in its 2024 re:Invent announcement of S3 Tables, Amazon Web Services even referred to maintaining small files as “the most common problem” it saw customers face when operating large-scale data lakehouse deployments. Compaction services aim to solve this problem (in Hudi, compaction refers to something different, which we will also cover in this section).

In Iceberg, file sizing is handled via the compaction operations in table maintenance. Hudi has automatic file sizing built into its writer operations, controlled by a few writer configs:

hudi_file_sizing_configs = {
    "hoodie.parquet.max.file.size": "125829120",  # target max file size: 120 MB (120 * 1024 * 1024 bytes)
    "hoodie.parquet.small.file.limit": "104857600",  # files under 100 MB count as "small" and receive new inserts
}

Together, these configs tell the Hudi writer to route new inserts into files smaller than 100 MB until they approach 120 MB, keeping Parquet files roughly between 100 and 120 MB and close to the 120 MB target size.
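The behavior of these two settings can be sketched as a simple bin-packing step. Files under the small-file limit are topped up with new inserts until they reach the max size, and only the leftover data opens new files. This is a simplified model that assumes sizes in whole MB; Hudi's real planner works from estimated record sizes, per partition. `plan_inserts` is a hypothetical name.

```python
MAX_FILE_MB = 120          # mirrors hoodie.parquet.max.file.size (120 MB)
SMALL_FILE_LIMIT_MB = 100  # mirrors hoodie.parquet.small.file.limit (100 MB)


def plan_inserts(existing_mb: list[int], incoming_mb: int) -> list[int]:
    """Return the resulting file sizes after routing `incoming_mb` of inserts."""
    sizes = list(existing_mb)
    for i, size in enumerate(sizes):
        if incoming_mb == 0:
            break
        if size < SMALL_FILE_LIMIT_MB:  # small file: top it up toward the max size
            added = min(MAX_FILE_MB - size, incoming_mb)
            sizes[i] += added
            incoming_mb -= added
    while incoming_mb > 0:  # leftover data goes into new files of at most MAX_FILE_MB
        added = min(MAX_FILE_MB, incoming_mb)
        sizes.append(added)
        incoming_mb -= added
    return sizes


print(plan_inserts([20, 115, 60], incoming_mb=300))  # [120, 115, 120, 120, 20]
```

The trailing 20 MB file is itself small, so it becomes the first candidate to be topped up on the next write.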

Hudi uses the term “compaction” for a different process. Recall from our earlier discussion of table types that in MOR tables, changes are initially written to log files rather than to the base Parquet files. At set intervals, these log files need to be merged into the base files so that readers do not have to parse a long chain of log files when reading from MOR tables. In Hudi, this merging is done by the compaction service. Users have to set how often compaction runs, balancing the efficiency of writing to lightweight log files against the efficiency of reading from larger Parquet base files. Onehouse Table Optimizer offers an easy, hands-free approach to configuring this service for optimal performance. Shown below is a screenshot of setting up the compaction service to execute via Table Optimizer.
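How often to compact is a read/write trade-off. The sketch below assumes compaction runs after a fixed number of delta commits (Hudi offers commit-count based triggers, but this function and its numbers are illustrative) and tracks how many log files a snapshot query would have to merge after each commit.

```python
def simulate(num_delta_commits: int, compact_every: int) -> tuple[int, list[int]]:
    """Return (compactions run, pending log files after each commit)."""
    pending_logs = 0
    compactions = 0
    history = []
    for commit in range(1, num_delta_commits + 1):
        pending_logs += 1  # each MOR delta commit adds a log file
        if commit % compact_every == 0:
            pending_logs = 0  # compaction merges the logs into a new base file
            compactions += 1
        history.append(pending_logs)
    return compactions, history


for every in (2, 5):
    runs, logs = simulate(num_delta_commits=10, compact_every=every)
    print(every, runs, max(logs))
# 2 5 1
# 5 2 4
```

Compacting more often means more compaction work but fewer log files for readers to merge; compacting less often does the reverse.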

Inline vs Asynchronous Table Services

Each of the table services described above (clustering, cleaning, and compaction) can be executed either inline or asynchronously. With inline execution, the table services are executed by the writer sequentially after the write commit occurs. This is easy to configure, but it extends the total write time before new data is available to be processed, slowing down end-to-end write operations.

This is where asynchronous table services, using Hudi’s multi-writer concurrency control (CC), come into play. In this mode, the main writer continues to process data while secondary writers execute the table service operations on the table. A lock provider ensures that there are no write conflicts between the different writers. A diagram (shown below) and a detailed explanation of this process can be found in my colleague Lin Liu’s blog.

As you can see, asynchronous and standalone table services allow for faster end-to-end write operations and data freshness on the table, but take significant additional work in setup. This is where Onehouse Table Optimizer comes into play. By automating the compute resources and lock provider orchestration, users are able to achieve high performance asynchronous table services without heavy lifting.
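The coordination pattern can be sketched with two threads sharing a lock. The ingestion writer keeps committing while a separate table-service thread performs compactions, and every change to the shared timeline happens under the lock. This is a conceptual sketch: `threading.Lock` stands in for a real lock provider, the names are hypothetical, and it shows only serialized timeline updates, not Hudi's conflict detection.

```python
import itertools
import threading

timeline: list[tuple[int, str]] = []  # (instant time, action), append-only
lock = threading.Lock()               # stands in for the table's lock provider
clock = itertools.count(1)            # monotonically increasing instant times


def commit(action: str) -> None:
    # Allocating the instant time and appending to the timeline happen atomically.
    with lock:
        timeline.append((next(clock), action))


def ingestion_writer(num_commits: int) -> None:
    for _ in range(num_commits):
        commit("deltacommit")  # holds the lock only briefly; never runs compaction itself


def table_service(num_runs: int) -> None:
    for _ in range(num_runs):
        commit("compaction")


writer = threading.Thread(target=ingestion_writer, args=(50,))
service = threading.Thread(target=table_service, args=(5,))
writer.start()
service.start()
writer.join()
service.join()

print(len(timeline))  # 55
```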

In Summary:

  • Enable clustering on your tables to sort the data according to your desired query predicates
    • Use linear clustering for predicates that are in order
    • Use Z-order or Hilbert clustering for predicates that are out of order (e.g., B, B+C, or A+C) or that have multi-dimensional relationships
    • Select high cardinality clustering keys to further reduce data scanned and maximize parallelism
  • Configure the cleaning service to remove unneeded file versions, only keeping the time travel history that is needed for your use-case
  • Run table services asynchronously to speed up writer executions
    • Use a managed service to alleviate the setup pains of concurrency control

Wrap Up

As we have seen over the course of this blog, the options to optimize lakehouse table performance on the format and storage layer are significant. At Onehouse, we recognize this and are working to make this experience easier to manage for our customers through our data platform, standalone capabilities found in our Table Optimizer, and through observability offered in the free and open Onehouse LakeView application. As more lakehouse formats such as Apache Hudi, Apache Iceberg, and Delta Lake are adopted, we hope that these deployments continue to ease the burden of scale and cost for data platform organizations around the world. You can try the Onehouse Universal Data Lakehouse for an easy way to get started with all of the needed optimizations here.

I have not included all of the optimizations used by our team in this blog. If you would like to see any other optimizations included or feel that I should add capabilities to this blog, please feel free to reach out to me at chandra@onehouse.ai. I’m always excited to have a friendly conversation about the exciting capabilities of this space and to grow my own technical knowledge. 

Authors
Chandra Krishnan
Solutions Engineer

Chandra is a Solutions Engineer at Onehouse, building large scale data products. Prior to this, he worked as a Data and ML Engineer at AWS. Education: University of Michigan.
