March 20, 2025

Data Deduplication Strategies in an Open Lakehouse Architecture

Data Deduplication Strategies in an Open Lakehouse Architecture
TL;DR: Data duplication is a persistent challenge in data engineering pipelines, impacting storage costs, query performance, and data integrity. By the end of this blog, you’ll understand how duplication may occur across ingestion, storage merging, and table management in lakehouse architectures, and explore the native deduplication strategies offered by open table formats like Apache Hudi.

In data engineering workflows, where data is ingested, processed, and stored across multiple systems, data duplication is a persistent and costly issue. Duplication can occur at various stages of the pipeline, leading to inconsistencies, bloated storage costs, and inaccurate analytics.

Here are some common scenarios:

  1. Streaming Ingestion Pipelines: Real-time data pipelines often rely on systems such as Apache Kafka for event streaming. Duplicate events can arise due to retries from producers, network latencies, or out-of-order event processing.
  2. Batch Data Processing: ETL jobs, which load data periodically, might reprocess the same data, especially if a failure occurs during ingestion, causing duplicate records to be written to storage. Additionally, there could be duplicates within the same batch of source data.
  3. Data Integration from Multiple Sources: Integrating data from multiple upstream systems (e.g., databases, CRMs) can result in duplicates if those systems contain overlapping or redundant records.
  4. Concurrent Writes: In environments where multiple pipelines write simultaneously to the same storage system, race conditions or lack of proper concurrency control can lead to duplicate entries.

It is important to note that duplication isn’t just a technical problem. It has a direct impact on business operations and can impact critical insights and lead to unnecessary costs.

Data Deduplication

Data deduplication is the process of identifying and eliminating duplicate copies of data in any kind of storage system. It is critical for maintaining data quality, improving storage efficiency, optimizing costs, and ensuring accurate analytics (data integrity). Without effective deduplication strategies, organizations can face challenges such as:

  • Data Integrity Issues: Duplicate records can lead to conflicting values and inaccurate analytics, impacting the decision-making process. For example, a financial transactions table with duplicate records could result in the same payment being processed multiple times, leading to discrepancies in account balances.
  • Increased Storage Costs: Storing duplicate records unnecessarily inflates storage in data warehouses and lakes, increasing costs. For example, in large-scale IoT applications, duplicate sensor logs can double or triple storage requirements.
  • Performance Degradation: Duplicate records can increase the volume of data to process, slowing down query and analytics performance. For example, when joining a "users" table with a "transactions" table, if duplicates exist in either table, the resulting join might include the same transaction multiple times for a single user, unnecessarily increasing query processing time.
  • Complexity in Data Governance: Maintaining clean and consistent data across a storage system becomes harder when duplicates exist, complicating compliance efforts and quality of data. This can be very challenging to tackle at scale, from a user perspective.

Deduplication in a Lakehouse

Now that we've explored the generic scenarios where duplication can occur and its implications, let’s turn our attention to lakehouse architectures, which utilize open table formats such as Apache Hudi™, Apache Iceberg, and Delta Lake on top of file formats such as Apache Parquet stored on data lake storage such as Amazon S3. First, let’s go over some of the common points of duplication in a lakehouse. Then we will understand how open table formats such as Apache Hudi deal with the challenges.

Within the Input Batch

Duplicate entries within a single input batch are a common challenge in any ingestion pipeline, often arising due to overlapping data from source systems, ingestion retries caused by transient failures, or the absence of robust primary keys or unique identifiers to distinguish records. For example, when ingesting user interaction data from a mobile app, retries initiated by the system to account for network failures can lead to multiple records being generated for the same event, creating duplicates in the dataset. This issue not only impacts storage efficiency but also increases the computational burden during downstream processing. To mitigate these challenges, it is essential to implement deduplication strategies at the ingestion stage.

When Merging Records into Storage

Even after deduplicating within the input batch, duplication issues often arise during the process of merging these records into lakehouse formats. During an update or delete operation, open table formats merge the resulting changes with the existing record in storage. Since file formats in data lakes are immutable by nature, they are usually rewritten (Copy-on-Write strategy) when these changes are merged. The merge algorithm controls what happens when the updates are applied. 

For instance, the logic can reject the updates outright, leading to stale data persisting in storage, while others might apply blind updates, indiscriminately overwriting existing records without evaluating conflicts. Another common issue is with partial updates, where only specific fields of a record are modified, leaving other parts of the record unchanged and potentially creating inconsistencies. Additionally, improper ordering during merges can result in multiple versions of the same record coexisting, especially when systems fail to apply consistent business logic such as prioritizing the latest version based on timestamps or version numbers. A common scenario might involve an e-commerce system where product price updates are merged into a table format in the storage; if timestamps are not considered, duplicate records with conflicting prices may persist, causing inaccurate reporting. 

