In this post we will talk about a new deltastreamer source which reliably and efficiently processes new data files as they arrive in AWS S3. As of today, to ingest data from S3 into Hudi, users leverage the DFS source, whose path selector identifies the source files modified since the last checkpoint based on the maximum modification time. The problem with this approach is that modification time in S3 has a precision of only one second. If many files (more than the configurable source limit allows) were modified within the same second, some of them might be skipped. For more details, please refer to HUDI-1723. While the workaround is to ignore the source limit and keep reading, the problem motivated us to redesign the source so that users can reliably ingest from S3.
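To make the failure mode concrete, here is a minimal Python sketch of a checkpoint based on maximum modification time. It is a simplified model and not Hudi's actual path selector: `S3Object`, `select_since` and `ingest_all` are hypothetical names, and the source limit is counted in files purely for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class S3Object:
    key: str
    mtime: int  # modification time in whole seconds


def select_since(objects, checkpoint, source_limit):
    """Hypothetical selector: pick up to source_limit objects modified after checkpoint."""
    candidates = sorted(
        (o for o in objects if o.mtime > checkpoint),
        key=lambda o: (o.mtime, o.key),
    )
    batch = candidates[:source_limit]
    # The new checkpoint is the max modification time seen in this batch.
    new_checkpoint = max((o.mtime for o in batch), default=checkpoint)
    return batch, new_checkpoint


def ingest_all(objects, source_limit):
    """Keep pulling batches until the selector returns nothing new."""
    checkpoint, ingested = 0, []
    while True:
        batch, checkpoint = select_since(objects, checkpoint, source_limit)
        if not batch:
            return ingested
        ingested.extend(o.key for o in batch)


# Five files land within the same second (t=100), one more a second later.
objects = [S3Object(f"data/file-{i}.parquet", 100) for i in range(5)]
objects.append(S3Object("data/file-5.parquet", 101))

ingested = ingest_all(objects, source_limit=3)
skipped = sorted({o.key for o in objects} - set(ingested))
print(skipped)  # ['data/file-3.parquet', 'data/file-4.parquet']
```

The first batch takes three of the five files written at `t=100` and moves the checkpoint to 100, so the remaining two no longer satisfy `mtime > checkpoint` and are never picked up.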
For use cases where one-second granularity does not suffice, we have a new source in deltastreamer that uses a log-based approach. The new S3 events source relies on change notification and incremental processing to ingest from S3. The architecture is as shown in the figure below.
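In this approach, users need to enable S3 event notifications. There are two types of deltastreamers: the S3EventsSource (metadata source), which consumes the event notifications from an SQS queue and writes them to a Hudi metadata table, and the S3EventsHoodieIncrSource, which incrementally reads that metadata table and ingests the corresponding data files into the target Hudi table.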
Users only need to specify the SQS queue url and region name to start the S3EventsSource (metadata source).
hoodie.deltastreamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url
hoodie.deltastreamer.s3.source.queue.region=us-west-2
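Each message on the queue carries an S3 event notification in JSON. The sketch below shows, in plain Python, which fields of such a message correspond to the record key (`s3.object.key`, `eventName`) and ordering field (`eventTime`) used for the metadata table. It illustrates the message shape only and is not how S3EventsSource is implemented; `SAMPLE_BODY` and `extract_records` are hypothetical names, and the sample values are made up.

```python
import json
from urllib.parse import unquote_plus

# A trimmed-down S3 event notification, as it might appear in an SQS message body.
SAMPLE_BODY = json.dumps({
    "Records": [{
        "eventSource": "aws:s3",
        "awsRegion": "us-west-2",
        "eventTime": "2021-08-01T12:00:00.000Z",
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "bucket_name"},
            "object": {"key": "path/to/new+file.parquet", "size": 1024},
        },
    }]
})


def extract_records(body: str):
    """Flatten the S3 event records found in one message body."""
    payload = json.loads(body)
    # When S3 publishes through SNS, the event is a JSON string under "Message".
    if "Records" not in payload and "Message" in payload:
        payload = json.loads(payload["Message"])
    rows = []
    for rec in payload.get("Records", []):
        rows.append({
            "eventTime": rec["eventTime"],
            "eventName": rec["eventName"],
            "bucket": rec["s3"]["bucket"]["name"],
            # Object keys in notifications are URL-encoded ("+" stands for a space).
            "key": unquote_plus(rec["s3"]["object"]["key"]),
        })
    return rows


rows = extract_records(SAMPLE_BODY)
print(rows[0]["key"])  # path/to/new file.parquet
```

Handling the SNS envelope is included because notifications are often fanned out through an SNS topic before reaching SQS; if the bucket publishes directly to the queue, that branch is simply never taken.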
There are a few other configurations for the metadata source which can be tuned to suit specific requirements:
To set up the pipeline, first enable S3 event notifications and download the aws-java-sdk-sqs jar. Then start the S3EventsSource and S3EventsHoodieIncrSource using the HoodieDeltaStreamer utility, as shown in the sample commands below.
# To start S3EventsSource
spark-submit \
--jars "/home/hadoop/hudi-utilities-bundle_2.11-0.9.0.jar,/usr/lib/spark/external/lib/spark-avro.jar,/home/hadoop/aws-java-sdk-sqs-1.12.22.jar" \
--master yarn --deploy-mode client \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar \
--table-type COPY_ON_WRITE --source-ordering-field eventTime \
--target-base-path s3://bucket_name/path/for/s3_meta_table \
--target-table s3_meta_table --continuous \
--min-sync-interval-seconds 10 \
--hoodie-conf hoodie.datasource.write.recordkey.field="s3.object.key,eventName" \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=s3.bucket.name --enable-hive-sync \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.hive_sync.database=default \
--hoodie-conf hoodie.datasource.hive_sync.table=s3_meta_table \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=bucket \
--source-class org.apache.hudi.utilities.sources.S3EventsSource \
--hoodie-conf hoodie.deltastreamer.s3.source.queue.url=https://sqs.us-west-2.amazonaws.com/queue/url \
--hoodie-conf hoodie.deltastreamer.s3.source.queue.region=us-west-2
# To start S3EventsHoodieIncrSource
spark-submit \
--jars "/home/hadoop/hudi-utilities-bundle_2.11-0.9.0.jar,/usr/lib/spark/external/lib/spark-avro.jar,/home/hadoop/aws-java-sdk-sqs-1.12.22.jar" \
--master yarn --deploy-mode client \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field eventTime --target-base-path s3://bucket_name/path/for/s3_hudi_table \
--target-table s3_hudi_table --continuous --min-sync-interval-seconds 10 \
--hoodie-conf hoodie.datasource.write.recordkey.field="pull_request_id" \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.partitionpath.field=s3.bucket.name --enable-hive-sync \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.hive_sync.database=default \
--hoodie-conf hoodie.datasource.hive_sync.table=s3_hudi_v6 \
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=bucket \
--source-class org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=s3://bucket_name/path/for/s3_meta_table \
--hoodie-conf hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
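The second deltastreamer reads the metadata table incrementally rather than listing the bucket. As a rough mental model, and not the actual S3EventsHoodieIncrSource logic, the Python sketch below turns metadata rows committed after a checkpoint into a list of object paths to read. The row layout, the `commit_time` strings and `next_batch` are assumptions made for illustration.

```python
def next_batch(meta_rows, checkpoint):
    """Return (paths, new_checkpoint) for metadata rows committed after checkpoint."""
    fresh = [r for r in meta_rows if r["commit_time"] > checkpoint]
    # Only newly created objects need to be read; other event types are ignored here.
    created = [r for r in fresh if r["eventName"].startswith("ObjectCreated")]
    paths = sorted({f"s3://{r['bucket']}/{r['key']}" for r in created})
    # Advance to the latest commit consumed, whether or not it produced paths.
    new_checkpoint = max((r["commit_time"] for r in fresh), default=checkpoint)
    return paths, new_checkpoint


# Hypothetical metadata rows; commit times are sortable timestamp strings.
meta_rows = [
    {"commit_time": "20210801120000", "eventName": "ObjectCreated:Put",
     "bucket": "bucket_name", "key": "in/a.parquet"},
    {"commit_time": "20210801120000", "eventName": "ObjectCreated:Put",
     "bucket": "bucket_name", "key": "in/b.parquet"},
    {"commit_time": "20210801120010", "eventName": "ObjectRemoved:Delete",
     "bucket": "bucket_name", "key": "in/a.parquet"},
    {"commit_time": "20210801120010", "eventName": "ObjectCreated:Put",
     "bucket": "bucket_name", "key": "in/c.parquet"},
]

paths, checkpoint = next_batch(meta_rows, "20210801120000")
print(paths)       # ['s3://bucket_name/in/c.parquet']
print(checkpoint)  # 20210801120010
```

Because the checkpoint in this model is a commit on the metadata table and each commit is consumed as a whole, files that arrived within the same second are not split across a checkpoint boundary.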
This post introduced a log-based approach to ingest data from S3 into Hudi tables reliably and efficiently. We are actively improving this along the following directions.
Please follow this JIRA to learn more about active development on this issue. We look forward to contributions from the community. Hope you enjoyed this post.
Put your Hudi on and keep streaming!