首页
Search
1
Linux免密登陆-ubuntu
958 阅读
2
Redis集群部署方案
942 阅读
3
Hadoop各版本汇总
901 阅读
4
数据同步工具DataX、Sqoop和Canal
883 阅读
5
Spark学习笔记
841 阅读
大数据
Flink
后端
Java
笔记
运维
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
WD1016
累计撰写
56
篇文章
累计阅读
12.4万
次
首页
栏目
大数据
Flink
后端
Java
笔记
运维
页面
搜索到
29
篇与
WD1016
的结果
返回首页
2025-08-21
阿里云大数据计算服务:性能探索与经验记录
云平台的性能与功能通常是用户最核心的关注点——性能直接影响计算任务的执行效率,而功能则决定了数据处理链路的流畅性与灵活性。近期在使用云平台大数据计算服务时,重点测试了其数据处理、SQL查询及并发执行等性能表现,同时评估了任务调度、权限管理和数据导入导出等关键功能的实用性。任务优化 MaxCompute常用参数set odps.sql.allow.fullscan=true; 不指定分局查询 set odps.sql.groupby.skewindata=true; 解决agg数据倾斜问题 set odps.service.mode=off; 如果打开,超过十分钟时,会被kill,增加执行时长 set odps.sql.hive.compatible=true; 开启Hive SQL兼容模式,降低任务迁移成本 set odps.task.wlm.quota=os_datagouptest3; 指定计算资源Quota 自动MapJoin的阈值,用于决定是否将小表数据广播set odps.optimizer.auto.mapjoin.threshold=4096000000; set odps.optimizer.enable.online.conditional.mapjoin=true; set odps.sql.split.dop={"xxx.table1":120, "xxx.table2": 10}; 读取并行度设置 set odps.optimizer.hbo.enable.new.signature=true; 历史执行信息进行查询优化的增强功能压力测试 MaxCompute并发跑任务,默认FIFO(先进先出),可配置成FAIR,资源抢占和分配情况如下FIFO 确保了公平的顺序,但可能导致头部阻塞,即早期任务(任务 1)垄断资源,延迟后续任务(任务 2 和 3),直到早期任务让步,从任务 2 和 3 的初始低值中显而易见。查询加速 MaxQA引擎,默认FAIR(公平调度),不可改:MaxQA引擎概述链接 任务并发情况如下FAIR调度支持抢占,如果一个任务超过其份额,系统可能暂停或回收其资源,重新分配给欠份额的任务,可见三个任务在各自的峰值(最高点)时期趋于均衡。DataWorks数据开发 定时任务可以是单个节点、也可以是工作流,创建时选择任务类型数据集成 官网链接Serverless资源组 官网链接Spark作业创建任务运行信息MaxCompute功能特性 开放存储(Storage API):数据不可直接访问,可通过Storage API访问(按量付费):官网链接Quota资源管理:分层管理、资源隔离、弹性伸缩:官网链接物化视图:根据历史作业和性能分析自动创建物化视图(AutoMV):官网链接数据安全:支持按项目、表(列级别、行级别)、资源、函数或实例维度的访问控制:官网链接近实时批流一体数仓:支持基于Flink等流计算的分钟级数据写入与秒级查询加速(MCQA2.0):官网链接任务调优:内置作业诊断与优化建议,包括了SQL调优、数据倾斜调优等:官网链接成本优化推荐:保障作业按时完成的前提下,生成更优的资源配置方案,降低成本:官网链接账单明细:支持历史作业分析、资源分账,可以详细统计任务的执行时长、资源使用、所属人等:官网链接
2025年08月21日
1 阅读
0 点赞
2025-08-02
湖仓一体架构下流批协同实践:从Spark批量加速到Flink实时更新
在湖仓一体架构下,无论是业务库数据同步还是宽表构建,Flink虽然支持"先全量,再增量"的处理模式,但当历史数据规模达到亿级时,全量数据导入阶段存在瓶颈,如果调高并发会抢占实时集群资源,低并发会导致处理时间过长。因此,我们采用Spark+Flink的协同计算架构,首先基于Spark的离线计算优势,在离线集群完成大规模历史数据的批量导入,再启动Flink任务进行增量更新。本文将分享该方案落地过程中遇到的问题和解决方案。Spark加速全量数据入湖 配置写入方式为bulk insert,减少数据序列化以及合并操作,该数据写入方式会跳过数据去重,所以可以在hive中通过SparkSQL预处理历史数据。set hoodie.spark.sql.insert.into.operation=bulk_insert;如果历史数据在Hive中,表格式尽可能是orc或parquet格式,否则处理效率会显著下降,SparkSQL任务配置如下:executor-memory 16g num-executors 150 executor-cores 4 spark.default.parallelism 600上述配置在数据量较大的情况下容易OOM,如果存量数据过多,需要分多个批次入湖,这样耗时更长,作业更不稳定。因为数据入湖时数据分桶、写入缓冲都属于内存密集型操作,所以适当调大spark.memory.fraction,调小spark.memory.storageFraction(减小存储内存,增加执行内存),有利于加速入湖。spark.memory.fraction 0.8 spark.memory.storageFraction 0.3同时,在资源不变的情况下,适当增大并行度,低并行度时每个Task处理的数据量较大,排序操作的内存压力剧增,当单个Task的数据量超过可用内存时,Spark会触发磁盘溢写,并行度增大一倍后,每个Task处理的数据量减半,排序完全在内存中完成,避免Spill和GC,效率显著提升。spark.default.parallelism 1200最终配置如下,实现2分钟内15亿全量数据入湖:spark-sql --master yarn --queue ... --deploy-mode client --name ... \ --driver-memory 16g --driver-cores 8 --executor-memory 16g --num-executors 150 \ --conf spark.executor.heartbeatInterval=120000s \ --conf spark.network.timeout=130000s \ --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.3 \ --executor-cores 4 --conf spark.default.parallelism=1200 \ --hiveconf hive.cli.print.header=true \ --jars hudi-spark3.2-bundle_2.12-0.14.1.jar \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' -S全量数据入湖后小文件合并 增大并行度会生成很多小文件,在全量数据导入完成后,Flink任务启动前,通过Spark提交Clustering任务进行一次小文件合并。spark-submit --master yarn --deploy-mode cluster --queue ... \ --name ... \ --driver-memory 16g --executor-memory 16g --num-executors 150 \ --executor-cores 8 --conf spark.default.parallelism=1200 \ --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle_2.12-0.14.1.jar \ --mode scheduleAndExecute \ --base-path hdfs://... \ --table-name ... \ --retry-last-failed-clustering-job \ --job-max-processing-time-ms 1800000 \ --hoodie-conf hoodie.clean.async=true \ --hoodie-conf hoodie.clean.automatic=true \ --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \ --hoodie-conf hoodie.cleaner.fileversions.retained=1 \ --hoodie-conf hoodie.cleaner.parallelism=100 \ --hoodie-conf hoodie.clustering.async.enabled=true \ --hoodie-conf hoodie.clustering.async.max.commits=1 \ --hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \ --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=20000 \ --hoodie-conf hoodie.metadata.enable=false \ --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=...使用KEEP_LATEST_FILE_VERSIONS清理策略,保留最后1个文件版本,避免使用成KEEP_LATEST_COMMITS,因为压缩前的文件仍关联原始提交时间,会导致合并前的小文件和合并后的大文件共存的问题。hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS hoodie.cleaner.fileversions.retained=1清理策略原理可参考:https://developer.aliyun.com/article/1457637 Flink index bootstrap加速 第一次启动实时任务需要打开索引引导函数,同时把开始消费时间回调到离线数据的截止时间,配置如下:'index.bootstrap.enabled'='true' 'scan.startup.mode'='timestamp' 'scan.startup.timestamp-millis' = ''数据量较大时,可以提高并发,同时调整下面三个配置,以确保上下游算子的并行度保持一致。否则,会导致大量不必要的网络IO,进而引发 checkpoint 长时间阻塞,最终可能导致任务启动失败。'write.tasks' = '100', 'write.bucket_assign.tasks' = '100', 'write.index_bootstrap.tasks'='100'初始化索引完成后,执行savepoint,减小任务的并行度,关闭index bootstrap,从savepoint重新启动作业。'write.tasks' = '12' 'write.bucket_assign.tasks' = '20' 'index.bootstrap.enabled'='false'由于savepoint保存了index bootstrap算子信息,关闭index bootstrap后,会导致作业无法恢复,需要通过参数配置,允许跳过无法还原的保存点状态。作业为Per-Job模式时,启动任务配置--allowNonRestoredState参数即可。flink run -d -t yarn-per-job -Dyarn.application.queue=... -Dparallelism.default=6 \ -Dtaskmanager.numberOfTaskSlots=3 -Djobmanager.memory.process.size=2048m \ -Dexecution.checkpointing.snapshot-compression=true \ -Dexecution.checkpointing.local-backup.enabled=true \ -Dexecution.state-recovery.from-local=true \ -s hdfs://.../savepoint/savepoint-f36c2e-ada1c3edc7f2 --allowNonRestoredState \ -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.memory.managed.fraction=0.3 \ -Dstate.backend.rocksdb.log.size=100m -Dyarn.application.name=... -c ... ....jar \ ... -stateBackendType 2 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -enableIncremental true作业是Flink SQL或者Application模式时,并不支持在任务启动命令上配置--allowNonRestoredState参数,设置下面参数后并不生效,作业仍然会启动失败。execution.savepoint.ignore-unclaimed-state=true如果作业需要以Application模式运行,目前最佳的方式是先通过Per-Job模式恢复作业,再次执行savepoint,把作业从该savepoint以Application模式的方式拉起即可。
2025年08月02日
2,104 阅读
35 点赞
2023-12-30
Flink + Hudi 流批一体作业稳定性优化
目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。Yarn Application报错信息: org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested. Association with remote system [akka.tcp://flink] has failed, address is now Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms NodeManager Local logs报错信息: org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> 注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。taskmanager.memory.managed.fraction: 0.2使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。clustering.delta_commits 触发合并文件所需的提交次数 clean.retain_commits 保留的提交数,不进行清理 cleaner.policy(KEEP_LATEST_COMMITS)相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。常用离线clustering命令如下:nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/${bundle_jar_name}.jar \ --service \ --path ${table_path} > clustering.log & \以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:常规: nohup flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --clustering-delta-commits 8 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --plan-partition-filter-mode RECENT_DAYS \ --target-partitions 2 \ --path hdfs://...... > clustering.log & 处理积压: flink run -c \ org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ lib/hudi-flink1.14-bundle-0.12.3.jar \ --service \ --min-clustering-interval-seconds 3 \ --max-num-groups 1000 \ --clustering-delta-commits 1 \ --small-file-limit 90 \ --target-file-max-bytes 125829120 \ --path ${target_path}在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:flink run \ -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \ ../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \ -h需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。set execution.checkpointing.timeout=600000 properties.request.timeout.ms=120000(connector为kafka时)以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。taskmanager.exit-on-fatal-akka-error: true改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:https://blog.csdn.net/weixin_45417821/article/details/127407461mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12 mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
2023年12月30日
2,010 阅读
15 点赞
2023-07-01
Flink流批一体作业管理平台
前言随着大数据处理需求的不断增长,流批一体作业管理平台的重要性愈发凸显。本文将介绍如何针对 Flink + Hudi 流批一体任务进行管理,特别针对 Hudi 任务的需求,支持一键启动离线 Compaction 和 Clustering 任务,保证数据湖的稳定运行。背景在大数据处理领域,简化作业的提交、监控和管理是一项重要的任务。调研过程中发现,现有的开源系统如 Dinky 和 StreamPark 在任务生成和监控方面存在一定局限性。它们使用了 Flink 或 YARN 源码中的 API,如 org.apache.hadoop.yarn.client.api.YarnClient 、 org.apache.hadoop.hdfs.DistributedFileSystem 等,导致系统与 Hadoop 集群和 Flink 客户端版本之间耦合度高,Hadoop集群使用权限要求较高。系统开发基于项目 flink-streaming-platform-web 进行开发,通过优化和功能扩展(如:对使用FlinkSQL操作Hudi表的支持),构建了一个无侵入性、与 Hadoop 集群和 Flink 客户端版本解耦的流批一体作业调度平台。只需在装有 Flink 客户端的机器上启动系统,即可轻松使用,所有任务均使用 Flink 自带的命令进行提交,如:# 提交 Compaction 任务 flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor ... # 提交 Clustering 任务 flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob ...核心功能流任务管理支持 FlinkSQL 和 JAR 提交任务,提供流式作业的提交、监控、告警和日志查看等功能,为用户提供全方位的作业管理服务。批任务调度支持 FlinkSQL 和 JAR 提交任务,可定时调度批处理作业,定时完成数据处理任务,提高数据处理效率。数据湖管理(Hudi)特别针对 Hudi 任务的需求,系统支持对 MOR(Merge On Read)和 COW(Copy On Write)模式下的离线 Compaction 和离线 Clustering 任务进行管理,保证数据湖的稳定运行和数据质量。架构设计该平台的架构设计是其高效运行的关键,采用了模块化设计,将流任务管理、批任务调度和数据湖管理等功能模块化,实现了高度的灵活性和可扩展性。每个模块都具有清晰的职责和接口,使得系统易于维护和扩展。流任务管理模块负责接收用户提交的流式作业,并将其转换为 Flink 任务进行执行。这个模块需要实现任务的监控和告警功能,以确保作业的稳定运行。采用了分布式监控系统来实现实时监控和告警,保障了系统的高可用性和可靠性。批任务调度模块则负责定时调度批处理作业,并在预定的时间点执行数据处理任务。这个模块需要考虑到作业的依赖关系和执行顺序,以确保数据处理任务按时完成。采用了依赖调度策略来解决这个问题,有效地提高了作业的执行效率。数据湖管理模块是该平台的重要组成部分,特别针对 Hudi 任务的需求进行了优化。实现了对 MOR 和 COW 模式下的离线 Compaction 和离线 Clustering 任务的管理,确保了数据湖的稳定运行和数据质量。该平台极大地提升了流批一体作业创建的效率和灵活性,提供了更便捷、可靠的作业管理解决方案。未来,将持续优化系统功能,以满足需求,并助力流批一体作业的稳定运行和管理。
2023年07月01日
1,419 阅读
91 点赞
2023-03-25
Flink on Kubernetes 计算和存储分离实践
云原生已成为业界的主要趋势之一。将Flink从Yarn迁移到Kubernetes平台带来了许多优势。在这种架构下,将计算和存储解耦,计算部分运行在Kubernetes上,而存储则使用HDFS等分布式存储系统。这样的架构优势在于可以根据实际情况独立调整计算和存储资源,从而提高整体的效率和弹性。本文将介绍四种Flink在Kubernetes上的部署模式。其中,两种是基于Native Kubernetes部署的,分别有Session模式和Application模式。另外两种是基于Flink Kubernetes Operator部署的,同样包括Session模式和Application模式。首先介绍基于Flink Kubernetes Operator部署的Application模式。如果要运行自己编写的jar包,需要先构建一个镜像。如果使用了HDFS、Hudi等其他组件,还需要在Dockerfile中将Hadoop客户端和配置文件复制到镜像中,并设置相应的环境变量。同时,将所有依赖的jar包复制到Flink Home的lib目录下。FROM flink:1.16.1-scala_2.12 USER root RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone ADD hadoop-3.1.1.tar.gz /opt ADD jdk1.8.0_121.tar.gz /opt RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/ COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/ RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/ RUN mkdir -p /opt/hadoop/conf/ COPY yarn-site.xml /opt/hadoop/conf/ COPY core-site.xml /opt/hadoop/conf/ COPY hdfs-site.xml /opt/hadoop/conf/ COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/ COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/ RUN mkdir $FLINK_HOME/mylib COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib RUN chown -R flink:flink $FLINK_HOME/mylib RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121 \n\ export HADOOP_HOME=/opt/hadoop-3.1.1 \n\ PATH=$PATH:$JAVA_HOME/bin \n\ PATH=$PATH:$HADOOP_HOME/bin'\ >> ~/.bashrc EXPOSE 8081构建镜像,在Dockerfile所在的目录中执行以下命令,确保该目录包含用于构建镜像的文件。docker build -t flink-native/flink-on-k8s-xxxx .安装helmcurl https://baltocdn.com/helm/signing.asc | sudo apt-key add -sudo apt-get install apt-transport-https --yes echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list sudo apt-get update sudo apt-get install helm安装cert-manager组件,由它提供证书服务。kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml安装Flink Kubernetes Operatorhelm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace执行上述命令后,将会从ghcr.io/apache/flink-kubernetes-operator:7fc23a1镜像仓库拉取镜像。由于下载速度较慢,可以尝试从apache/flink-kubernetes-operator:7fc23a1仓库拉取镜像,然后为其添加标签docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1如果在重新安装时遇到使用kubectl delete无法删除的情况,可以尝试以下命令来实现删除操作:kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge通过执行上述命令,可以成功删除该资源。查看自定义资源 kubectl get customresourcedefinition构建一个YAML文件来提交任务。其中,image指定了镜像,jarURI指定了jar包在镜像中的位置,entryClass指定了要执行的类,args指定了该类所需的参数:kind: FlinkDeployment metadata: name: flink-application-xxx spec: image: flink-native/flink-on-k8s-xxxx flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "n" serviceAccount: flink jobManager: resource: memory: "nm" cpu: n taskManager: resource: memory: "nm" cpu: n job: jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar entryClass: com.xxx.run.XXXJob parallelism: n upgradeMode: stateless args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]提交任务kubectl create -f xxx.yaml查看flinkdeployment kubectl get flinkdeployment查看日志kubectl logs -f deploy/flink-application-xxx{lamp/}Flink on K8S Session模式和Application模式需要安装Flink客户端,下载flink压缩包,解压即可设置命名空间首选项、赋权等kubectl create ns flink-native kubectl config set-context --current --namespace=flink-native kubectl create serviceaccount flink kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-nativeSession模式,启动Flink集群bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=2 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dresourcemanager.taskmanager-timeout=60000 \ -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"端口转发nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &打开Flink Web UI,可以看到此时只有jobmanager向集群提交任务,运行测试任务bin/flink run -e kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=my-first-flink-cluster examples/streaming/TopSpeedWindowing.jar运行自己的jar包bin/flink run -e kubernetes-session \ -Dkubernetes.namespace=flink-native \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.cluster-id=xxx \ -c com.xxx.run.XXXJob \ mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2查看pod,此时可以看到生成了taskmanagerkubectl get pod -o wide -A查看日志,使用以下命令可以看到测试程序TopSpeedWindowing的输出结果kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native查看任务列表:bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx 根据ID删除任务:bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxxr 3ff3c5a5e3c2f47e024e2771dc108f77Application模式bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=xxx\ -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \ -Dkubernetes.namespace=flink-native\ -Dkubernetes.service-account=flink \ -Dclassloader.check-leaked-classloader=false \ -Dkubernetes.rest-service.exposed.type=ClusterIP \ -c com.sohu.longuserprofile.run.TestJob \ local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2Session模式只能提交本地(宿主机)jar包,Application模式只能使用local:///{lamp/}常用命令k8s web ui 登录token获取 kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token: 查看所有pod列表 kubectl get pod -o wide -A 查看pod详细信息 kubectl describe pod pod_name -n flink-native 删除deployment kubectl delete deployment/my-first-flink-cluster 进入pod kubectl exec -it -n flink-native pod_name /bin/bash 获得所有命名空间 kubectl get namespace 拷贝出来 kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home 拷贝进去 kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal
2023年03月25日
1,050 阅读
6 点赞
1
2
...
6