When Running Table Management Services (e.g., Compaction)

Table management services, such as compaction, clustering, and partition optimization, are essential for improving query performance and managing storage costs in lakehouse systems. However, if these services are not aligned with ingestion logic, they can inadvertently introduce duplicates. Compaction, for example in Hudi Merge-on-Read (MoR) tables, reconciles changes recorded in log files with the base files (Parquet), but if the compaction process applies different merge rules than those used during ingestion, duplicate records may persist in the rewritten files. For instance, if ingestion logic uses timestamps to prioritize the latest version of a record but compaction does not honor the same ordering, duplicate records with conflicting values may result. Addressing this challenge requires ensuring that ingestion pipelines and table management services operate with consistent merge logic.

During Reads for Merge-on-Read Tables

Merge-on-Read (MoR) tables typically store all the incoming updates into an additional data file, separate from the base data file. Therefore, they have additional operational overheads that require merging both these files for row-level updates. For instance, Apache Hudi stores all the incoming data in log files, which contain incremental changes (related to updates), and base files, which hold consolidated snapshots of data. If a query is run to get a consistent and up-to-date view of data from the table, these two files must be merged. The challenge arises when merging records from these files, as duplication can occur if the merging process fails to reconcile updates properly. For instance, if a record exists in both a base file (older snapshot) and a log file (recent update), and correct merging logic is not applied, the same record may appear twice in the query output. 

Deduplication with Apache Hudi

In the previous section, we learned about the four areas where duplication can happen in a lakehouse architecture. Next, we will deep dive into how Apache Hudi addresses the duplication challenges we outlined out-of-the-box, and how it enables users with the flexibility to tailor deduplication strategies to their specific requirements.

  • Within the input batch:

We discussed how duplicate keys within the same input batch are one of the most common sources of duplication. It could happen due to retries in upstream systems or overlapping datasets may lead to records with the same key (unique identifier) appearing multiple times within a batch. Apache Hudi has a concept of primary key which is a combination of the record key and an optional partition path. This primary key is directly linked to a file id (uuid) that uniquely identifies the file groups in Hudi. Since there is a possibility of having the same primary key within the batch, Hudi ensures deduplication using Record Merger, based on the selected merge mode.  

When two or more records in the same batch have the same primary key, the record mergers are used to decide to merge these records. For instance, EVENT_TIME_ORDERING mode allows users to define a precombine field (such as a timestamp or version number), which determines which record to retain when duplicates are encountered. By default, Hudi selects the record with the largest value for the precombine field. This ensures that only the most relevant version of a record is retained at the ingestion stage. Hudi also enables extending this implementation to specific business logic (how you want to dedupe). This native capability reduces duplication at the very first stage of the data pipeline, ensuring that only unique, deduplicated records enter the data lake. 

  • Merging records to storage

When merging records into storage, Apache Hudi employs advanced merge algorithms, i.e. record mergers to ensure data consistency and eliminate duplication. Since lakehouses use immutable file formats such as Parquet to store the records, any update or delete operation requires creating new versions of files (Copy-on-Write) that incorporate the latest changes.

During an update, partial update, or delete operation, Hudi merges the incoming changes with existing records in the table. The record mergers determine how these updates are applied, ensuring that the most recent or relevant version of a record is retained in the storage layer. Hudi supports three types of merge modes. Here’s a quick summary:

  1. COMMIT_TIME_ORDERING: Prioritizes records based on the order of their commits, ensuring changes from the latest commit overwrite earlier ones.
  2. EVENT_TIME_ORDERING: Prioritizes records based on event time, ensuring that the most recent event (e.g., based on a timestamp field) determines the final state of the record.
  3. CUSTOM: Allows users to define custom merge logic, providing flexibility to handle unique use cases or complex business rules.

These merge modes ensure that Hudi can handle complex scenarios such as late-arriving data, partial updates, and reprocessing without introducing duplicates. For example, in an e-commerce system where product prices are updated, Hudi can ensure that only the latest price is retained, even if updates arrive out of order. Hudi’s Record Merger API is at the core of this functionality, allowing users to define how conflicting records should be resolved during the merge process. This API provides higher flexibility, enabling organizations to implement specific merge logic tailored to their requirements.

  • When Running Table Management Services (e.g., Compaction)

In Merge-on-Read (MoR) tables, table management services such as compaction are critical for reconciling log files (which store incremental changes) with base files (actual records). If compaction logic is inconsistent with the ingestion logic, duplicates can persist in the rewritten files. Apache Hudi ensures that the compaction process is aligned with the ingestion logic by respecting the merge modes (COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM) defined during ingestion, thereby maintaining consistency and avoiding duplication in the rewritten files. For example, consider a system that processes IoT sensor data where records are updated frequently. If the ingestion pipeline uses event-time ordering to prioritize the most recent updates, Hudi ensures that the same event-time-based logic is applied during compaction.

