华为云用户手册

  • 参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
  • 配置描述 要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 描述 默认值 spark.sql.files.maxPartitionBytes 在读取文件时,将单个分区打包的最大字节数。 单位:byte。 134217728(即128M) spark.files.openCostInBytes 打开文件的预估成本, 按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。 4M
  • 操作步骤 可对INSERT...SELECT操作做如下的调优操作。 如果建的是Hive表,将存储类型设为Parquet,从而减少执行INSERT...SELECT语句的时间。 建议使用spark-sql或者在Beeline/JDBCServer模式下使用spark用户来执行INSERT...SELECT操作,避免执行更改文件owner的操作,从而减少执行INSERT...SELECT语句的时间。 在Beeline/JDBCServer模式下,executor的用户跟driver是一致的,driver是JDBCServer服务的一部分,是由spark用户启动的,因此其用户也是spark用户,且当前无法实现在运行时将Beeline端的用户透传到executor,因此使用非spark用户时需要对文件进行更改owner为Beeline端的用户,即实际用户。 如果查询的数据是大量的小文件将会产生大量map操作,从而导致输出存在大量的小文件,在执行重命名文件操作时将会耗费较多时间,此时可以通过设置“spark.sql.files.maxPartitionBytes”与“spark.files.openCostInBytes”来设置一个partiton读取的最大字节,在一个partition中合并多个小文件来减少输出文件数及执行重命名文件操作的时间,从而减少执行INSERT...SELECT语句的时间。 上述优化操作并不能解决全部的性能问题,对于以下场景仍然需要较多时间: 对于动态分区表,如果其分区数非常多,那么也需要执行较长的时间。
  • 操作场景 Spark on Yarn模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对Spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用JDBCServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf()conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 使用coalesce调整分片的数量 coalesce可以调整分片的数量。coalesce函数有两个参数: coalesce(numPartitions: Int, shuffle: Boolean = false) 当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。 遇到下列场景,可选择使用coalesce算子: 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。 当输入切片个数太大,导致程序无法正常运行时使用。 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTANCES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作场景 Spark是内存计算框架,计算过程中内存不够对Spark的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存中RDD的大小来判断内存是否变成性能瓶颈,并根据情况优化。 监控节点进程的GC情况(在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
  • 操作步骤 Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。 在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。 实现类注册器并手动注册类。 package com.etl.common;import com.esotericsoftware.kryo.Kryo;import org.apache.spark.serializer.KryoRegistrator; public class DemoRegistrator implements KryoRegistrator{ @Override public void registerClasses(Kryo kryo) { //以下为示例类,请注册自定义的类 kryo.register(AggrateKey.class); kryo.register(AggrateValue.class); }} 您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。 当参数设置为true时,如果工程中存在未被序列化的类,则会抛出异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是抛出异常,相对比false,其性能较好。 配置KryoSerializer作为数据序列化器和类注册器。 val conf = new SparkConf()conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")
  • 操作步骤 优化GC,调整老年代和新生代的大小和比例。在客户端的conf/spark-default.conf配置文件中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:-XX:NewRatio。如," -XX:NewRatio=2",则新生代占整个堆空间的1/3,老年代占2/3。 开发Spark应用程序时,优化RDD的数据结构。 使用原始类型数组替代集合类,如可使用fastutil库。 避免嵌套结构。 Key尽量不要使用String。 开发Spark应用程序时,建议序列化RDD。 RDD做cache时默认是不序列化数据的,可以通过设置存储级别来序列化RDD减小内存。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
  • 配置场景 Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下: 自动设置shuffle partition数 在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能合适。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。 动态调整执行计划 在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。 自动处理数据倾斜 在执行SQL语句时,若存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取若干个shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果
  • 日志描述 日志存储路径: Executor运行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}” 运行中的任务日志存储在以上路径中,运行结束后会基于Yarn的配置确定是否汇聚到HDFS目录中,详情请参见Yarn常用参数。 其他日志:“/var/log/Bigdata/spark2x” 日志归档规则: 使用yarn-client或yarn-cluster模式提交任务时,Executor日志默认50MB滚动存储一次,最多保留10个文件,不压缩。 JobHistory2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 JDBCServer2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 IndexServer2x日志默认100MB滚动存储一次,最多保留100个文件,压缩存储。 JDBCServer2x审计日志默认20MB滚动存储一次,最多保留20个文件,压缩存储。 日志大小和压缩文件保留个数可以在FusionInsight Manager界面中配置。 表1 Spark2x日志列表 日志类型 日志文件名 描述 SparkResource2x日志 spark.log Spark2x服务初始化日志。 prestart.log prestart脚本日志。 cleanup.log 安装卸载实例时的清理日志。 spark-availability-check.log Spark2x服务健康检查日志。 spark-service-check.log Spark2x服务检查日志 JDBCServer2x日志 JDBCServer-start.log JDBCServer2x启动日志。 JDBCServer-stop.log JDBCServer2x停止日志。 JDBCServer.log JDBCServer2x运行时,Driver端日志。 jdbc-state-check.log JDBCServer2x健康检查日志。 jdbcserver-omm-pid***-gc.log.*.current JDBCServer2x进程gc日志。 spark-omm-org.apache.spark.sql.hive.thriftserver.HiveThriftProxyServer2-***.out* JDBCServer2x进程启动信息日志。若进程停止,会打印jstack信息。 JobHistory2x日志 jobHistory-start.log JobHistory2x启动日志。 jobHistory-stop.log JobHistory2x停止日志。 JobHistory.log JobHistory2x运行过程日志。 jobhistory-omm-pid***-gc.log.*.current JobHistory2x进程gc日志。 spark-omm-org.apache.spark.deploy.history.HistoryServer-***.out* JobHistory2x进程启动信息日志。若进程停止,会打印jstack信息。 IndexServer2x日志 IndexServer-start.log IndexServer2x启动日志。 IndexServer-stop.log IndexServer2x停止日志。 IndexServer.log IndexServer2x运行时,Driver端日志。 indexserver-state-check.log IndexServer2x健康检查日志。 indexserver-omm-pid***-gc.log.*.current IndexServer2x进程gc日志。 spark-omm-org.apache.spark.sql.hive.thriftserver.IndexServerProxy-***.out* IndexServer2x进程启动信息日志。若进程停止,会打印jstack信息。 审计日志 jdbcserver-audit.log ranger-audit.log JDBCServer2x审计日志。
  • 操作场景 Spark支持两种方式的序列化 : Java原生序列化JavaSerializer Kryo序列化KryoSerializer 序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。 KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。 序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。
  • 工具介绍 在Hadoop大规模生产集群中,由于HDFS的元数据都保存在NameNode的内存中,集群规模受制于NameNode单点的内存限制。如果HDFS中有大量的小文件,会消耗NameNode大量内存,还会大幅降低读写性能,延长作业运行时间。因此,小文件问题是制约Hadoop集群规模扩展的关键问题。 本工具主要有如下两个功能: 扫描表中有多少低于用户设定阈值的小文件,返回该表目录中所有数据文件的平均大小。 对表文件提供合并功能,用户可设置合并后的平均文件大小。
  • 配置场景 Spark优化sql的执行,一般的优化规则都是启发式的优化规则,启发式的优化规则,仅仅根据逻辑计划本身的特点给出优化,没有考虑数据本身的特点,也就是未考虑算子本身的执行代价。Spark在2.2中引入了基于代价的优化规则(CBO)。CBO会收集表和列的统计信息,结合算子的输入数据集来估计每个算子的输出条数以及字节大小,这些就是执行一个算子的代价。 CBO会调整执行计划,来最小化端到端的查询时间,中心思路2点: 尽早过滤不相关的数据。 最小化每个算子的代价。 CBO优化过程分为2步: 收集统计信息。 根据输入的数据集估算特定算子的输出数据集。 表级别统计信息包括:记录条数;表数据文件的总大小。 列级别统计信息包括:唯一值个数;最大值;最小值;空值个数;平均长度;最大长度;直方图。 有了统计信息后,就可以估计算子的执行代价了。常见的算子包括过滤条件Filter算子和Join算子。 直方图为列统计值的一种,可以直观的描述列数据的分布情况,将列的数据从最小值到最大值划分为事先指定数量的槽位(bin),计算各个槽位的上下界的值,使得全部数据都确定槽位后,所有槽位中的数据数量相同(等高直方图)。有了数据的详细分布后,各个算子的代价估计能更加准确,优化效果更好。 该特性可以通过下面的配置项开启: spark.sql.statistics.histogram.enabled:指定是否开启直方图功能,默认为false。
  • 配置场景 当Spark2x Web UI中有一些不允许其他用户看到的数据时,用户可能想对UI进行安全防护。用户一旦登录,Spark2x可以比较与这个用户相对应的视图ACLs来确认是否授权用户访问 UI。 Spark2x存在两种类型的Web UI,一种为运行中任务的Web UI,可以通过Yarn原生页面的应用链接或者REST接口访问。一种为已结束任务的Web UI,可以通过Spark2x JobHistory服务或者REST接口访问。 本章节仅支持安全模式(开启了Kerberos认证)集群。 运行中任务Web UI ACL配置。 运行中的任务,可通过服务端对如下参数进行配置。 “spark.admin.acls”:指定Web UI的管理员列表。 “spark.admin.acls.groups”:指定管理员组列表。 “spark.ui.view.acls”:指定yarn界面的访问者列表。 “spark.modify.acls.groups”:指定yarn界面的访问者组列表。 “spark.modify.acls”:指定Web UI的修改者列表。 “spark.ui.view.acls.groups”:指定Web UI的修改者组列表。 运行结束后Web UI ACL配置。 运行结束的任务通过客户端的参数“spark.history.ui.acls.enable”控制是否开启ACL访问权限。 如果开启了ACL控制,由客户端的“spark.admin.acls”和s“park.admin.acls.groups”配置指定Web UI的管理员列表和管理员组列表,由客户端的“spark.ui.view.acls”和“spark.modify.acls.groups”配置指定查看Web UI任务明细的访问者列表和组列表,由客户端的“spark.modify.acls”和“spark.ui.view.acls.groups”配置指定修改Web UI任务明细的访问者列表和组列表。
  • 配置场景 ORC文件格式是一种Hadoop生态圈中的列式存储格式,它最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。和Parquet文件格式类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内按列进行存储,并且文件中的数据尽可能的压缩来降低存储空间的消耗。矢量化读取ORC格式的数据能够大幅提升ORC数据读取性能。在Spark2.3版本中,SparkSQL支持矢量化读取ORC数据(这个特性在Hive的历史版本中已经得到支持)。矢量化读取ORC格式的数据能够获得比传统读取方式数倍的性能提升。 该特性可以通过下面的配置项开启: “spark.sql.orc.enableVectorizedReader”:指定是否支持矢量化方式读取ORC格式的数据,默认为true。 “spark.sql.codegen.wholeStage”:指定是否需要将多个操作的所有stage编译为一个java方法,默认为true。 “spark.sql.codegen.maxFields”:指定codegen的所有stage所支持的最大字段数(包括嵌套字段),默认为100。 “spark.sql.orc.impl”:指定使用Hive还是Spark SQL native作为SQL执行引擎来读取ORC数据,默认为hive。
  • 配置描述 可以通过以下两种方式配置是否过滤掉分区表分区路径不存在的分区。 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.sql.hive.verifyPartitionPath 配置读取HIVE分区表时,是否过滤掉分区表分区路径不存在的分区。 “true”:过滤掉分区路径不存在的分区; “false”:不进行过滤。 false 在spark-submit命令提交应用时,通过“--conf”参数配置是否过滤掉分区表分区路径不存在的分区。 示例: spark-submit --class org.apache.spark.examples.SparkPi --conf spark.sql.hive.verifyPartitionPath=true $SPARK_HOME/lib/spark-examples_*.jar
  • 配置描述 提供两种不同的数据汇聚功能配置选项,两者在Spark JDBCServer服务端的tunning选项中进行设置,设置完后需要重启JDBCServer。 表1 参数说明 参数 说明 默认值 spark.sql.bigdata.thriftServer.useHdfsCollect 是否将结果数据保存到HDFS中而不是内存中。 优点:由于查询结果保存在hdfs端,因此基本不会造成JDBCServer的OOM。 缺点:速度慢。 true:保存至HDFS中 false:不使用该功能 须知: spark.sql.bigdata.thriftServer.useHdfsCollect参数设置为true时,将结果数据保存到HDFS中,但JobHistory原生页面上Job的描述信息无法正常关联到对应的SQL语句,同时spark-beeline命令行中回显的Execution ID为null,为解决JDBCServer OOM问题,同时显示信息正确,建议选择 spark.sql.userlocalFileCollect参数进行配置。 false spark.sql.uselocalFileCollect 是否将结果数据保存在本地磁盘中而不是内存里面。 优点:结果数据小数据量情况下和原生内存的方式相比性能损失可以忽略,大数据情况下(亿级数据)性能远比使用hdfs,以及原生内存方式好。 缺点:需要调优。大数据情况下建议JDBCServer driver端内存10G,executor端每个核心分配3G内存。 true:使用该功能 false: 不使用该功能 false spark.sql.collect.Hive 该参数在spark.sql.uselocalFileCollect开启的情况下生效。直接序列化的方式,还是间接序列化的方式保存结果数据到磁盘。 优点:针对分区数特别多的表查询结果汇聚性能优于直接使用结果数据保证在磁盘的方式。 缺点:和spark.sql.uselocalFileCollect开启时候的缺点一样。 true:使用该功能 false:不使用该功能 false spark.sql.collect.serialize 该参数在spark.sql.uselocalFileCollect, spark.sql.collect.Hive同时开启的情况下生效。 作用是进一步提升性能 java:采用java序列化方式收集数据。 kryo:采用kryo序列化方式收集数据,性能要比采用java好。 java 参数spark.sql.bigdata.thriftServer.useHdfsCollect和spark.sql.uselocalFileCollect不能同时设置为true。
  • 配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在FusionInsight侧的类 org.apache.spark.streaming.kafka010.HWDirectKafkaInputDStream
  • 配置场景 Spark Streaming对接Kafka时,当Spark Streaming应用重启后,应用根据上一次读取的topic offset作为起始位置和当前topic最新的offset作为结束位置从Kafka上读取数据的。 Kafka服务的topic的leader异常后,若Kafka的leader和follower的offset相差太大,用户重启Kafka服务,Kafka的follower和leader相互切换,则Kafka服务重启后,topic的offset变小。 若Spark Streaming应用一直在运行,由于Kafka上topic的offset变小,会导致读取Kafka数据的起始位置比结束位置大,这样将无法从Kafka读取数据,应用报错。 若在重启Kafka服务前,先停止Spark Streaming应用,等Kafka重启后,再重启Spark Streaming应用使应用从checkpoint恢复。此时,Spark Streaming应用会记录终止前读取到的offset位置,以此为基准读取后面的数据,而Kafka offset变小(例如从10万变成1万),Spark Streaming会等待Kafka leader的offset增长至10万之后才会去消费,导致新发送的offset在1万至10万之间的数据丢失。 针对上述背景,提供配置Streaming对接Kafka更高级别的可靠性。对接Kafka可靠性功能开启后,上述场景处理方式如下。 若Spark Streaming应用在运行应用时Kafka上topic的offset变小,则会将Kafka上topic最新的offset作为读取Kafka数据的起始位置,继续读取后续的数据。 对于已经生成但未调度处理的任务,若读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务会运行失败。 若任务失败过多,则会将executor加入黑名单,从而导致后续的任务无法部署运行。此时用户可以通过配置“spark.blacklist.enabled”参数关闭黑名单功能,黑名单功能默认为开启。 若Kafka上topic的offset变小后,Spark Streaming应用进行重启恢复终止前未处理完的任务若读取的Kafka offset区间大于Kafka上topic的最新offset,则该任务直接丢弃,不进行处理。 若Streaming应用中使用了state函数,则不允许开启对接Kafka可靠性功能。
  • 配置场景 在某些场景下,当任务已经启动后,用户想要修改日志级别以定位问题或者查看想要的信息。 用户可以在进程启动前,在进程的JVM参数中增加参数“-Dlog4j.configuration.watch=true”来打开动态设置日志级别的功能。进程启动后,就可以通过修改进程对应的log4j配置文件,来调整日志打印级别。 目前支持动态设置日志级别功能的有:Driver日志、Executor日志、AM日志、JobHistory日志、JDBCServer日志。 允许设置的日志级别是:FATAL,ERROR,WARN,INFO,DEBUG,TRACE和ALL。
  • 配置描述 在进程对应的JVM参数配置项中增加以下参数。 表1 参数描述 参数 描述 默认值 -Dlog4j.configuration.watch 进程JVM参数,设置成“true”用于打开动态设置日志级别功能。 未配置,即为false。 Driver、Executor、AM进程的JVM参数如表2所示。在Spark客户端的配置文件“spark-defaults.conf”中进行配置。Driver、Executor、AM进程的日志级别在对应的JVM参数中的“-Dlog4j.configuration”参数指定的log4j配置文件中设置。 表2 进程的JVM参数1 参数 说明 默认日志级别 spark.driver.extraJavaOptions Driver的JVM参数。 INFO spark.executor.extraJavaOptions Executor的JVM参数。 INFO spark.yarn.am.extraJavaOptions AM的JVM参数。 INFO JobHistory Server和JDBCServer的JVM参数如表3所示。在服务端配置文件“ENV_VARS”中进行配置。JobHistory Server和JDBCServer的日志级别在服务端配置文件“log4j.properties”中设置。 表3 进程的JVM参数2 参数 说明 默认日志级别 GC_OPTS JobHistory Server的JVM参数。 INFO SPARK_SUBMIT_OPTS JDBCServer的JVM参数。 INFO 示例: 为了动态修改Executor日志级别为DEBUG,在进程启动之前,修改“spark-defaults.conf”文件中的Executor的JVM参数“spark.executor.extraJavaOptions”,增加如下配置: -Dlog4j.configuration.watch=true 提交用户应用后,修改“spark.executor.extraJavaOptions”中“-Dlog4j.configuration”参数指定的log4j日志配置文件(例如:“-Dlog4j.configuration=file:${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-*/spark/conf/log4j-executor.properties”)中的日志级别为DEBUG,如下所示: log4j.rootCategory=DEBUG, sparklog DEBUG级别生效会有一定的时延。
  • 配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 若A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,若存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 若提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
  • 配置场景 SparkSQL在进行shuffle操作时默认的分块数为200。在数据量特别大的场景下,使用默认的分块数就会造成单个数据块过大。如果一个任务产生的单个shuffle数据块大于2G,该数据块在被fetch的时候还会报类似错误: Adjusted frame length exceeds 2147483647: 2717729270 - discarded 例如,SparkSQL运行TPCDS 500G的测试时,使用默认配置出现错误。所以当数据量较大时需要适当的调整该参数。
  • 配置场景 当前,在YARN-Client和YARN-Cluster模式下,两种模式的客户端存在冲突的配置,即当客户端为一种模式的配置时,会导致在另一种模式下提交任务失败。 为避免出现如上情况,添加表1中的配置项,避免两种模式下来回切换参数,提升软件易用性。 YARN-Cluster模式下,优先使用新增配置项的值,即服务端路径和参数。 YARN-Client模式下,直接使用原有的三个配置项的值。 原有的三个配置项为:“spark.driver.extraClassPath”、“spark.driver.extraJavaOptions”、“spark.driver.extraLibraryPath”。 不添加表1中配置项时,使用方式与原有方式一致,程序可正常执行,只是在不同模式下需切换配置。
  • 配置场景 当前版本对于parquet表的压缩格式分以下两种情况进行配置: 对于分区表,需要通过parquet本身的配置项“parquet.compression”设置parquet表的数据压缩格式。如在建表语句中设置tblproperties:"parquet.compression"="snappy"。 对于非分区表,需要通过“spark.sql.parquet.compression.codec”配置项来设置parquet类型的数据压缩格式。直接设置“parquet.compression”配置项是无效的,因为它会读取“spark.sql.parquet.compression.codec”配置项的值。当“spark.sql.parquet.compression.codec”未做设置时默认值为“snappy”,“parquet.compression”会读取该默认值。 因此,“spark.sql.parquet.compression.codec”配置项只适用于设置非分区表的parquet压缩格式。
  • 配置场景 当前Spark SQL执行一个查询时需要使用大量的内存,尤其是在做聚合(Aggregate)和关联(Join)操作时,此时如果内存有限的情况下就很容易出现OutOfMemoryError。有限内存下的稳定性就是确保在有限内存下依然能够正确执行相关的查询,而不出现OutOfMemoryError。 有限内存并不意味着内存无限小,它只是在内存不足于放下大于内存可用总量几倍的数据时,通过利用磁盘来做辅助从而确保查询依然稳定执行,但依然有一些数据是必须留在内存的,如在做涉及到Join的查询时,对于当前用于Join的相同key的数据还是需要放在内存中,如果该数据量较大而内存较小依然会出现OutOfMemoryError。 有限内存下的稳定性涉及到3个子功能: ExternalSort 外部排序功能,当执行排序时如果内存不足会将一部分数据溢出到磁盘中。 TungstenAggregate 新Hash聚合功能,默认对数据调用外部排序进行排序,然后再进行聚合,因此内存不足时在排序阶段会将数据溢出到磁盘,在聚合阶段因数据有序,在内存中只保留当前key的聚合结果,使用的内存较小。 SortMergeJoin、SortMergeOuterJoin 基于有序数据的等值连接。该功能默认对数据调用外部排序进行排序,然后再进行等值连接,因此内存不足时在排序阶段会将数据溢出到磁盘,在连接阶段因数据有序,在内存中只保留当前相同key的数据,使用的内存较小。
共100000条