华为云用户手册

  • 前提条件 已创建包含Doris服务的集群,集群内各服务运行正常。 待连接Doris数据库的节点与MRS集群网络互通。 创建具有Doris管理权限的用户。 集群已启用Kerberos认证(安全模式) 在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。 使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。 Doris中已安装并启动DBroker实例。 已安装Hive客户端。 如果Doris通过Broker Load跨集群导入数据,需要配置跨集群互信,相关操作可参考配置跨Manager集群互信。
  • 操作场景 该任务指导MRS集群管理员在Manager创建并设置Hive的角色。Hive角色可设置Hive管理员权限以及Hive数据表的数据操作权限。 用户使用Hive并创建数据库需要加入hive组,不需要角色授权。用户在Hive和HDFS中对自己创建的数据库或表拥有完整权限,可直接创建表、查询数据、删除数据、插入数据、更新数据以及授权他人访问表与对应HDFS目录与文件。默认创建的数据库或表保存在HDFS目录“/user/hive/warehouse”。 安全模式支持创建Hive角色,普通模式不支持创建Hive角色。 如果当前组件使用了Ranger进行权限控制,须基于Ranger配置相关策略进行权限管理,具体操作可参考添加Hive的Ranger访问权限策略。
  • 使用Yarn客户端 安装客户端,具体请参考安装MRS客户端。 以客户端安装用户,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 直接执行Yarn命令。例如: yarn application -list
  • 转换流程 Loader读取源端数据,通过输入算子将数据按规则逐一转换成字段,再通过转换算子,对这些字段做清洗或转换,最后通过输出算子将处理后的字段,输出到目标端。 每个作业,如果进行数据转换操作,有且只能有一个输入算子,有且只能有一个输出算子。 不符合转换规则的数据,将成为脏数据跳过。 从关系型数据库导入数据到HDFS/OBS,可以不用配置数据转换,数据将按“,”分隔保存到HDFS/OBS。 从HDFS/OBS导出数据到关系型数据库,可以不用配置数据转换,数据将按“,”分隔保存到关系型数据库。
  • 算子简介 Loader算子包括以下类型: 输入算子 数据转换的第一步,负责将数据转换成字段,每次转换有且只能有一种输入算子,涉及HBase或Hive导入导出时,必须填写。 转换算子 数据转换的中间转换步骤,属于可选类型,各个转换算子可任意搭配使用。转换算子是针对字段而言,必须先使用输入算子,将数据转换成字段。 输出算子 数据转换的最后一步,每次转换有且只能有一种输出算子,用于输出处理后的字段。涉及HBase或Hive导入导出时,必须填写。 表1 算子分类一览表 类型 描述 输入 CSV文件输入:将文件的每一行按指定分隔符转换成多个输入字段。 固定宽度文件输入:将文件的每一行,按可配置长度的字符或字节,转换成多个输入字段。 表输入:将关系型数据库表的指定列按顺序转换成同等数量的输入字段。 HBase输入:将HBase表的指定列转换成同等数量的输入字段。 HTML输入:将HTML文件中的元素转换成输入字段。 Hive输入:将Hive表的指定列转换成同等数量的输入字段。 转换 长整型时间转换:实现长整型数值与日期类型的互换。 空值转换:将空值替换成指定值。 增加常量字段:生成常量字段。 随机值转换:生成随机数字段。 拼接转换:拼接已有字段,生成新字段。 分隔转换:将已有字段,按指定分隔符,分隔出新字段。 取模转换:对已有字段取模,生成新字段。 剪切字符串:通过指定起始位置,截取已有字符串类型的字段,生成新字段。 EL操作转换:指定算法,对字段值进行运算,目前支持的算法有:md5sum、sha1sum、sha256sum和sha512sum等。 字符串大小写转换:对已有的字符串类型字段,切换大小写,生成新字段。 字符串逆序转换:对已有的字符串类型字段,做逆序变换,生成新字段。 字符串空格清除转换:对已有的字符串类型字段,清除左右空格,生成新字段。 过滤行转换:配置逻辑条件过滤掉含触发条件的行。 更新域:当满足某些条件时,更新字段的值。 输出 Hive输出:将已生成的字段输出到Hive表。 表输出:将已生成的字段输出到关系型数据库表。 文件输出:将已生成的字段通过分隔符连接并输出到文件。 HBase输出:将已生成的字段输出到HBase表。
  • 日志级别 Flume提供了如表2所示的日志级别。 运行日志的级别优先级从高到低分别是FATAL、ERROR、WARN、INFO、DEBUG,程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 日志类型 级别 描述 运行日志 FATAL FATAL表示系统运行的致命错误信息。 ERROR ERROR表示系统运行的错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 请参考修改集群服务配置参数,进入Flume的“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 保存配置,在弹出窗口中单击“确定”使配置生效。 配置完成后即生效,不需要重启服务。
  • 参数说明 表1 算子参数说明 参数 含义 类型 是否必填 默认值 HBase表类型 配置HBase表类型,可选项为normal(普通HBase表)和phoenix表。 enum 是 normal NULL值处理方式 配置NULL值处理方式。选中单选框时是将转换为空字符串并保存。不选中单选框时是不保存数据。 boolean 否 不选中单选框 HBase输出字段 配置HBase输出信息: 字段名:配置输出字段的字段名。 表名:配置HBase表名。 列族名:配置HBase列族名,如果HBase/Phoenix建表时未配置列族名,默认列族名为 '0'。 列名:配置HBase列名。 类型:配置字段类型,字段类型为“DATE”或“TIME”或“TIMESTAMP”时,需指定特定时间格式,其他类型指定无效。时间格式如:“yyyyMMdd HH:mm:ss”。 长度:配置字段长度,字段值实际长度太长则按配置的长度截取,“类型”为“CHAR”时实际长度不足则空格补齐,“类型”为“VARCHAR”时实际长度不足则不补齐。 主键:配置是否为主键列。普通HBase表主键只能指定一个;phoenix表主键可以指定多个,配置多个列为主键时,会按照配置列的先后顺序对其进行拼接。必需配置一个主键列。 map 是 无
  • 操作步骤 以omm用户登录集群主OMS节点,修改配置文件“${CONTROLLER_HOME}/etc/om/controller.properties”中参数“controller.backup.conf.script.execute.timeout”值为“10000000”(根据当前集群中的DBService数据量调大超时时间)。 以omm用户登录集群备OMS节点,重复执行1。 以omm用户登录主OMS节点,执行以下命令查询BackupRecoveryPluginProcess进程id,并结束此进程。 jps|grep -i BackupRecoveryPluginProcess kill -9 查询到的PID 登录到Manager页面重新执行DBService备份任务。 执行以下命令查看BackupRecoveryPluginProcess进程是否已开启。 jps|grep -i BackupRecoveryPluginProcess
  • 操作步骤 读数据服务端调优 参数入口: 进入HBase服务参数“全部配置”界面,具体操作请参考修改集群服务配置参数章节。 表1 影响实时读数据配置项 配置参数 描述 默认值 GC_OPTS HBase利用内存完成读写操作。提高HBase内存可以有效提高HBase性能。 GC_OPTS主要需要调整HeapSize的大小和NewSize的大小。调整HeapSize大小的时候,建议将Xms和Xmx设置成相同的值,这样可以避免JVM动态调整HeapSize大小的时候影响性能。调整NewSize大小的时候,建议把其设置为HeapSize大小的1/8。 HMaster:当HBase集群规模越大、Region数量越多时,可以适当调大HMaster的GC_OPTS参数。 RegionServer:RegionServer需要的内存一般比HMaster要大。在内存充足的情况下,HeapSize可以相对设置大一些。 说明: 主HMaster的HeapSize为4G的时候,HBase集群可以支持100000 region数的规模。根据经验值,集群每增加35000个region,HeapSize增加2G,主HMaster的HeapSize不建议超过32GB。 HMaster -server -Xms4G -Xmx4G -XX:NewSize=512M -XX:MaxNewSize=512M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M Region Server -server -Xms6G -Xmx6G -XX:NewSize=1024M -XX:MaxNewSize=1024M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M hbase.regionserver.handler.count 表示RegionServer在同一时刻能够并发处理多少请求。如果设置过高会导致激烈线程竞争,如果设置过小,请求将会在RegionServer长时间等待,降低处理能力。根据资源情况,适当增加处理线程数。 建议根据CPU的使用情况,可以选择设置为100至300之间的值。 200 hfile.block.cache.size HBase缓存区大小,主要影响查询性能。根据查询模式以及查询记录分布情况来决定缓存区的大小。如果采用随机查询使得缓存区的命中率较低,可以适当降低缓存区大小。 当offheap关闭时,默认值为0.25。当offheap开启时,默认值是0.1。 如果同时存在读和写的操作,这两种操作的性能会互相影响。如果写入导致的flush和Compaction操作频繁发生,会占用大量的磁盘IO操作,从而影响读取的性能。如果写入导致阻塞较多的Compaction操作,就会出现Region中存在多个HFile的情况,从而影响读取的性能。所以如果读取的性能不理想的时候,也要考虑写入的配置是否合理。 读数据客户端调优 Scan数据时需要设置caching(一次从服务端读取的记录条数,默认是1),如果使用默认值读性能会降到极低。 当不需要读一条数据所有的列时,需要指定读取的列,以减少网络IO。 只读取RowKey时,可以为Scan添加一个只读取RowKey的filter(FirstKeyOnlyFilter或KeyOnlyFilter)。 读数据表设计调优 表2 影响实时读数据相关参数 配置参数 描述 默认值 COMPRESSION 配置数据的压缩算法,这里的压缩是HFile中block级别的压缩。对于可以压缩的数据,配置压缩算法可以有效减少磁盘的IO,从而达到提高性能的目的。 说明: 并非所有数据都可以进行有效压缩。例如一张图片的数据,因为图片一般已经是压缩后的数据,所以压缩效果有限。常用的压缩算法是SNAPPY,因为它有较好的Encoding/Decoding速度和可以接受的压缩率。 NONE BLOCKSIZE 配置HFile中block块的大小,不同的block块大小,可以影响HBase读写数据的效率。越大的block块,配合压缩算法,压缩的效率就越好;但是由于HBase的读取数据是以block块为单位的,所以越大的block块,对于随机读的情况,性能可能会比较差。 如果要提升写入的性能,一般扩大到128KB或者256KB,可以提升写数据的效率,也不会影响太大的随机读性能。单位:字节。 65536 DATA_BLOCK_ENCODING 配置HFile中block块的编码方法。当一行数据中存在多列时,一般可以配置为“FAST_DIFF”,可以有效的节省数据存储的空间,从而提供性能。 NONE
  • 使用HBase客户端 安装客户端,具体请参考安装客户端章节。 以客户端安装用户,登录安装客户端的节点。 执行以下命令切换到客户端目录。 cd /opt/hadoopclient 执行以下命令配置环境变量。 source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有创建HBase表的权限,具体请参见角色管理配置拥有对应权限的角色,参考创建用户章节,为用户绑定对应角色。如果当前集群未启用Kerberos认证,则无需执行此命令。 kinit 组件业务用户 例如,kinit hbaseuser。 直接执行HBase组件的客户端命令。 hbase shell
  • 前提条件 已安装客户端。例如安装目录为“/opt/hadoopclient”,以下操作的客户端目录只是举例,请根据实际安装目录修改。 各组件业务用户由MRS集群管理员根据业务需要创建。 “机机”用户需要下载keytab文件,“人机”用户第一次登录时需修改密码。 非root用户使用HBase客户端,请确保该HBase客户端目录的属主为该用户,否则请参考如下命令修改属主。 chown user:group -R 客户端安装目录/HBase
  • 基于索引查询 在具有索引的用户表中,可以使用SingleColumnValueFilter来查询数据。当查询条件可以命中索引时,查询速度远快于原表查询。 索引的命中规则如下: 多个AND条件查询 当用于查询的列至少包含索引第一个列时,使用索引会提高查询性能。 例如,为C1、C2和C3创建组合索引。 该索引在以下情况下生效: Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) 该索引在下列情况下不生效: Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当在查询中使用“索引列和非索引列”进行过滤时,使用索引可提高查询性能。当非索引列命中覆盖列时,查询性能最优;如果有需经常查询的非索引列,建议定义为覆盖列。例如: Filter_Condition(IndexCol1)AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1) 当多个列用于查询时,只能为组合索引中的最后一列指定值范围,而其他列只能设置为指定值。 例如,为C1、C2和C3创建组合索引。在范围查询中,只能为C3设置数值范围,过滤条件为“C1 = XXX,C2 = XXX,C3 = 数值范围”。 多个OR条件查询 例如,为C1、C2和C3创建组合索引。 仅对索引列首个字段进行过滤时(支持范围过滤),使用索引可提高查询性能。 Filter_Condition(IndexCol1)OR Filter_Condition(IndexCol1)OR Filter_Condition(IndexCol1) 对非索引和非索引列进行过滤时,无法命中索引,查询性能不会提高。 Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) 组合查询时,最外层包含OR条件时无法命中索引,查询性能不会提高。 Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2))OR(Filter_Condition(NonIndexCol1)) 减少OR条件使用,尤其是OR条件+范围条件,命中索引的情况下也会造成大范围查询,速度较慢。
  • 配置描述 在客户端的“mapred-site.xml”配置文件中调整如下参数。 “mapred-site.xml”配置文件在客户端安装路径的conf目录下,例如“/opt/client/Yarn/config”。 表1 参数说明 参数 描述 默认值 mapreduce.reduce.shuffle.max-host-failures MR任务在reduce过程中读取远端shuffle数据允许失败的次数。当设置次数大于5时,可以降低客户端应用的失败率。 5 mapreduce.client.submit.file.replication MR任务在运行时依赖的相关job文件在HDFS上的备份。当备份数大于10时,可以降低客户端应用的失败率。 10
  • 配置参数 在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 参数 说明 默认值 spark.executor.execute.shutdown.cleaner 配置为true后,支持executor退出时执行自定义代码。 false spark.executor.execute.shutdown.cleaner.max.timeout executor执行自定义代码的超时时间。 240s
  • Flume客户端Cgroup使用指导 该操作指导用户加入、退出Cgroup,查询Cgroup状态以及更改Cgroup cpu阈值。 加入Cgroup 执行以下命令,加入Cgroup,假设Flume客户端安装路径为“/opt/FlumeClient”,Cgroup cpu阈值设置为50%: cd /opt/FlumeClient/fusioninsight-flume-1.9.0/bin ./flume-manage.sh cgroup join 50 该命令不仅可以加入Cgroup,同时也可以更改Cgroup cpu阈值。 Cgroup cpu阈值取值范围为1~100*N之间的整数,N表示机器cpu核数。 查询Cgroup状态 执行以下命令,查询Cgroup状态,假设Flume客户端安装路径为“/opt/FlumeClient”: cd /opt/FlumeClient/fusioninsight-flume-1.9.0/bin ./flume-manage.sh cgroup status 退出Cgroup 执行以下命令,退出Cgroup,假设Flume客户端安装路径为“/opt/FlumeClient”: cd /opt/FlumeClient/fusioninsight-flume-1.9.0/bin ./flume-manage.sh cgroup exit 客户端安装完成后,会自动创建默认Cgroup。如果安装客户端时未配置“-s”参数,则默认值为“-1”,表示agent进程不受cpu使用率限制。 加入、退出Cgroup时,agent进程不受影响。如果agent进程未启动,加入、退出Cgroup仍然可以成功执行,待下一次agent启动时生效。 客户端卸载完成后,安装时期创建的Cgroup会自动删除。
  • 回答 正常情况下,相同rowkey值的数据加载到HBase是有先后顺序的,HBase以最近的时间戳的数据为最新数据,一般的默认查询中,没有指定时间戳的,就会对相同rowkey值的数据仅返回最新数据。 使用bulkload加载数据,由于数据在内存中处理生成HFile,速度是很快的,很可能出现相同rowkey值的数据具有相同时间戳,从而造成查询结果混乱的情况。 建议在建表和数据加载时,设计好rowkey值,尽量避免在同一个数据文件中存在相同rowkey值的情况。
  • 操作步骤 停止Flume角色的客户端。 假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,停止Flume客户端: cd /opt/FlumeClient/fusioninsight-flume-Flume组件版本号/bin ./flume-manage.sh stop 执行脚本后,显示如下信息,说明成功的停止了Flume客户端: Stop Flume PID=120689 successful.. Flume客户端停止后会自动重启,如果不需自动重启,请执行以下命令: ./flume-manage.sh stop force 需要启动时,可执行以下命令: ./flume-manage.sh start force 卸载Flume角色的客户端。 假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,卸载Flume客户端: cd /opt/FlumeClient/fusioninsight-flume-Flume组件版本号/inst ./uninstall.sh
  • 配置描述 进入Mapreduce服务参数“全部配置”界面,在搜索框中输入参数名称。具体操作请参考修改集群服务配置参数章节。 表1 参数描述 参数 描述 默认值 mapreduce.cluster.acls.enabled 是否开启对Job History Server 权限控制的开关。 true mapreduce.cluster.administrators 用于指定MapReduce集群管理员列表,可以配置用户和用户组,用户或者用户组之间用逗号间隔,用户和用户组之间用空格间隔,举例:userA,userB groupA,groupB。当配置为*时表示所有用户或用户组。 mapred supergroup,System_administrator_186
  • 操作场景 该章节主要介绍如何在HBase Shell命令行查询慢请求或超大请求信息。慢请求是指通过hbase shell命令查询服务端时,RPC请求响应时长超过阈值(即HBase服务端配置参数“hbase.ipc.warn.response.time”,默认值为“3000”ms)的请求;超大请求是指通过hbase shell命令查询服务端时,RPC请求一次返回数据量大小超过阈值(即HBase服务端配置参数“hbase.ipc.warn.response.size”,默认值为“5MB”)的请求。 每个RegionServer节点默认会缓存最近的256条慢请求和超大请求,可以通过FusionInsight Manager中HBase服务端配置参数“hbase.regionserver.slowlog.ringbuffer.size”调整缓存的大小。 该章节内容适用于MRS 3.3.0 及之后版本。
  • 命令说明 该操作主要涉及新增的hbase shell命令如下: get_slowlog_responses:查询慢请求信息。 get_largelog_responses:查询超大请求信息。 clear_slowlog_responses:清理RegionServer缓存中的数据。 可以在hbase shell中执行如下命令查看相关命令如何使用: help 'cmdName' 例如,执行help 'clear_slowlog_responses'查看clear_slowlog_responses命令的使用方法:
  • 操作场景 Hive业务还可能需要关联使用其他组件,例如HQL语句触发MapReduce任务需要设置Yarn权限,或者Hive over HBase的场景需要HBase权限。以下介绍Hive关联Yarn和Hive over HBase两个场景下的操作。 安全模式下Yarn和HBase的权限管理默认是开启的,因此在安全模式下默认需要配置Yarn和HBase权限。 在普通模式下,Yarn和HBase的权限管理默认是关闭的,即任何用户都有权限,因此普通模式下默认不需要配置Yarn和HBase权限。如果用户修改了YARN或者HBase的配置来开启权限管理,则修改后也需要配置Yarn和HBase权限。 如果当前组件使用了Ranger进行权限控制,须基于Ranger配置相关策略进行权限管理,具体操作可参考添加Hive的Ranger访问权限策略。
  • Flink作业大小表Join去重 在双流关联的业务模型中,关联算子接收到其中一个流发送的大量重复数据,则会导致下游算子需要处理大量重复数据,影响作业性能。 如A表字段(P1,A1,A2)使用如下方式关联B表字段(P1,B1,B2,B3)生成C的场景中,B表信息发生大量更新,但是B中的所需字段没有更新,在该关联中仅用到了B表的B1和B2字段,对于B表,每个记录更新只更新B3字段,B1和B2不更新,因此当B表更新,可以忽略更新后的数据。 select A.A1,B.B1,B.B2 from A join B on A.P1=B.P1 为解决如上问题可通过使用hint单独为左表(duplicate.left)或右表(duplicate.right)设置去重: 格式 为左表设置去重 /*+ OPTIONS('duplicate.left'='true')*/ 为右表设置去重 /*+ OPTIONS('duplicate.right'='true')*/ 同时为左表和右表设置去重 /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ 在SQL语句中配置 如同时为左表“user_info”和右表“user_score”设置去重。 CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置去重 user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;
  • Flink作业大小表Join Flink作业双流Join时存在大小表数据,通过内核broadcast策略确保小表数据发送到Join的task中,通过rebalance策略将大表数据打散到Join中,提高Flink SQL易用性,增强作业稳定性。 在使用Flink SQL时,该特性通过hints方法指定Join的左表或右表为广播表,另一张表为rebalance表,SQL语句实例如下,分别以A\C作为小表实例: 以A表作为广播表 使用Join方式 SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1 使用Where方式 SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1 以A和C表作为广播表 SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1 支持通过“/*+ BROADCAST(smallTable1, smallTable2) */”方式使用该特性,兼容开源双流Join逻辑。 不支持开源双流Join和该特性的切换,因为该特性会将数据广播到每个Join算子。 不支持LEFT JOIN时小表为左表,RIGHT JOIN时小表为右表。
  • 配置描述 查看Yarn服务配置参数 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入表1中参数名称。 表1 参数描述 参数 描述 默认值 yarn.acl.enable Yarn权限控制启用开关。 true yarn.webapp.filter-entity-list-by-user 严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。当要开启该功能时,同时需要设置参数“yarn.acl.enable”为true。 true 查看Mapreduce服务配置参数 参考修改集群服务配置参数进入Mapreduce服务参数“全部配置”界面,在搜索框中输入表2中参数名称。 表2 参数描述 参数 描述 默认值 mapreduce.cluster.acls.enabled MR JobHistoryServer权限控制启用开关。该参数为客户端参数,当JobHistoryServer服务端开启权限控制之后该参数生效。 true yarn.webapp.filter-entity-list-by-user MR JobHistoryServer严格视图启用开关,开启后,登录用户只能查看该用户有权限查看的内容。该参数为JobHistoryServer的服务端参数,表示JHS开启了权限控制,但是否要对某一个特定的Application进行控制,是由客户端参数:“mapreduce.cluster.acls.enabled”决定。 true 以上配置会影响restful API和shell命令结果,即以上配置开启后,restful API调用和shell命令运行所返回的内容只包含调用用户有权查看的信息。 当yarn.acl.enable或mapreduce.cluster.acls.enabled设置为false时,即关闭Yarn或Mapreduce的权限校验功能。此时任何用户都可以在Yarn或MapReduce上提交任务和查看任务信息,存在安全风险,请谨慎使用。
  • 参数说明 表1 Loader常用参数 配置参数 说明 默认值 范围 mapreduce.client.submit.file.replication MapReduce任务在运行时依赖的相关job文件在HDFS上的副本数。当集群中DataNode个数小于该参数值时,副本数等于DataNode的个数。当DataNode个数大于或等于该参数值,副本数为该参数值。 10 3~256 loader.fault.tolerance.rate 容错率。 值大于0时使能容错机制。使能容错机制时建议将作业的Map数设置为大于等于3,推荐在作业数据量大的场景下使用。 0 0~1.0 loader.input.field.separator 默认的输入字段分割符,需要配置输入与输出转换步骤才生效,转换步骤的内容可以为空;如果作业的转换步骤中没有配置分割符,则以此处的默认分割符为准。 , - loader.input.line.separator 默认的输入行分割符,需要配置输入与输出转换步骤才生效,转换步骤的内容可以为空;如果作业的转换步骤中没有配置分割符,则以此处的默认分割符为准。 - - loader.output.field.separator 默认的输出字段分割符,需要配置输入与输出转换步骤才生效,转换步骤的内容可以为空;如果作业的转换步骤中没有配置分割符,则以此处的默认分割符为准。 , - loader.output.line.separator Loader输出数据的行分隔符。 - - 由于容错率的统计需要时间,为保证使用效果,建议在作业运行时间在2分钟以上时使用“loader.fault.tolerance.rate”参数。 此处参数设置的为Loader全局的默认分割符,如果作业的转换步骤中配置了分割符,则以转换步骤为准,转换步骤中没有配置分割符则以此处的默认分割符为准。
  • 前提条件 已安装Oozie、ZooKeeper服务,且服务正常运行。 没有任务正在运行。 如果当前集群不是安装最新的版本包,需要从“$BIGDATA_HOME/FusionInsight_Porter_x.x.x/install/FusionInsight-Oozie-x.x.x/oozie-x.x.x/embedded-oozie-server/webapp/WEB-INF/lib”路径复制“curator-x-discovery-x.x.x.jar”包到“$BIGDATA_HOME/FusionInsight_Porter_x.x.x/install/FusionInsight-Oozie-x.x.x/oozie-x.x.x/lib”目录下。
  • 日志级别 Flink中提供了如表3所示的日志级别。日志级别优先级从高到低分别是ERROR、WARN、INFO、DEBUG。程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表3 日志级别 级别 描述 ERROR ERROR表示当前时间处理存在错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 请参考修改集群服务配置参数,进入Flink的“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 保存配置,在弹出窗口中单击“确定”使配置生效。 配置完成后不需要重启服务,重新下载客户端使配置生效。 也可以直接修改客户端“客户端安装目录/Flink/flink/conf/”中log4j-cli.properties、log4j.properties、log4j-session.properties文件中对应的日志级别配置项。 通过客户端提交作业时会在客户端log文件夹中生成相应日志文件,由于系统默认umask值是0022,所以日志默认权限为644;如果需要修改文件权限,需要修改umask值;例如修改omm用户umask值: 在“/home/omm/.baskrc”文件末尾添加“umask 0026”; 执行命令source /home/omm/.baskrc使文件权限生效。
  • 配置场景 Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下: 自动设置shuffle partition数 在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能更优。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。 动态调整执行计划 在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。 自动处理数据倾斜 在执行SQL语句时,如果存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取部分shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果
  • 注意事项 Join数据倾斜问题 执行任务的时候,任务进度长时间维持在99%,这种现象叫数据倾斜。 数据倾斜是经常存在的,因为有少量的Reduce任务分配到的数据量和其他Reduce差异过大,导致大部分Reduce都已完成任务,但少量Reduce任务还没完成的情况。 解决数据倾斜的问题,可通过设置“set hive.optimize.skewjoin=true”并调整hive.skewjoin.key的大小。hive.skewjoin.key是指Reduce端接收到多少个key即认为数据是倾斜的,并自动分发到多个Reduce。
  • Sort Merge Bucket Map Join 使用Sort Merge Bucket Map Join必须满足以下2个条件: join的两张表都很大,内存中无法存放。 两张表都按照join key进行分桶(clustered by (column))和排序(sorted by(column)),且两张表的分桶数正好是倍数关系。 通过如下设置,启用Sort Merge Bucket Map Join: set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; 这种Map Join也没有Reduce任务,是在Map任务前启动MapReduce Local Task,将小表内容按桶读取到本地,在本机保存多个桶的HashTable备份并写入HDFS,并保存在Distributed Cache中,在Map Task中从本地磁盘或者Distributed Cache中按桶一个一个读取小表内容,然后与大表做匹配直接得到结果并输出。
共100000条