By respecting the merge mode defined during ingestion, Hudi guarantees that the deduplication and conflict resolution rules applied earlier are consistently preserved during table management operations. This ensures that no new duplicates are introduced during compaction and that the reconciled base files reflect the latest, most accurate state of the data.

  • During Reads for Merge-on-Read (MoR) tables

Querying MoR tables brings additional complexity. When a snapshot query is executed on an MoR table, Hudi merges records from log files and base files at query time. The merge process is governed by the merge modes (COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM). 

Another important design in Hudi that plays a central role in deduplication during reads is the record key (primary key). It uniquely identifies each record, enabling Hudi to compile changes from log and base files accurately. For example, if multiple records exist, the record key ensures that only the most recent version (based on the merge mode) is included in the query result.

Example Use Case

To illustrate the use case for deduplication, we’ll use the example of digital marketing, where accurate click data is crucial for evaluating the effectiveness of ad campaigns. Advertisers frequently track user interactions with ads through click events, which typically include details such as the user ID, ad ID, timestamp, and click ID. However, due to factors such as network retries, duplicate submissions, or system errors, it's common to encounter duplicate click events for the same user interacting with the same ad.

For instance, a user might click on an ad multiple times within a short time frame, or the same click event might be recorded twice due to a system failure or retry mechanism. If these duplicates are not removed, your reports may overstate the actual number of unique clicks, leading to inaccurate insights and potentially misleading campaign performance analysis.

To explain it with a code example, we have provided sample code with click data columns. The user id and ad id will together make a record key which should be unique in our final dataset.

# Simulate loading click data into a DataFrame , Note that couple of records exist for user_1 and ad_101
click_data = [ ("user_1", "ad_101", "2025-02-01T10:00:00", "click_001", "device_001", "web", "campaign_1", "US", 1.5), ("user_1", "ad_101", "2025-02-01T10:00:01", "click_001", "device_001", "web", "campaign_1", "US", 1.5), ("user_2", "ad_102", "2025-02-01T10:05:00", "click_002", "device_002", "mobile", "campaign_2", "IN", 0.75), ("user_3", "ad_103", "2025-02-01T10:10:00", "click_003", "device_003", "web", "campaign_3", "UK", 2.0) ] 

# Define schema
columns = ["user_id", "ad_id", "timestamp", "click_id", "device_id", "platform", "campaign_id", "geo_location", "click_value"] 
# Create a DataFrame from the raw click data 

click_data_df = spark.createDataFrame(click_data, columns)

hudi_options = {
    "hoodie.upsert.shuffle.parallelism": "2",   # Number of partitions for parallelism during upsert
    "hoodie.insert.shuffle.parallelism": "2",   # Number of partitions for inserts
    "hoodie.table.name": "clicks_table",        # Table name for Hudi
    "hoodie.datasource.write.recordkey.field": "user_id,ad_id",  # Composite primary key (RecordKey)
    "hoodie.datasource.write.precombine.field": "timestamp",  # Field to deduplicate (PreCombineKey)
    "hoodie.record.merge.mode" : "EVENT_TIME_ORDERING"
}

click_data_df.write.format("hudi").options(**hudi_options).mode("append").save(hudi_table_path)


spark.read.format("hudi").load(hudi_table_path).show()

+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|user_id| ad_id|          timestamp| click_id| device_id|platform|campaign_id|geo_location|click_value|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+
|  20250202112241380|20250202112241380...|user_id:user_2,ad...|                      |a320f380-2864-408...| user_2|ad_102|2025-02-01T10:05:00|click_002|device_002|  mobile| campaign_2|          IN|       0.75|
|  20250202112241380|20250202112241380...|user_id:user_1,ad...|                      |a320f380-2864-408...| user_1|ad_101|2025-02-01T10:00:01|click_001|device_001|     web| campaign_1|          US|        1.5|
|  20250202112241380|20250202112241380...|user_id:user_3,ad...|                      |a320f380-2864-408...| user_3|ad_103|2025-02-01T10:10:00|click_003|device_003|     web| campaign_3|          UK|        2.0|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+


The code above demonstrates how deduplication is performed in the incoming batch before writing the data to the Hudi table. Specifically, if multiple click records are generated by user_1 for ad_101 within a span of 1 second, they will be deduplicated based on the timestamp, as they arrive in the same batch. This ensures that only the most recent record is retained before the data is stored in the Hudi table.

click_data = [
    ("user_1", "ad_101", "2025-02-01T10:10:00", "click_001", "device_001", "web", "campaign_1", "US", 1.5),
    ("user_2", "ad_102", "2025-02-01T10:00:00", "click_002", "device_002", "mobile", "campaign_2", "IN", 0.50)
]

