华为云用户手册

  • Interceptors Flume的拦截器(Interceptor)支持在数据传输过程中修改或丢弃传输的基本单元Event。用户可以通过在配置中指定Flume内建拦截器的类名列表,也可以开发自定义的拦截器来实现Event的修改或丢弃。Flume内建支持的拦截器如下表所示,本章节会选取一个较为复杂的作为示例。其余的用户可以根据需要自行配置使用。官网参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html 拦截器用在Flume的Source、Channel之间,大部分的Source都带有Interceptor参数。用户可以依据需要配置。 Flume支持一个Source配置多个拦截器,各拦截器名称用空格分开。 指定拦截器的顺序就是它们被调用的顺序。 使用拦截器在Header中插入的内容,都可以在Sink中读取并使用。 表5 Flume内建支持的拦截器类型 拦截器类型 简要描述 Timestamp Interceptor 该拦截器会在Event的Header中插入一个时间戳。 Host Interceptor 该拦截器会在Event的Header中插入当前Agent所在节点的IP或主机名。 Remove Header Interceptor 该拦截器会依据Header中包含的符合正则匹配的字符串,丢弃掉对应的Event。 UUID Interceptor 该拦截器会为每个Event的Header生成一个UUID字符串。 Search and Replace Interceptor 该拦截器基于Java正则表达式提供简单的基于字符串的搜索和替换功能。与Java Matcher.replaceAll() 的规则相同。 Regex Filtering Interceptor 该拦截器通过将Event的Body体解释为文本文件,与配置的正则表达式进行匹配来选择性的过滤Event。提供的正则表达式可用于排除或包含事件。 Regex Extractor Interceptor 该拦截器使用正则表达式抽取原始events中的内容,并将该内容加入events的header中。 下面以Regex Filtering Interceptor 为例说明Interceptor使用(其余的可参考官网配置): 表6 Regex Filtering Interceptor配置参数说明 选项名称 默认值 描述 type - 组件类型名称,必须写为regex_filter。 regex - 用于匹配事件的正则表达式。 excludeEvents false 默认收集匹配到的Event。设置为true,则会删除匹配的Event,保留不匹配的。 配置示例(为了方便观察,此模型使用了netcat tcp作为Source源,logger作为Sink)。配置好如下参数后,在Linux的配置的主机节点上执行Linux命令“telnet 主机名或IP 44444”,并任意敲入符合正则和不符合正则的字符串。会在日志中观察到,只有匹配到的字符串被传输了。 #define the source、channel、sink server.sources = r1 server.channels = c1 server.sinks = k1 #config the source server.sources.r1.type = netcat server.sources.r1.bind = ${主机IP} server.sources.r1.port = 44444 server.sources.r1.interceptors= i1 server.sources.r1.interceptors.i1.type= regex_filter server.sources.r1.interceptors.i1.regex= (flume)|(myflume) server.sources.r1.interceptors.i1.excludeEvents= false server.sources.r1.channels = c1 #config the channel server.channels.c1.type = memory server.channels.c1.capacity = 1000 server.channels.c1.transactionCapacity = 100 #config the sink server.sinks.k1.type = logger server.sinks.k1.channel = c1
  • Channel Selector Channel Selector可以允许一个Source对接多个Channel,通过选择不同的Selector类型来将Source的数据进行分流或者复制,目前Flume提供的Channel Selector有两种:Replicating和Multiplexing。 Replicating:表示Source的数据同步发送给所有Channel。 Multiplexing:表示根据Event中的Header的指定字段的值来进行判断,从而选择相应的Channel进行发送,从而起到根据业务类型进行分流的目的。 Replicating配置样例: client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 c el2 client.sources.kafkasource.selector.type = replicating client.sources.kafkasource.selector.optional = channel2 表1 Replicating配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为replicating Selector.optional - 可选Channel,可以配置为列表 Multiplexing配置样例: client.sources = kafkasource client.channels = channel1 channel2 client.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource client.sources.kafkasource.kafka.topics = topic1,topic2 client.sources.kafkasource.kafka.consumer.group.id = flume client.sources.kafkasource.kafka.bootstrap.servers = 10.69.112.108:21007 client.sources.kafkasource.kafka.security.protocol = SASL_PLAINTEXT client.sources.kafkasource.batchDurationMillis = 1000 client.sources.kafkasource.batchSize = 800 client.sources.kafkasource.channels = channel1 channel2 client.sources.kafkasource.selector.type = multiplexing client.sources.kafkasource.selector.header = myheader client.sources.kafkasource.selector.mapping.topic1 = channel1 client.sources.kafkasource.selector.mapping.topic2 = channel2 client.sources.kafkasource.selector.default = channel1 表2 Multiplexing配置样例参数说明 选项名称 默认值 描述 Selector.type replicating Selector类型,应配置为multiplexing Selector.header Flume.selector.header - Selector.default - - Selector.mapping.* - - Multiplexing类型的Selector的样例中,选择Event中Header名称为topic的字段来进行判断,当Header中topic字段的值为topic1时,向channel1发送该Event,当Header中topic字段的值为topic2时,向channel2发送该Event。 这种Selector需要借助Source中Event的特定Header来进行Channel的选择,需要根据业务场景选择合理的Header来进行数据分流。
  • 模块间性能 根据模块间极限性能对比,可以看到对于前端是SpoolDir Source的场景下,Kafka Sink和HDFS Sink都能满足吞吐量要求,但是HBase Sink由于自身写入性能较低的原因,会成为性能瓶颈,会导致数据都积压在Channel中。但是如果有必须使用HBase Sink或者其他性能容易成为瓶颈的Sink的场景时,可以选择使用Channel Selector或者Sink Group来满足性能要求。
  • 注意事项 仅在没有数据丢失的情况下支持将Decimal数据类型从较低精度更改为较高精度 例如: 无效场景:将Decimal数据精度从(10,2)更改为(10,5)无效,因为在这种情况下,只有scale增加,但总位数保持不变。 有效场景:将Decimal数据精度从(10,2)更改为(12,3)有效,因为总位数增加2,但是scale仅增加1,这不会导致任何数据丢失。 将Decimal数据类型从较低精度更改为较高精度,其允许的最大精度(precision,scale)范围为(38,38),并且只适用于不会导致数据丢失的有效提升精度的场景。
  • Hive权限模型 使用Hive组件,必须对Hive数据库和表(含外表和视图)拥有相应的权限。在MRS中,完整的Hive权限模型由Hive元数据权限与HDFS文件权限组成。使用数据库或表时所需要的各种权限都是Hive权限模型中的一种。 Hive元数据权限。 与传统关系型数据库类似,MRS的Hive数据库包含“建表”和“查询”权限,Hive表和列包含“查询”、“插入”和“删除”权限。Hive中还包含拥有者权限“OWNERSHIP”和“Hive管理员权限”。 Hive数据文件权限,即HDFS文件权限。 Hive的数据库、表对应的文件保存在HDFS中。默认创建的数据库或表保存在HDFS目录“/user/hive/warehouse”。系统自动以数据库名称和数据库中表的名称创建子目录。访问数据库或者表,需要在HDFS中拥有对应文件的权限,包含“读”、“写”和“执行”权限。 用户对Hive数据库或表执行不同操作时,需要关联不同的元数据权限与HDFS文件权限。例如,对Hive数据表执行查询操作,需要关联元数据权限“查询”,以及HDFS文件权限“读”和“写”。 使用Manager界面图形化的角色管理功能来管理Hive数据库和表的权限,只需要设置元数据权限,系统会自动关联HDFS文件权限,减少界面操作,提高效率。
  • 回答 当执行器中此次数据查询和加载所需要的堆外内存不足时,便会抛出此异常。 在这种情况下,请增大“carbon.unsafe.working.memory.in.mb”和“spark.yarn.executor.memoryOverhead”的值。 详细信息请参考如何在CarbonData中配置非安全内存? 该内存被数据查询和加载共享。所以如果加载和查询需要同时进行,建议将“carbon.unsafe.working.memory.in.mb”和“spark.yarn.executor.memoryOverhead”的值配置为2048 MB以上。 可以使用以下公式进行估算: 数据加载所需内存: (“carbon.number.of.cores.while.loading”的值[默认值 = 6]) x 并行加载数据的表格 x (“offheap.sort.chunk.size.inmb”的值[默认值 = 64 MB] + “carbon.blockletgroup.size.in.mb”的值[默认值 = 64 MB] + 当前的压缩率[64 MB/3.5]) = ~900 MB 每表格 数据查询所需内存: (SPARK_EXECUTOR_INSTANCES. [默认值 = 2]) x ( carbon.blockletgroup.size.in.mb [默认值 = 64 MB] +“carbon.blockletgroup.size.in.mb”解压内容[默认值 = 64 MB * 3.5]) x (每个执行器核数[默认值 = 1]) = ~ 600 MB
  • 操作步骤 使用Ranger管理员用户rangeradmin登录Ranger管理页面,具体操作可参考登录Ranger管理界面。 在首页中单击“HBASE”区域的组件插件名称如“HBase”。 单击“Add New Policy”,添加HBase权限控制策略。 根据业务需求配置相关参数。 表1 HBase权限参数 参数名称 描述 Policy Name 策略名称,可自定义,不能与本服务内其他策略名称重复。 Policy Conditions IP过滤策略,可自定义,配置当前策略适用的主机节点,可填写一个或多个IP或IP段,并且IP填写支持“*”通配符,例如:192.168.1.10,192.168.1.20或者192.168.1.*。 Policy Label 为当前策略指定一个标签,可以根据这些标签搜索报告和筛选策略。 HBase Table 将适用该策略的表。 可支持通配符“*”,例如“table1:*”表示table1下的所有表。 “Include”策略适用于当前输入的对象,“Exclude”表示策略适用于除去当前输入内容之外的其他对象。 说明: Ranger界面上HBase服务插件的“hbase.rpc.protection”参数值必须和HBase服务端的“hbase.rpc.protection”参数值保持一致。具体请参考Ranger界面添加或者修改HBase策略时,无法使用通配符搜索已存在的HBase表。 HBase Column-family 将适用该策略的列族。 “Include”策略适用于当前输入的对象,“Exclude”表示策略适用于除去当前输入内容之外的其他对象。 HBase Column 将适用该策略的列。 “Include”策略适用于当前输入的对象,“Exclude”表示策略适用于除去当前输入内容之外的其他对象。 Description 策略描述信息。 Audit Logging 是否审计此策略。 Allow Conditions 策略允许条件,配置本策略内允许的权限及例外。 在“Select Role”、“Select Group”、“Select User”列选择已创建好的需要授予权限的Role、用户组或用户,单击“Add Conditions”,添加策略适用的IP地址范围,单击“Add Permissions”,添加对应权限。 Read:读权限 Write:写权限 Create:创建权限 Admin:管理权限 Select/Deselect All:全选/取消全选 如需让当前条件中的用户或用户组管理本条策略,可勾选“Delegate Admin”使这些用户或用户组成为受委托的管理员。被委托的管理员可以更新、删除本策略,还可以基于原始策略创建子策略。 如需添加多条权限控制规则,可单击按钮添加。如需删除权限控制规则,可单击按钮删除。 Exclude from Allow Conditions:配置策略例外条件。 Deny All Other Accesses 是否拒绝其它所有访问。 True:拒绝其它所有访问 False:设置为False,可配置Deny Conditions。 Deny Conditions 策略拒绝条件,配置本策略内拒绝的权限及例外,配置方法与“Allow Conditions”类似。 拒绝条件的优先级高于“Allow Conditions”中配置的允许条件。 Exclude from Deny Conditions:配置排除在拒绝条件之外的例外规则。 表2 设置权限 任务场景 角色授权操作 设置HBase管理员权限 在首页中单击“HBase”区域的组件插件名称,例如“HBase”。 选择“Policy Name”为“all - table, column-family, column”的策略,单击按钮编辑策略。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 设置用户创建表的权限 在“HBase Table”配置表名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Create”。 该用户具有以下操作权限: create table drop table truncate table alter table enable table flush table flush region compact disable enable desc 设置用户写入数据的权限 在“HBase Table”配置表名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Write”。 该用户具有put,delete,append,incr,bulkload等操作权限。 设置用户读取数据的权限 在“HBase Table”配置表名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Read”。 该用户具有get,scan操作权限。 设置用户管理命名空间或表的权限 在“HBase Table”配置表名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Admin”。 该用户具有rsgroup,peer,assign,balance等操作权限。 设置列的读取或写入权限 在“HBase Table”配置表名。 在“HBase Column-family”配置列族名。 在“Allow Conditions”区域,单击“Select User”下选择框选择用户。 单击“Add Permissions”,勾选“Read”或者“Write”。 如果用户在hbase shell中执行desc操作,需要同时给该用户赋予hbase:quota表的读权限。 (可选)添加策略有效期。在页面右上角单击“Add Validity period”,设置“Start Time”和“End Time”,选择“Time Zone”。单击“Save”保存。如需添加多条策略有效期,可单击按钮添加。如需删除策略有效期,可单击按钮删除。 单击“Add”,在策略列表可查看策略的基本信息。等待策略生效后,验证相关权限是否正常。 如需禁用某条策略,可单击按钮编辑策略,设置策略开关为“Disabled”。 如果不再使用策略,可单击按钮删除策略。
  • 加密 Spark支持Akka和HTTP(广播和文件服务器)协议的SSL,但WebUI和块转移服务仍不支持SSL。 SSL必须在每个节点上配置,并使用特殊协议为通信涉及到的每个组件进行配置。 表24 参数说明 参数 描述 默认值 spark.ssl.enabled 是否在所有被支持协议上开启SSL连接。 与spark.ssl.xxx类似的所有SSL设置指示了所有被支持协议的全局配置。为了覆盖特殊协议的全局配置,在协议指定的命名空间中必须重写属性。 使用“spark.ssl.YYY.XXX”设置覆盖由YYY指示的特殊协议的全局配置。目前YYY可以是基于Akka连接的akka或广播与文件服务器的fs。 false spark.ssl.enabledAlgorithms 以逗号分隔的密码列表。指定的密码必须被JVM支持。 - spark.ssl.keyPassword key-store的私人密钥密码。 - spark.ssl.keyStore key-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.keyStorePassword key-store的密码。 - spark.ssl.protocol 协议名。该协议必须被JVM支持。本页所有协议的参考表。 - spark.ssl.trustStore trust-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.trustStorePassword trust-store的密码。 -
  • 开启Spark进程间的认证机制 目前Spark进程间支持共享密钥方式的认证机制,通过配置spark.authenticate可以控制Spark在通信过程中是否做认证。这种认证方式只是通过简单的握手来确定通信双方享有共同的密钥。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表26 参数说明 参数 描述 默认值 spark.authenticate 在Spark on YARN模式下,将该参数配置成true即可。密钥的生成和分发过程是自动完成的,并且每个应用独占一个密钥。 true
  • PORT 表21 参数说明 参数 描述 默认值 spark.ui.port 应用仪表盘的端口,显示内存和工作负载数据。 JDBCServer2x:4040 SparkResource2x:0 spark.blockManager.port 所有BlockManager监测的端口。这些同时存在于Driver和Executor上。 随机端口范围 spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 随机端口范围
  • Compression 数据压缩是一个以CPU换内存的优化策略,因此当Spark内存严重不足的时候(由于内存计算的特质,这种情况非常常见),使用压缩可以大幅提高性能。目前Spark支持三种压缩算法:snappy,lz4,lzf。Snappy为默认压缩算法,并且调用native方法进行压缩与解压缩,在Yarn模式下需要注意堆外内存对Container进程的影响。 表27 参数说明 参数 描述 默认值 spark.io.compression.codec 用于压缩内部数据的codec,例如RDD分区、广播变量和shuffle输出。默认情况下,Spark支持三种压缩算法:lz4,lzf和snappy。可以使用完全合格的类名称指定算法,例如org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec及org.apache.spark.io.SnappyCompressionCodec。 lz4 spark.io.compression.lz4.block.size 当使用LZ4压缩算法时LZ4压缩中使用的块大小(字节)。当使用LZ4时降低块大小同样也会降低shuffle内存使用。 32768 spark.io.compression.snappy.block.size 当使用Snappy压缩算法时Snappy压缩中使用的块大小(字节)。当使用Snappy时降低块大小同样也会降低shuffle内存使用。 32768 spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.shuffle.spill.compress 是否压缩在shuffle期间溢出的数据。使用spark.io.compression.codec进行压缩。 true spark.eventLog.compress 设置当spark.eventLog.enabled设置为true时是否压缩记录的事件。 false spark.broadcast.compress 在发送之前是否压缩广播变量。建议压缩。 true spark.rdd.compress 是否压缩序列化的RDD分区(例如StorageLevel.MEMORY_ONLY_SER的分区)。牺牲部分额外CPU的时间可以节省大量空间。 false
  • 安全性 Spark目前支持通过共享密钥认证。可以通过spark.authenticate配置参数配置认证。该参数控制Spark通信协议是否使用共享密钥执行认证。该认证是确保双边都有相同的共享密钥并被允许通信的基本握手。如果共享密钥不同,通信将不被允许。共享密钥通过如下方式创建: 对于YARN部署的Spark,将spark.authenticate配置为真会自动处理生成和分发共享密钥。每个应用程序会独占一个共享密钥。 对于其他类型部署的Spark,应该在每个节点上配置Spark参数spark.authenticate.secret。所有Master/Workers和应用程序都将使用该密钥。 表25 参数说明 参数 描述 默认值 spark.acls.enable 是否开启Spark acls。如果开启,它将检查用户是否有访问和修改job的权限。请注意这要求用户可以被识别。如果用户被识别为无效,检查将不被执行。UI可以使用过滤器认证和设置用户。 true spark.admin.acls 逗号分隔的有权限访问和修改所有Spark job的用户/管理员列表。如果在共享集群上运行并且工作时有MRS集群管理员或开发人员帮助调试,可以使用该列表。 admin spark.authenticate 是否Spark认证其内部连接。如果不是运行在YARN上,请参见spark.authenticate.secret。 true spark.authenticate.secret 设置Spark各组件之间验证的密钥。如果不是运行在YARN上且认证未开启,需要设置该项。 - spark.modify.acls 逗号分隔的有权限修改Spark job的用户列表。默认情况下只有开启Spark job的用户才有修改列表的权限(例如删除列表)。 - spark.ui.view.acls 逗号分隔的有权限访问Spark web ui的用户列表。默认情况下只有开启Spark job的用户才有访问权限。 -
  • TIMEOUT Spark默认配置能很好的处理中等数据规模的计算任务,但一旦数据量过大,会经常出现超时导致任务失败的场景。在大数据量场景下,需调大Spark中的超时参数。 表23 参数说明 参数 描述 默认值 spark.files.fetchTimeout 获取通过驱动程序的SparkContext.addFile()添加的文件时的通信超时(秒)。 60s spark.network.timeout 所有网络交互的默认超时(秒)。如未配置,则使用该配置代替spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs或spark.shuffle.io.connectionTimeout。 360s spark.core.connection.ack.wait.timeout 连接时应答的超时时间(单位:秒)。为了避免由于GC带来的长时间等待,可以设置更大的值。 60
  • WebUI WebUI展示了Spark应用运行的过程和状态。 表13 参数说明 参数 描述 默认值 spark.ui.killEnabled 允许停止Web UI中的stage和相应的job。 说明: 出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。 true spark.ui.port 应用程序dashboard的端口,显示内存和工作量数据。 JDBCServer2x:4040 SparkResource2x:0 IndexServer2x:22901 spark.ui.retainedJobs 在垃圾回收之前Spark UI和状态API记住的job数。 1000 spark.ui.retainedStages 在垃圾回收之前Spark UI和状态API记住的stage数。 1000
  • EventLog的周期清理 JobHistory上的Event log是随每次任务的提交而累积的,任务提交的次数多了之后会造成太多文件的存放。Spark提供了周期清理Evnet log的功能,用户可以通过配置开关和相应的清理周期参数来进行控制。 表17 参数说明 参数 描述 默认值 spark.history.fs.cleaner.enabled 是否打开清理功能。 true spark.history.fs.cleaner.interval 清理功能的检查周期。 1d spark.history.fs.cleaner.maxAge 日志的最长保留时间。 4d
  • Kryo Kryo是一个非常高效的Java序列化框架,Spark中也默认集成了该框架。几乎所有的Spark性能调优都离不开将Spark默认的序列化器转化为Kryo序列化器的过程。目前Kryo序列化只支持Spark数据层面的序列化,还不支持闭包的序列化。设置Kryo序列元,需要将配置项“spark.serializer”设置为“org.apache.spark.serializer.KryoSerializer”,同时也搭配设置以下的配置项,优化Kryo序列化的性能。 表18 参数说明 参数 描述 默认值 spark.kryo.classesToRegister 使用Kryo序列化时,需要注册到Kryo的类名,多个类之间用逗号分隔。 - spark.kryo.referenceTracking 当使用Kryo序列化数据时,是否跟踪对同一个对象的引用情况。适用于对象图有循环引用或同一对象有多个副本的情况。否则可以设置为关闭以提升性能。 true spark.kryo.registrationRequired 是否需要使用Kryo来注册对象。当设为“true”时,如果序列化一个未使用Kryo注册的对象则会抛出异常。当设为“false”(默认值)时,Kryo会将未注册的类名称一同写到序列化对象中。该操作会带来大量性能开销,所以在用户还没有从注册队列中删除相应的类时应该开启该选项。 false spark.kryo.registrator 如果使用Kryo序列化,使用Kryo将该类注册至定制类。如果需要以定制方式注册类,例如指定一个自定义字段序列化器,可使用该属性。否则spark.kryo.classesToRegister会更简单。它应该设置为一个扩展KryoRegistrator的类。 - spark.kryoserializer.buffer.max Kryo序列化缓冲区允许的最大值,单位为兆字节。这个值必须大于尝试序列化的对象。当在Kryo中遇到“buffer limit exceeded”异常时可以适当增大该值。也可以通过配置项spark.kryoserializer.buffer.max配置。 64MB spark.kryoserializer.buffer Kryo序列化缓冲区的初始值,单位为兆字节。每个worker的每个核心都会有一个缓冲区。如果有需要,缓冲区会增大到spark.kryoserializer.buffer.max设置的值。也可以通过配置项spark.kryoserializer.buffer配置。 64KB
  • Broadcast Broadcast用于Spark进程间数据块的传输。Spark中无论Jar包、文件还是闭包以及返回的结果都会使用Broadcast。目前的Broadcast支持两种方式,Torrent与HTTP。前者将会把数据切成小片,分布到集群中,有需要时从远程获取;后者将文件存入到本地磁盘,有需要时通过HTTP方式将整个文件传输到远端。前者稳定性优于后者,因此Torrent为默认的Broadcast方式。 表19 参数说明 参数 描述 默认值 spark.broadcast.factory 使用的广播方式。 org.apache.spark.broadcast.TorrentBroadcastFactory spark.broadcast.blockSize TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小可能会影响BlockManager的性能。 4096 spark.broadcast.compress 在发送广播变量之前是否压缩。建议压缩。 true
  • Storage 内存计算是Spark的最大亮点,Spark的Storage主要管理内存资源。Storage中主要存储RDD在Cache过程中产生的数据块。JVM中堆内存是整体的,因此在Spark的Storage管理中,“Storage Memory Size”变成了一个非常重要的概念。 表20 参数说明 参数 描述 默认值 spark.storage.memoryMapThreshold 超过该块大小的Block,Spark会对该磁盘文件进行内存映射。这可以防止Spark在内存映射时映射过小的块。一般情况下,对接近或低于操作系统的页大小的块进行内存映射会有高开销。 2m
  • EventLog Spark应用在运行过程中,实时将运行状态以JSON格式写入文件系统,用于HistoryServer服务读取并重现应用运行时状态。 表16 参数说明 参数 描述 默认值 spark.eventLog.enabled 是否记录Spark事件,用于应用程序在完成后重构webUI。 true spark.eventLog.dir 如果spark.eventLog.enabled为true,记录Spark事件的目录。在此目录下,Spark为每个应用程序创建文件,并将应用程序的事件记录到文件中。用户也可设置为统一的与HDFS目录相似的地址,这样History server就可以读取历史文件。 hdfs://hacluster/spark2xJobHistory2x spark.eventLog.compress spark.eventLog.enabled为true时,是否压缩记录的事件。 false
  • HistoryServer HistoryServer读取文件系统中的EventLog文件,展示已经运行完成的Spark应用在运行时的状态信息。 表14 参数说明 参数 描述 默认值 spark.history.fs.logDirectory History server的日志目录 - spark.history.ui.port JobHistory侦听连接的端口。 18080 spark.history.fs.updateInterval History server所显示信息的更新周期,单位为秒。每次更新检查持久存储中针对事件日志进行的更改。 10s spark.history.fs.update.interval.seconds 每个事件日志更新检查的间隔。与spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s spark.history.updateInterval 该配置项与spark.history.fs.update.interval.seconds和spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s
  • Driver配置 Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置: JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。 ClassPath:包括Java类和Native的Lib加载路径。 Java Memory and Cores:Java进程的内存和CPU使用量。 Spark Configuration:Spark内部参数,与Java进程无关。 表10 参数说明 参数 描述 默认值 spark.driver.extraJavaOptions 传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置参数 spark.driver.extraClassPath 附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置参数 spark.driver.userClassPathFirst (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。 false spark.driver.extraLibraryPath 设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 JDBCServer2x: ${SPARK_INSTALL_HOME}/spark/native SparkResource2x: ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native spark.driver.cores 驱动程序进程使用的核数。仅适用于Cluster模式。 1 spark.driver.memory 驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 4G spark.driver.maxResultSize 对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。 1G spark.driver.host Driver监测的主机名或IP地址,用于Driver与Executor进行通信。 (local hostname) spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 (random)
  • 普通Shuffle配置 表9 参数说明 参数 描述 默认值 spark.shuffle.spill 若设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。 true spark.shuffle.spill.compress 是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。 true spark.shuffle.file.buffer 每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。 32KB spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.reducer.maxSizeInFlight 从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。 48MB
  • ExecutorLaucher配置 ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。 表11 参数说明 参数 描述 默认值 spark.yarn.am.extraJavaOptions 在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。 参考快速配置参数 spark.yarn.am.memory 针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。 1G spark.yarn.am.memoryOverhead 和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。 - spark.yarn.am.cores 针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。 1
  • Executor配置 Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。 表12 参数说明 参数 描述 默认值 spark.executor.extraJavaOptions 传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。 参考快速配置参数 spark.executor.extraClassPath 附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。 - spark.executor.extraLibraryPath 设置启动executor JVM时所使用的特殊的library path。 参考快速配置参数 spark.executor.userClassPathFirst (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。 4G spark.executorEnv.[EnvironmentVariableName] 添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。 - spark.executor.logs.rolling.maxRetainedFiles 设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。 - spark.executor.logs.rolling.size.maxBytes 设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。若要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 - spark.executor.logs.rolling.strategy 设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。 - spark.executor.logs.rolling.time.interval 设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。若要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 daily
  • Netty/NIO及Hash/Sort配置 Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种Sort。网络也有两种方式,Netty和NIO。 表8 参数说明 参数 描述 默认值 spark.shuffle.manager 处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。 SORT spark.shuffle.consolidateFiles (仅hash方式)若要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。 false spark.shuffle.sort.bypassMergeThreshold 该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。 200 spark.shuffle.io.maxRetries (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。 12 spark.shuffle.io.numConnectionsPerPeer (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。 1 spark.shuffle.io.preferDirectBufs (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内内存。 true spark.shuffle.io.retryWait (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。 5
  • Spark长时间任务安全认证配置 安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。 在客户端的“spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。 当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 表3 参数说明 参数名称 含义 默认值 spark.kerberos.principal 具有Spark操作权限的principal。请联系MRS集群管理员获取对应principal。 - spark.kerberos.keytab 具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。 - spark.security.bigdata.loginOnce Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。DataSight修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 说明: 当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 true
  • Spark Streaming Kafka Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。 表7 参数说明 参数 描述 默认值 spark.streaming.kafka.maxRatePerPartition 使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。 - spark.streaming.blockInterval 在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。 200ms spark.streaming.receiver.maxRate 每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。 - spark.streaming.receiver.writeAheadLog.enable 是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。 false
  • Spark Streaming Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表6 参数说明 参数 描述 默认值 spark.streaming.receiver.writeAheadLog.enable 启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。 false spark.streaming.unpersist 由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。 true
  • 配置是否使用笛卡尔积功能 要启动使用笛卡尔积功能,需要在Spark的“spark-defaults.conf”配置文件中进行如下设置。 表2 笛卡尔积参数说明 参数 说明 默认值 spark.sql.crossJoin.enabled 是否允许隐性执行笛卡尔积。 “true”表示允许 “false”表示不允许,此时只允许query中显式包含CROSS JOIN语法。 true JDBC应用在服务端的“spark-defaults.conf”配置文件中设置该参数。 Spark客户端提交的任务在客户端配的“spark-defaults.conf”配置文件中设置该参数。
  • Python Spark Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。 表4 参数说明 参数 描述 默认值 spark.python.profile 在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。 false spark.python.worker.memory 聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。 512m spark.python.worker.reuse 是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。 true
共100000条