Hudi provides a set of data-plane components to build and operate optimized, self-managed data lakes. More importantly, Hudi provides the primitives to power an end-to-end streaming architecture: by optimizing fast upserts and change streams, it enables incremental processing with latencies of just a few minutes.
A key part of the incremental data processing stack is the ability to ingest data from real-time streaming sources such as Kafka. To achieve this today, we can use Deltastreamer, which runs within the Spark engine to pull records from Kafka and ingest them into Hudi tables. To provide users with another option, as of Hudi v0.10.0, we are excited to announce the availability of a Hudi Sink Connector for Kafka. This offers greater flexibility to current users of Kafka Connect (with S3, HDFS sinks, etc.) to readily ingest their Kafka data into a Hudi data lake and leverage the power of Hudi's platform.
Kafka Connect is a free, open-source component of Apache Kafka. It standardizes the integration of Kafka with data systems, providing both source connectors that write data from external systems to Kafka and sink connectors that write data from Kafka into external systems. The Kafka Connect platform can scale by adding more workers across a cluster and re-balancing the workload across tasks, each of which is responsible for processing one or more Kafka partitions. It also provides APIs for easy management and abstracts away low-level functionality such as offset management.
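As a quick illustration of that management API, a Connect worker exposes a REST endpoint (port 8083 by default) that can be used to list and inspect connectors. The connector name `hudi-sink` below is just a placeholder:

```bash
# List the connectors registered with this Kafka Connect cluster.
curl -s http://localhost:8083/connectors

# Check the health of one connector and its tasks
# ("hudi-sink" is a placeholder for whatever name you register the sink under).
curl -s http://localhost:8083/connectors/hudi-sink/status
```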
The Kafka Connect Sink for Hudi has the following key properties.
Prior to proposing the design, we need to understand the design requirements. Kafka sink connectors run in a distributed environment, with multiple tasks processing data from the Kafka partitions in parallel. We need to provide a concurrency model to ensure that each task can write to the Hudi table concurrently. While Hudi supports multiple writers using optimistic concurrency control (OCC), OCC requires provisioning a distributed locking service and potentially limits the effective concurrency and write performance. In this context, since we control each writer in the sink connector, we can centrally coordinate the write transactions (the start and end of each transaction) and ensure that each task writes records to non-overlapping files, avoiding conflicts without the overhead of locking.
In addition, Hudi has multiple data and file management services and optimizations, such as compaction, clustering, and cleaning, that have to be coordinated with the write transactions. Hudi supports an MVCC model in which such services can run concurrently with the main writer/ingestion; however, the services have to be scheduled and planned by a single writer to avoid conflicts and race conditions. Hence, in addition to coordinating the writers, we need to schedule and plan these services centrally as well.
Based on these requirements, we designed the Hudi Sink connector as shown in the figure below. The design follows the principles of a two-phase commit protocol, with a Coordinator and one or more Participants. The Coordinator always runs on the Kafka Connect task that handles partition 0, avoiding the need to implement leader election. The Coordinator is responsible for both scheduling the transactional writes across the Participants and scheduling the data and table services.
Each Kafka partition is handled by an instance of the Participant. A dedicated control topic on the same Kafka cluster is used for Coordinator-Participant communication. When the Coordinator starts a new transaction, all Participants start reading records from their assigned Kafka partitions and append them to non-overlapping file groups in the Hudi table. The Kafka partition index is embedded in the file ID so that no two Participants write to the same file group, even when they write to the same Hudi partition. The records are written to a Hudi Merge-on-Read (MOR) table. Compared to the Copy-on-Write (COW) type, MOR provides lower write latency and smaller write amplification, which suits the streaming nature of Kafka ingestion. After the Coordinator ends the transaction, all Participants stop writing records and send back their write statuses. If successful write statuses are received from all Participants, the Coordinator commits all the written records.
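Since the coordination messages flow over a regular Kafka topic, they can be observed with the standard console consumer. The sketch below is only illustrative: the control topic name is an assumption, so substitute whatever `hoodie.kafka.control.topic` (or the equivalent setting) is configured to in your sink:

```bash
# Peek at the Coordinator-Participant messages on the control topic.
# The topic name is an assumption; use the one from your connector configuration.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic hudi-control-topic \
  --from-beginning
```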
In the case of worker or task failures, the Kafka Connect platform re-assigns the Kafka partitions across the existing tasks. To keep the protocol simple during such failures, whether of the Coordinator or of a Participant, the entire transaction is not committed, and the records written during the transaction are cleaned up later by the Hudi cleaner service. Moreover, the Kafka offsets of each partition are committed in the Hudi commit file. This allows the system to resume from the offset of the last committed record of each Kafka partition by reading the latest Hudi commit file.
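As a rough way to see this in practice, the committed offsets can be inspected from the table's timeline. The sketch below assumes a MOR table, so completed transactions appear as delta commit files under `.hoodie/`; the exact JSON field holding the Kafka offsets is an assumption, so check the file contents for your version:

```bash
# Locate the latest completed delta commit on the timeline and print its extra metadata,
# where the committed Kafka offsets are expected to be recorded (assumption).
TABLE_PATH=/path/to/hudi-table   # placeholder
LATEST_COMMIT=$(ls "$TABLE_PATH"/.hoodie/*.deltacommit 2>/dev/null | sort | tail -n 1)
jq '.extraMetadata' "$LATEST_COMMIT"
```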
If you are interested in further details, please refer to the RFC.
The following figure shows one instance of an end-to-end deployment of the Hudi Sink Connector. Different applications or Kafka source connectors bring external data into Kafka and can optionally register the schema of the data with a schema registry. The Hudi sink connector reads the data and the latest schema from the registry to write the data from each Kafka topic into a Hudi table. If Hive integration is configured, the Hudi sink continuously syncs the Hudi metadata with the Hive Metastore. The Hudi tables can then be queried via different query engines, such as Presto, Trino, etc.
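For reference, registering the sink in such a deployment typically amounts to POSTing a connector configuration to the Kafka Connect REST API. The snippet below is only indicative: the connector class, the `hoodie.*` keys, and all paths are placeholders or assumptions, so use the configuration from the deployment guide referenced below as the source of truth:

```bash
# Register a Hudi sink connector with the Connect worker (default REST port 8083).
# The connector class and hoodie.* properties are indicative placeholders; adapt them
# to the configuration shipped with your Hudi version.
curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "hudi-sink",
  "config": {
    "connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
    "tasks.max": "4",
    "topics": "hudi-test-topic",
    "hoodie.table.name": "hudi-test-topic",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic"
  }
}'
```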
The detailed steps to deploy the end-to-end system shown above can be found here. The steps specific to configuring the Hudi sink are listed below:
Async clustering scheduling is disabled by default, and you can enable it by setting `hoodie.clustering.async.enabled` to `true`.
For performance, the Sink only schedules compaction and clustering when necessary and does not execute them. You need to execute the scheduled compaction and clustering using separate Spark jobs or the Hudi CLI.
Then you can run the async compaction job with `HoodieCompactor` and `spark-submit`. A sketch of the command, with placeholder jar and table paths, follows:
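```bash
# Sketch only: the utilities bundle jar, table base path/name, schema file, and
# instant time below are placeholders to adapt to your deployment.
HUDI_UTILITIES_JAR=/path/to/hudi-utilities-bundle.jar
TABLE_PATH=file:///tmp/hoodie/hudi-test-topic
SCHEMA_FILE=/path/to/schema.avsc
COMPACTION_INSTANT=20211201120000   # a compaction instant already scheduled on the timeline

spark-submit \
  --class org.apache.hudi.utilities.HoodieCompactor \
  "$HUDI_UTILITIES_JAR" \
  --base-path "$TABLE_PATH" \
  --table-name hudi-test-topic \
  --schema-file "$SCHEMA_FILE" \
  --instant-time "$COMPACTION_INSTANT"
```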
Note that you don't have to provide the instant time through `--instant-time`; in that case, the earliest scheduled compaction will be executed.
The async clustering job can be executed with `HoodieClusteringJob` and `spark-submit`; the command below is a similar sketch, with placeholder paths and a placeholder clustering properties file:
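```bash
# Sketch only: jar, table base path/name, and the clustering properties file are placeholders.
HUDI_UTILITIES_JAR=/path/to/hudi-utilities-bundle.jar
TABLE_PATH=file:///tmp/hoodie/hudi-test-topic

spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  "$HUDI_UTILITIES_JAR" \
  --mode execute \
  --base-path "$TABLE_PATH" \
  --table-name hudi-test-topic \
  --props /path/to/clusteringjob.properties
```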