click_data_df = spark.createDataFrame(click_data, columns)

# Write the data to Hudi table
click_data_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(hudi_table_path)

# Read and show the data from Hudi table
spark.read.format("hudi").load(hudi_table_path).show()
+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|user_id| ad_id|          timestamp| click_id| device_id|platform|campaign_id|geo_location|click_value|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+
|  20250202112241380|20250202112241380...|user_id:user_2,ad...|                      |a320f380-2864-408...| user_2|ad_102|2025-02-01T10:05:00|click_002|device_002|  mobile| campaign_2|          IN|       0.75|
|  20250202112241380|20250202112241380...|user_id:user_1,ad...|                      |a320f380-2864-408...| user_1|ad_101|2025-02-01T10:00:01|click_001|device_001|     web| campaign_1|          US|        1.5|
|  20250202112241380|20250202112241380...|user_id:user_3,ad...|                      |a320f380-2864-408...| user_3|ad_103|2025-02-01T10:10:00|click_003|device_003|     web| campaign_3|          UK|        2.0|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------+------+-------------------+---------+----------+--------+-----------+------------+-----------+

In this example, a second batch of data is ingested into the Hudi table, which includes records for user_1 and ad_101, along with other data. Hudi automatically deduplicates records based on the RecordKey (composed of user_id and ad_id). For user_1 and ad_101, two records with different timestamps arrive. The record merger uses the EVENT_TIME_ORDERING strategy, leveraging the timestamp as the PreCombineKey. This ensures that only the most recent record, based on event time, is retained for each RecordKey, effectively managing late-arriving data and maintaining consistency. As a result, the record with the latest timestamp (2025-02-01T10:00:01) is kept, while the older record is discarded.

Additionally, a record for user_2 and ad_102 arrives with an earlier timestamp than the existing one. Since the new record’s timestamp is older (lower value in the PreCombineKey), it’s not considered a duplicate, and the existing record remains intact. This process ensures proper deduplication, preserving only the most recent and relevant data, while preventing the overwriting of valid records in the case of late-arriving data.

Deduplication in Apache Iceberg and Delta Lake

Deduplication strategies in Apache Iceberg and Delta Lake are fundamentally different. Unlike Hudi, which provides built-in mechanisms with features such as record merger APIs, and ingestion-aware table services, Delta and Iceberg rely on explicit merge operations and require users to handle deduplication outside of the table format.

Delta Lake depends on the standard MERGE INTO operation to deduplicate incoming data against the existing dataset, but it does not automatically deduplicate records within the source dataset itself - users must handle that manually. For example:

MERGE INTO customer_transactions AS target
USING newTransactions AS source
ON target.transaction_id = source.transaction_id
WHEN NOT MATCHED THEN INSERT *

Here, the MERGE statement ensures that transactions with a matching transaction_id in the target table are not duplicated, but it does not deduplicate transactions within the newTransactions dataset itself. If newTransactions contains duplicate transaction records for the same transaction_id, all duplicates will be inserted into the table unless explicitly handled before the MERGE operation. This means users must manually deduplicate newTransactions before executing MERGE, as Delta Lake does not provide built-in logic to eliminate duplicates within the source dataset. Additionally, there is no configurable logic in Delta Lake to control how merge conflicts are resolved, unlike formats such as Hudi, which provides different merge modes to define update behavior.

Apache Iceberg follows a similar approach to Delta Lake in handling deduplication, relying on MERGE statements to reconcile new data with existing records. There are no native deduplication mechanisms at ingestion time, nor is there configurable merge logic, and users must deduplicate records before writing them into Iceberg tables.

Conclusion

Deduplication is a critical aspect for lakehouse architectures that ensures data consistency, reduces storage costs, and improves query performance. Apache Hudi, Delta Lake, and Apache Iceberg have different approaches to handling duplicate records.

Delta Lake and Iceberg rely on explicit MERGE operations, requiring users to deduplicate data externally before ingestion. They lack configurable merge strategies, and once data lands in storage, there is no built-in enforcement of primary keys or query-time deduplication. Ultimately, deduplication is entirely left up to the user which could be challenging in high-volume streaming and late-arriving data scenarios.

Apache Hudi provides a built-in framework for deduplication, eliminating duplicates at multiple stages of the data lifecycle. At ingestion and during merges, Hudi offers configurable strategies allowing users to control how updates and conflicts are handled. Additionally, Hudi’s table management services, such as compaction, align with the ingestion-time merge logic, ensuring consistency across storage optimizations without reintroducing duplicates.

Authors
No items found.

Subscribe to the Blog

Be the first to read new posts

We are hiring diverse, world-class talent — join us in building the future