华为云用户手册

  • 操作场景 在程序代码完成开发后,您可以上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Spark客户端的运行步骤是一样的。 使用Python开发的Spark应用程序无需打包成jar,只需将样例工程复制到编译机器上即可。 用户需保证worker和driver的Python版本一致,否则将报错:"Python in worker has different version %s than that in driver %s."。 用户需保证Maven已配置华为镜像站中SDK的Maven镜像仓库,具体可参考配置华为开源镜像仓
  • 前提条件 已按照准备开发和运行环境章节准备好开发用户,例如developuser,并下载用户的认证凭据文件到本地。 用户需要具备Oozie的普通用户权限,HDFS访问权限,Hive表读写权限,HBase读写权限以及Yarn的队列提交权限。 已在Linux环境中安装了完整的集群客户端。 获取Oozie服务器URL(任意节点),这个URL将是客户端提交流程任务的目标地址。 URL格式为:https://Oozie节点业务IP:21003/oozie。端口为“OOZIE_HTTPS_PORT”参数对应值,默认为21003。 例如,“https://10.10.10.176:21003/oozie”。
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html at-least-once 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • 在Linux中调测程序 在运行调测环境上创建一个目录作为运行目录,如“/opt/impala_examples”,并在该目录下创建子目录“conf”。 执行mvn package,在样例工程target目录下获取jar包,比如: impala-examples-mrs-2.1-jar-with-dependencies.jar ,复制到“/opt/impala_examples”下。 开启Kerberos认证的安全集群下把从准备MRS应用开发用户获取的“user.keytab”和“krb5.conf”复制到“/opt/impala_examples/conf”下。 在Linux环境下执行如下命令运行样例程序。 chmod +x /opt/impala_examples -R cd /opt/impala_examples java -cp impala-examples-mrs-2.1-jar-with-dependencies.jar com.huawei.bigdata.impala.example.ExampleMain 在命令行终端查看样例代码中的Impala SQL所查询出的结果。 Linux环境运行成功结果会有如下信息。 Create table success! _c0 0 Delete table success! 父主题: 调测程序
  • 部署运行及结果查看 导出本地jar包,请参见打包IntelliJ IDEA代码。 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包业务。 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml 如果设置业务以本地模式启动,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml 如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。 如果使用了properties文件,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties 拓扑提交成功后请自行登录storm UI查看。
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 操作步骤 在Windows本地运行程序,需要配置https ssl证书。 登录集群任意节点,进入如下目录下载ca.crt文件。 cd ${BIGDATA_HOME}/om-agent_8.1.0.1/nodeagent/security/cert/subcert/certFile/ 将ca.crt文件下载到本地,以管理员的身份打开cmd。 输入如下命令: keytool -import -v -trustcacerts -alias ca -file "D:\xx\ca.crt" -storepass changeit -keystore "%JAVA_HOME%\jre\lib\security\cacerts" 其中“D:\xx\ca.crt”是实际ca.crt文件存放路径;“%JAVA_HOME% ”为jdk安装路径。 在开发环境中(例如IDEA中),右击OozieRestApiMain.java,单击“Run 'OozieRestApiMain.main()'”运行对应的应用程序工程。 使用Oozie客户端执行以下命令: oozie job -oozie https://Oozie业务IP:21003/oozie -config job.properties -run 其中需要提前将待使用样例工程目录“src\main\resources”中的“job.properties”文件复制到Oozie客户端所在目录。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表4 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 从TCP源主机:端口创建一个输入流。 start():Unit 启动Spark Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Spark Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表5 Spark Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
  • SparkSQL常用接口 Spark SQL中常用的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集。 HiveContext:获取存储在Hive中数据的主入口。 表6 常用的Actions方法 方法 说明 collect(): Array[Row] 返回一个数组,包含DataFrame的所有列。 count(): Long 返回DataFrame中的行数。 describe(cols: String*): DataFrame 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first(): Row 返回第一行。 Head(n:Int): Row 返回前n行。 show(numRows: Int, truncate: Boolean): Unit 用表格形式显示DataFrame。 take(n:Int): Array[Row] 返回DataFrame中的前n行。 表7 基本的DataFrame Functions 方法 说明 explain(): Unit 打印出SQL语句的逻辑计划和物理计划。 printSchema(): Unit 打印schema信息到控制台。 registerTempTable(tableName: String): Unit 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 toDF(colNames: String*): DataFrame 返回一个列重命名的DataFrame。
  • 样例代码 -- 从本地文件系统/opt/impala_examples_data/目录下将employee_info.txt加载进employees_info表中. LOAD DATA LOCAL INPATH '/opt/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; -- 从HDFS上/user/impala_examples_data/employee_info.txt加载进employees_info表中. LOAD DATA INPATH '/user/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; 加载数据的实质是将数据复制到HDFS上指定表的目录下。 “LOAD DATA LOCAL INPATH”命令可以完成从本地文件系统加载文件到Impala的需求,但是当指定“LOCAL”时,这里的路径指的是当前连接的“Impalad”的本地文件系统的路径。
  • 在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
  • 在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
  • 数据量大并发数高且有Shuffle时可调整网络内存 在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。 【示例】 # 网络占用内存占整个进程内存的比例 taskmanager.memory.network.fraction: 0.6 # 网络缓存内存的最小值 taskmanager.memory.network.min: 1g # 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.max: 20g
  • 如果不能使用broardcast join应该尽量减少shuffle数据 不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。 【示例】 # Runtime filter配置 table.exec.runtime-filter.enabled: true # 下推 table.optimizer.source.predicate-pushdown-enabled: true
  • 大状态Checkpoint优先从本地状态恢复 为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。 【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml): state.backend.local-recovery: true
  • TM的Slot数和TM的CPU数成倍数关系 在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。 合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。 【示例】建议配置TM Slot个数为CPU Core个数的2~4倍: taskmanager.numberOfTaskSlots: 4 taskmanager.cpu.cores: 2
  • 数据倾斜状态下可以使用localglobal优化策略 【示例】 #开启mini-batch优化 table.exec.mini-batch.enabled:true #最长等待时间 table.exec.mini-batch.allow-latency: 20ms #最大缓存记录数 table.exec.mini-batch.size:8000 #开启两阶段聚合 table.optimizer.agg-phase-strategy:TWO_PHASE
  • RocksDB作为状态后端时通过多块磁盘提升IO性能 RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。 【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml): state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb
  • 吞吐量大场景下使用MiniBatch聚合增加吞吐量 MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。 API方式: // instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task 资源文件方式(flink-conf.yaml): table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000
  • 基于序列化性能尽量使用POJO和Avro等简单的数据类型 使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。 表1 序列化 序列化器 Opts/s PojoSeriallizer 813 Kryo 294 Avro(Reflect API) 114 Avro(SpecificRecord API) 632
  • 网络通信调优 Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 【示例】 # netty的服务端线程数目(-1表示默认参数numOfSlot) taskmanager.network.netty.server.numThreads -1(numOfSlot) # netty的客户端线程数目(-1表示默认参数numofSlot) taskmanager.network.netty.client.numThreads : -1 # netty的客户端连接超时时间 taskmanager.network.netty.client.connectTimeoutSec:120s # netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB) taskmanager.network.netty.sendReceiveBufferSize: 0 # netty的传输方式,默认方式会根据运行的平台选择合适的方式 taskmanager.network.netty.transport:auto
  • 内存总体调优 Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。 图1 内存 表2 相关参数 参数 配置 注释 说明 Total Memory taskmanager.memory.flink.size: none 总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。 整体内存。 taskmanager.memory.process.size: none 整个Flink进程使用的内存大小,容器模式时设置。 FrameWork taskmanager.memory.framework.heap.size: 128mb runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。 RUNTIME底层占用的内存,一般不用做较大改变。 taskmanager.memory.framework.off-heap.size: 128mb runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。 Task taskmanager.memory.task.heap.size:none 没有默认值,flink.size减去框架、托管、网络等得到。 算子逻辑,用户代码(如UDF)正常对象占用内存的地方。 taskmanager.memory.task.off-heap.size:0 默认值为0,task使用的off heap内存。 Managed Memory taskmanager.memory.managed.fraction: 0.4 托管内存占taskmanager.memory.flink.size的比例,默认0.4。 managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。 taskmanager.memory.managed.size: 0 托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。 Network taskmanager.memory.network.min:64mb 网络缓存的最小值。 用于taskmanager之间shuffle、广播以及与network buffer。 taskmanager.memory.network.max:1gb 网络缓存的最大值。(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.fraction:0.1 network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。 用于taskmanager之间shuffle、广播以及与network buffer。 Others taskmanager.memory.jvm-metaspace.size:256M metaspace空间的最大值,默认值256MB。 用户自己管理的内存。 taskmanager.memory.jvm-overhead.min:192M jvm额外开销的最小值,默认192MB。 taskmanager.memory.jvm-overhead.max:1G jvm额外开销的最大值,默认1GB。 taskmanager.memory.jvm-overhead.fraction:0.1 jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。 3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值
  • 使用local-global两阶段聚合减少数据倾斜 Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。 数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。 API方式: // instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); configuration.setString("table.exec.mini-batch.size", "5000"); configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation 资源文件方式: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000 table.optimizer.agg-phase-strategy: TWO_PHASE
  • 通过表级TTL进行状态后端优化 本章节适用于MRS 3.3.0及以后版本。 在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。 可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒: Hint方式格式: table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral 在SQL语句中配置示例: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t LEFT JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ user_score as d ON t.user_id = d.user_id;
  • 非状态计算提升性能的资源优化 Flink计算操作分为如下两类: 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。 对于非状态计算主要调优为TaskManager的Heap Size与NetWork。 例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。
  • 通过调整对应算子并行度提升性能 读写Hudi可以通过配置读写并发提升读写性能。 读算子的并行度调整参数:read.tasks 写算子的并行度调整参数:write.tasks 采用状态索引在作业重启的时候(非Checkpoint重启),需要读目标表重建索引,可以增大该算子并行度提升性能。 加载索引的并行度调整参数:write.index_bootstrap.tasks 采用状态索引写数据需要进行主键唯一性检查,分配具体写入文件,提升该算子并行度提升性能。 写算子索引检测算子调整参数:write.bucket_assign.tasks
  • 通过表级JTL进行状态后端优化 本章节适用于MRS 3.3.0及以后版本。 在Flink双流inner Join场景下,若Join业务允许join一次就可以剔除后端中的数据时,可以使用该特性。 该特性只适用于流流inner join。 可通过使用Hint方式单独为左表和右表设置不同join次数: Hint方式格式: table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral 在SQL语句中配置示例: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置不同的JTL关联次数 /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */ user_score as d ON t.user_id = d.user_id;
共100000条