湖仓一体架构下流批协同实践:从Spark批量加速到Flink实时更新
侧边栏壁纸
  • 累计撰写 56 篇文章
  • 累计阅读 12.4万

湖仓一体架构下流批协同实践:从Spark批量加速到Flink实时更新

WD1016
2025-08-02 / 2,104 阅读 / 正在检测是否收录...

在湖仓一体架构下,无论是业务库数据同步还是宽表构建,Flink虽然支持"先全量,再增量"的处理模式,但当历史数据规模达到亿级时,全量数据导入阶段存在瓶颈,如果调高并发会抢占实时集群资源,低并发会导致处理时间过长。因此,我们采用Spark+Flink的协同计算架构,首先基于Spark的离线计算优势,在离线集群完成大规模历史数据的批量导入,再启动Flink任务进行增量更新。本文将分享该方案落地过程中遇到的问题和解决方案。

Spark加速全量数据入湖

配置写入方式为bulk insert,减少数据序列化以及合并操作,该数据写入方式会跳过数据去重,所以可以在hive中通过SparkSQL预处理历史数据。

set hoodie.spark.sql.insert.into.operation=bulk_insert;

如果历史数据在Hive中,表格式尽可能是orc或parquet格式,否则处理效率会显著下降,SparkSQL任务配置如下:

executor-memory 16g
num-executors 150
executor-cores 4
spark.default.parallelism 600

上述配置在数据量较大的情况下容易OOM,如果存量数据过多,需要分多个批次入湖,这样耗时更长,作业更不稳定。因为数据入湖时数据分桶、写入缓冲都属于内存密集型操作,所以适当调大spark.memory.fraction,调小spark.memory.storageFraction(减小存储内存,增加执行内存),有利于加速入湖。

spark.memory.fraction 0.8
spark.memory.storageFraction 0.3

同时,在资源不变的情况下,适当增大并行度,低并行度时每个Task处理的数据量较大,排序操作的内存压力剧增,当单个Task的数据量超过可用内存时,Spark会触发磁盘溢写,并行度增大一倍后,每个Task处理的数据量减半,排序完全在内存中完成,避免Spill和GC,效率显著提升。

spark.default.parallelism 1200

最终配置如下,实现2分钟内15亿全量数据入湖:

spark-sql --master yarn --queue ... --deploy-mode client --name ... \
--driver-memory 16g --driver-cores 8 --executor-memory 16g --num-executors 150 \
--conf spark.executor.heartbeatInterval=120000s \
--conf spark.network.timeout=130000s \
--conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.3 \
--executor-cores 4 --conf spark.default.parallelism=1200 \
--hiveconf hive.cli.print.header=true \
--jars hudi-spark3.2-bundle_2.12-0.14.1.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' -S

全量数据入湖后小文件合并

增大并行度会生成很多小文件,在全量数据导入完成后,Flink任务启动前,通过Spark提交Clustering任务进行一次小文件合并。

spark-submit --master yarn --deploy-mode cluster --queue ... \
--name ... \
--driver-memory 16g --executor-memory 16g --num-executors 150 \
--executor-cores 8 --conf spark.default.parallelism=1200 \
--class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle_2.12-0.14.1.jar \
--mode scheduleAndExecute \
--base-path hdfs://... \
--table-name ... \
--retry-last-failed-clustering-job \
--job-max-processing-time-ms 1800000 \
--hoodie-conf hoodie.clean.async=true \
--hoodie-conf hoodie.clean.automatic=true \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \
--hoodie-conf hoodie.cleaner.fileversions.retained=1 \
--hoodie-conf hoodie.cleaner.parallelism=100 \
--hoodie-conf hoodie.clustering.async.enabled=true \
--hoodie-conf hoodie.clustering.async.max.commits=1 \
--hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \
--hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=20000 \
--hoodie-conf hoodie.metadata.enable=false \
--hoodie-conf hoodie.clustering.plan.strategy.sort.columns=...

使用KEEP_LATEST_FILE_VERSIONS清理策略,保留最后1个文件版本,避免使用成KEEP_LATEST_COMMITS,因为压缩前的文件仍关联原始提交时间,会导致合并前的小文件和合并后的大文件共存的问题。

hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained=1

清理策略原理可参考:https://developer.aliyun.com/article/1457637

Flink index bootstrap加速

第一次启动实时任务需要打开索引引导函数,同时把开始消费时间回调到离线数据的截止时间,配置如下:

'index.bootstrap.enabled'='true'

'scan.startup.mode'='timestamp'
'scan.startup.timestamp-millis' = ''

数据量较大时,可以提高并发,同时调整下面三个配置,以确保上下游算子的并行度保持一致。否则,会导致大量不必要的网络IO,进而引发 checkpoint 长时间阻塞,最终可能导致任务启动失败。

'write.tasks' = '100',
'write.bucket_assign.tasks' = '100',
'write.index_bootstrap.tasks'='100'

初始化索引完成后,执行savepoint,减小任务的并行度,关闭index bootstrap,从savepoint重新启动作业。

'write.tasks' = '12'
'write.bucket_assign.tasks' = '20'
'index.bootstrap.enabled'='false'

由于savepoint保存了index bootstrap算子信息,关闭index bootstrap后,会导致作业无法恢复,需要通过参数配置,允许跳过无法还原的保存点状态。作业为Per-Job模式时,启动任务配置--allowNonRestoredState参数即可。

Snipaste_2025-08-02_23-36-11.png

flink run -d -t yarn-per-job -Dyarn.application.queue=... -Dparallelism.default=6 \
-Dtaskmanager.numberOfTaskSlots=3 -Djobmanager.memory.process.size=2048m \
-Dexecution.checkpointing.snapshot-compression=true \
-Dexecution.checkpointing.local-backup.enabled=true \
-Dexecution.state-recovery.from-local=true \
-s hdfs://.../savepoint/savepoint-f36c2e-ada1c3edc7f2 --allowNonRestoredState \
-Dtaskmanager.memory.process.size=2048m -Dtaskmanager.memory.managed.fraction=0.3 \
-Dstate.backend.rocksdb.log.size=100m 
-Dyarn.application.name=... -c ... ....jar \
...
-stateBackendType 2 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -enableIncremental true

作业是Flink SQL或者Application模式时,并不支持在任务启动命令上配置--allowNonRestoredState参数,设置下面参数后并不生效,作业仍然会启动失败。

Snipaste_2025-08-02_23-36-11.png

execution.savepoint.ignore-unclaimed-state=true

如果作业需要以Application模式运行,目前最佳的方式是先通过Per-Job模式恢复作业,再次执行savepoint,把作业从该savepoint以Application模式的方式拉起即可。

35

评论

博主关闭了所有页面的评论