May 2, 2023

An Introduction to the Hudi and Flink Integration

In September 2020, while working on the Flink SQL Engine team at Alibaba Cloud, I witnessed the data lakehouse concept slowly gaining traction in the data engineering world. However, many engineers found it challenging to understand due to its radical departure from traditional data storage and processing approaches. Previously, data lakes were regarded as vast repositories of raw data storage. With the data lakehouse concept, engineers could now manage raw data storage and processing, as well as structured querying and analysis.

Around the same time, emerging technologies centered around the data lakehouse concept, such as Apache Hudi, Delta Lake, and Apache Iceberg, were gaining popularity. Limited documentation on data lakehouse architectures made it difficult for some to adapt, and questions arose about integrating these systems into existing tech stacks and potential compatibility issues. The steep learning curve and uncertainties surrounding the transformative potential of the data lakehouse concept presented challenges for engineers adopting this new paradigm.

While exploring data lakehouse technologies, I was also deeply involved in the Flink community, addressing user challenges and enhancing the open-source project. Flink specializes in robust stream processing, enabling users to process vast volumes of data in real time. However, it lacked a built-in storage component and relied on various external systems like Apache Kafka, MySQL, and S3. The Flink community was seeking a mutable data lakehouse system that could support changelog ingestion, handle row-level updates and deletes efficiently, and offer core primitives and an architecture suited to streaming computations.

In an effort to combine the strengths of Flink’s streaming capabilities with the advantages of data lakehouse table formats, we explored integrating the Flink engine with Apache Iceberg. Unfortunately, this integration yielded minimal benefits. Upon reflection, we discovered that Iceberg initiatives were rooted in batch processing, such as pluggable file formats and metadata for query optimization, which were common pain points inherited from Apache Hive. While this approach did solve certain issues, it did not resonate strongly with the streaming-oriented users within the Flink community.

This prompted me to pursue an integration between Flink and Apache Hudi. Hudi not only provides ACID semantics for data lakes but also enables efficient upserts and deletes at the record level, allowing seamless handling of new and modified records during data processing. Moreover, Hudi supports incremental pulls, reading only the data that changed since the last pull instead of reprocessing the entire table. The use cases that resonated well with the Flink community included incremental ingestion of database changelogs, incremental ETL pipelines, partial updates and joins, and near-real-time materialized views that can be queried by a wide range of OLAP engines.

In 2021, I began working on the integration, which I’ll detail in the next section. As of May 2023, over 100 companies and more than 3600 enterprises use the Hudi-Flink integration in production. Both the Flink and Hudi communities continue to strengthen the integration, with contributors from around the world. As a PMC member of both Flink and Hudi, I am enthusiastic about continuing my technical contributions to bridge the streaming and data lakehouse communities, fostering the development of low-latency and near-real-time data applications.

The Internals of Flink & Hudi

The following sections describe the challenges with the Bloom filter index and how the Hudi-Flink integration efficiently supports high-volume streaming workloads with Flink’s state-backend. After that, I go over the purpose of the stream reader and how Flink’s built-in automatic recovery handles stream reader failures. Finally, I’ll cover three use cases you can build with this integration and show a code sample of how to get started!

The writer mechanism

Prior to this integration, many users defaulted to the Bloom filter index to ingest, process, and locate data for streaming workloads. Let’s see how the Hudi-Flink integration redesigned the index layer to overcome some of the challenges the Bloom filter index poses for streaming workloads.

How a record is written into Hudi

The diagram above represents at a high level how writes happen in Hudi. To support efficient upserts in mutable datasets, Hudi maps a given record key consistently to a file ID via an indexing mechanism. This mapping between record key and file group/file ID never changes once the first version of a record has been written to a file. In short, the mapped file group contains all versions of a group of records. Each time data is ingested, the incoming records are tagged as inserts or updates based on lookups against this mapping.

Hudi’s indexing layer is pluggable, and prior to the Hudi-Flink integration, the Bloom filter index was still a popular choice for streaming workloads. However, the possibility of false positives and the need to constantly add new elements to the Bloom filter with every ingestion can quickly become compute-intensive when dealing with large volumes of data in streaming pipelines. For batch workloads, this performance is still acceptable, whereas streaming workloads can suffer from increased end-to-end latency.
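To make the false-positive cost concrete, here is a minimal sketch of a generic Bloom filter in Python. It is not Hudi's Bloom index implementation; the class, the helper function, and the deliberately undersized parameters are all hypothetical and chosen only so the effect is easy to see. Every false positive stands for a lookup where the filter cannot rule out a key, so the underlying file would have to be read to confirm the key is absent.

```python
import hashlib
from typing import Iterator, List


class BloomFilter:
    """A tiny Bloom filter: no false negatives, but false positives are possible."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits: List[bool] = [False] * num_bits

    def _positions(self, key: str) -> Iterator[int]:
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = True

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos] for pos in self._positions(key))


def count_false_positives(bloom: BloomFilter, absent_keys: List[str]) -> int:
    """Count keys reported as possibly present although they were never added."""
    return sum(1 for key in absent_keys if bloom.might_contain(key))


# Deliberately undersized filter (an assumption of this sketch) so that
# false positives are easy to observe.
bloom = BloomFilter(num_bits=64, num_hashes=2)
stored_keys = [f"key-{i}" for i in range(50)]
for key in stored_keys:
    bloom.add(key)

absent_keys = [f"other-{i}" for i in range(1000)]
false_positives = count_false_positives(bloom, absent_keys)
print(f"{false_positives} of {len(absent_keys)} absent keys cannot be ruled out by the filter")
```

A real deployment would size the filter far more generously, but the trade-off is the same: the filter has to be kept up to date on every ingestion, and each positive answer still needs to be verified.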

In the Hudi-Flink integration, the index layer is redesigned to avoid the redundant scans that the Bloom filter index performs for each input dataset. The mappings are stored directly in Flink’s state-backend, which serves as storage for all Flink operators and offers excellent performance for point lookups. Operators are responsible for transforming, processing, and managing data as it moves through the pipeline. Flink includes stateful operators that manage state information while processing data, providing features such as exactly-once semantics and fault tolerance, which is what makes Flink a stateful computation engine.

When streaming data is ingested from a source, Flink first converts it into a Hudi-compatible in-memory structure called HudiRecord. The BucketAssigner receives the HudiRecord and checks Flink’s state-backend to determine whether the incoming record is a new INSERT or an UPSERT/DELETE, then tags the HudiRecord with its location (partition path and file group ID). The BucketAssigner then passes the tagged HudiRecord to the StreamWriter.
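The tagging step can be sketched in a few lines of Python. This is an illustration only, not the actual operator: a plain dictionary stands in for Flink’s state-backend, the Record and BucketAssignerSketch names are hypothetical, and the choice of one open file group per partition is a simplifying assumption.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class Record:
    """Hypothetical stand-in for the in-memory record described above."""
    key: str
    partition_path: str
    is_delete: bool = False
    file_group_id: Optional[str] = None
    operation: Optional[str] = None


class BucketAssignerSketch:
    def __init__(self) -> None:
        # Stand-in for keyed state: record key -> (partition path, file group ID).
        self.state: Dict[str, Tuple[str, str]] = {}
        # Simplification: one open file group per partition for new keys.
        self.open_file_groups: Dict[str, str] = {}

    def _file_group_for_new_key(self, partition_path: str) -> str:
        if partition_path not in self.open_file_groups:
            self.open_file_groups[partition_path] = f"fg-{len(self.open_file_groups)}"
        return self.open_file_groups[partition_path]

    def assign(self, record: Record) -> Record:
        location = self.state.get(record.key)  # point lookup by record key
        if location is None:
            # First time this key is seen: pick a location and remember it.
            file_group_id = self._file_group_for_new_key(record.partition_path)
            self.state[record.key] = (record.partition_path, file_group_id)
            record.file_group_id = file_group_id
            record.operation = "DELETE" if record.is_delete else "INSERT"
        else:
            # Known key: route the record to the file group it already lives in.
            record.partition_path, record.file_group_id = location
            record.operation = "DELETE" if record.is_delete else "UPSERT"
        return record


assigner = BucketAssignerSketch()
first = assigner.assign(Record("user-1", "2023/05/01"))
second = assigner.assign(Record("user-1", "2023/05/01"))
removed = assigner.assign(Record("user-1", "2023/05/01", is_delete=True))
print(first.operation, second.operation, removed.operation)
```

Because the lookup is a single keyed read, the cost per record does not depend on how many files the table contains, which is the property the redesigned index layer relies on.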

The StreamWriter, an operator in Flink, buffers HudiRecords and flushes them to disk (the Hudi table). At this point the flushed records are still uncommitted and therefore not yet readable. After flushing the data, the StreamWriter sends the write statuses to the coordinator. This component, located in Flink’s JobManager, is responsible for collecting metadata and committing the instant to Hudi’s timeline, managing the life cycles and state transitions of instants.
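The hand-off between writers and the coordinator can be modeled roughly as follows. This is a simplified sketch, not the real operators: the class names, the shape of the write status, and the rule that an instant is committed once every writer has reported are assumptions made for illustration.

```python
from typing import Dict, List, Tuple


class StreamWriterSketch:
    def __init__(self, task_id: int) -> None:
        self.task_id = task_id
        self.buffer: List[str] = []

    def write(self, record: str) -> None:
        self.buffer.append(record)

    def flush(self) -> Dict[str, int]:
        # Data is written out here but stays invisible until the instant commits.
        status = {"task_id": self.task_id, "records_written": len(self.buffer)}
        self.buffer.clear()
        return status


class CoordinatorSketch:
    def __init__(self, num_writers: int) -> None:
        self.num_writers = num_writers
        self.pending: Dict[str, List[Dict[str, int]]] = {}
        # Committed instants as (instant time, total records written).
        self.timeline: List[Tuple[str, int]] = []

    def receive(self, instant: str, status: Dict[str, int]) -> None:
        self.pending.setdefault(instant, []).append(status)

    def commit_if_complete(self, instant: str) -> bool:
        statuses = self.pending.get(instant, [])
        if len(statuses) < self.num_writers:
            return False  # still waiting for some writers
        total = sum(s["records_written"] for s in statuses)
        self.timeline.append((instant, total))
        del self.pending[instant]
        return True


writers = [StreamWriterSketch(0), StreamWriterSketch(1)]
coordinator = CoordinatorSketch(num_writers=2)
writers[0].write("a")
writers[0].write("b")
writers[1].write("c")

instant = "20230502120000"
coordinator.receive(instant, writers[0].flush())
committed_early = coordinator.commit_if_complete(instant)
coordinator.receive(instant, writers[1].flush())
committed_late = coordinator.commit_if_complete(instant)
print(committed_early, committed_late, coordinator.timeline)
```

Keeping the commit in a single coordinator means readers only ever see an instant after all of its write statuses have been collected.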

This diagram illustrates the basic pipeline of an UPSERT operation for a Copy-On-Write table. For other operations like INSERT and BULK_INSERT, the pipelines are different.

The streaming reader pipeline

In Flink, the stream reader is a component that consumes data from sources such as file systems and converts it into a record stream for processing by downstream Flink applications. In the context of Hudi, the stream reader monitors Hudi’s timeline, which consists of numerous instants. The timeline’s purpose is to offer a view of the actions performed on a Hudi table. An instant represents a successfully committed transaction and includes metadata about the committed files, such as the transaction’s timestamp.

Periodically, the stream reader monitors the Hudi timeline to fetch new incremental instants and generate input splits, which divide the input data into smaller, more manageable chunks. You can set an explicit start offset for reading incremental instants by specifying a timestamp or using the ‘earliest’ keyword. Flink then distributes the input splits evenly among the stream readers. Following this, the stream reader scans the input splits and carries out record payload deserialization, converting HudiRecord instances into RowData.
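The monitoring and distribution steps can be sketched as below. The function names and the sample data are hypothetical, instants are modeled as sortable timestamp strings, and treating an explicit start timestamp as inclusive is an assumption of this sketch rather than a statement about the connector.

```python
from typing import Dict, List


def incremental_instants(timeline: List[str], start: str) -> List[str]:
    """Return the instants to read, in order, given a start offset."""
    ordered = sorted(timeline)
    if start == "earliest":
        return ordered
    # Assumption: an explicit start timestamp is treated as inclusive.
    return [instant for instant in ordered if instant >= start]


def generate_splits(instants: List[str], files_by_instant: Dict[str, List[str]]) -> List[str]:
    """Turn each instant's committed files into input splits."""
    return [path for instant in instants for path in files_by_instant.get(instant, [])]


def assign_splits(splits: List[str], num_readers: int) -> Dict[int, List[str]]:
    """Distribute splits evenly across readers in round-robin order."""
    assignment: Dict[int, List[str]] = {reader: [] for reader in range(num_readers)}
    for i, split in enumerate(splits):
        assignment[i % num_readers].append(split)
    return assignment


timeline = ["20230501120000", "20230501130000", "20230501140000"]
files_by_instant = {
    "20230501120000": ["f1"],
    "20230501130000": ["f2", "f3"],
    "20230501140000": ["f4", "f5"],
}

instants = incremental_instants(timeline, start="20230501130000")
splits = generate_splits(instants, files_by_instant)
assignment = assign_splits(splits, num_readers=2)
print(assignment)
```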

Failure recovery for the stream reader

When a streaming read job crashes or is manually canceled, it should resume where it left off. To make this possible, Flink keeps track of the offset consumed within each input split. Its automatic job recovery mechanism then re-consumes the input splits from the recorded offsets, which prevents duplicate records from being processed.

Prevent checkpoint timeouts

Hudi organizes records into file groups, where each file group consists of multiple file slices. In Flink, the instants monitor operator generates input splits based on these file slices, with each file slice corresponding to a new input split.

As file slices grow larger due to the ingestion of new records into Hudi, it may take Flink longer to consume these file slices since they contain a significant number of records. If scanning the file slice takes too long, it can hinder the progress of checkpointing, potentially leading to checkpoint timeouts. To ensure that the checkpoint process remains unblocked, it is advisable to consume input splits in mini-batches, with each batch containing approximately 2400 records.
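A rough sketch of mini-batch consumption combined with offset-based recovery is shown below. The reader function is hypothetical and a list of integers stands in for the records of one file slice; only the batch size of 2400 comes from the text above.

```python
from typing import Iterator, List, Tuple

MINI_BATCH_SIZE = 2400


def read_in_mini_batches(
    split_records: List[int],
    start_offset: int = 0,
    batch_size: int = MINI_BATCH_SIZE,
) -> Iterator[Tuple[List[int], int]]:
    """Yield (batch, offset after the batch) so a checkpoint can run between batches."""
    offset = start_offset
    while offset < len(split_records):
        batch = split_records[offset:offset + batch_size]
        offset += len(batch)
        yield batch, offset


records = list(range(6000))  # stand-in for one large file slice
batch_sizes = [len(batch) for batch, _ in read_in_mini_batches(records)]

# Consume one mini-batch, record the offset at a checkpoint, then "crash".
reader = read_in_mini_batches(records)
first_batch, checkpointed_offset = next(reader)
consumed = list(first_batch)

# After recovery, resume from the checkpointed offset instead of from zero.
for batch, _ in read_in_mini_batches(records, start_offset=checkpointed_offset):
    consumed.extend(batch)

print(batch_sizes, checkpointed_offset, len(consumed))
```

Because control returns to the caller after every mini-batch, a checkpoint never has to wait for an entire file slice to be scanned, and the recorded offset is enough to resume without emitting any record twice.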

Read the Hudi timeline and populate the downstream pipeline

Use Cases

Streaming ingestion with changelog

One of the most common use cases for Flink is streaming ingestion with changelogs. There are two ways to construct the ingestion pipeline. The first is to use the CDC connectors, which monitor the source database and capture any changes made to the data, such as inserts, updates, or deletes.

Alternatively, you can send the changelogs into a message queue like Kafka and then consume the messages from Kafka using the CDC format. For more information, refer to the CDC Ingestion documentation.
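Whichever route the changelog takes, the sink ultimately has to apply each change to the row with the matching key. The sketch below shows that idea with a plain dictionary standing in for the table; the function name and sample events are hypothetical, while the +I, -U, +U, and -D markers follow the short names Flink uses for its row kinds.

```python
from typing import Dict, List, Tuple

ChangeEvent = Tuple[str, str, dict]  # (row kind, record key, row image)


def apply_changelog(table: Dict[str, dict], events: List[ChangeEvent]) -> Dict[str, dict]:
    """Apply changelog events to a keyed table using upsert semantics."""
    for kind, key, row in events:
        if kind in ("+I", "+U"):
            table[key] = row  # insert, or overwrite with the new image
        elif kind == "-U":
            continue  # the old image is not needed when upserting by key
        elif kind == "-D":
            table.pop(key, None)  # remove the row if it exists
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table


events: List[ChangeEvent] = [
    ("+I", "1", {"name": "alice", "city": "Hangzhou"}),
    ("+I", "2", {"name": "bob", "city": "Berlin"}),
    ("-U", "1", {"name": "alice", "city": "Hangzhou"}),
    ("+U", "1", {"name": "alice", "city": "Shanghai"}),
    ("-D", "2", {"name": "bob", "city": "Berlin"}),
]

table = apply_changelog({}, events)
print(table)
```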

Streaming ingestion with changelogs

Incremental ETL pipeline

Flink is known as a stateful computation engine that can handle change streams. A change stream can be represented as a Dynamic Table when using the Table API or SQL. A Dynamic Table changes continuously over time as records are added, updated, or deleted; however, the data is not persisted to storage.

Hudi is a perfect storage companion for Flink’s Dynamic Table. Hudi offers natural order sequence preservation, small file clustering, row-level updates and deletes, and more. By sending the change streams in the Dynamic Table to Hudi, you can build end-to-end streaming incremental ETL pipelines. This allows you to build compute-efficient applications that process only the updated data.

Incremental pipelines

Incremental materialized view

Unlike the incremental ETL pipeline, there are no raw, silver, or gold tables. Flink ingests and computes the data from an upstream data source and sends the final results to a Hudi table. Afterward, you can query the table with any of the query engines that Hudi supports.

Incremental materialized view

A Simple Demo

Let’s do a quickstart with the SqlClient. We will create a datagen source and do streaming ingestion into Hudi. To commit the dataset, checkpointing needs to be enabled. Here is an example configuration for flink-conf.yaml:

Start the SqlClient, create the datagen and Hudi table. Then, submit the insert job:

This is an overview of how I created the Hudi-Flink integration. If you have any questions about the integration or want more details about how to get started with Hudi and Flink, you can find me in the Hudi and Flink community channels.
