华为云用户手册

  • Network communication (via Netty) 表5 Network communication参数说明 参数 描述 默认值 是否必选 taskmanager.network.netty.num-arenas Netty内存块数。 1 否 taskmanager.network.netty.server.numThreads Netty服务器线程的数量。 1 否 taskmanager.network.netty.client.numThreads Netty客户端线程数。 1 否 taskmanager.network.netty.client.connectTimeoutSec Netty客户端连接超时。单位:s。 120 否 taskmanager.network.netty.sendReceiveBufferSize Netty发送和接收缓冲区大小。 默认为系统缓冲区大小(cat / proc / sys / net / ipv4 / tcp_ [rw] mem),在现代Linux中为4MB。单位:bytes。 4096 否 taskmanager.network.netty.transport Netty传输类型,“nio”或“epoll”。 nio 否
  • SSL 表4 SSL参数说明 参数 描述 默认值 是否必选 备注 security.ssl.internal.enabled 内部通信SSL总开关,按照集群的安全模式自动配置。 安全模式:true 普通模式:false 是 仅MRS 3.x之前版本 security.ssl.internal.keystore Java keystore文件。 - 是 security.ssl.internal.keystore-password keystore文件解密密码。 - 是 security.ssl.internal.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.internal.truststore truststore文件包含公共CA证书。 - 是 security.ssl.internal.truststore-password truststore文件解密密码。 - 是 security.ssl.rest.enabled 外部通信SSL总开关,按照集群的安全模式自动配置。 安全模式:true 普通模式:false 是 security.ssl.rest.keystore Java keystore文件。 - 是 security.ssl.rest.keystore-password keystore文件解密密码。 - 是 security.ssl.rest.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.rest.truststore truststore文件包含公共CA证书。 - 是 security.ssl.rest.truststore-password truststore文件解密密码。 - 是 security.ssl.protocol SSL传输的协议版本。 TLSv1.2 是 适用于所有版本 security.ssl.algorithms 支持的SSL标准算法,具体可参考java官网:http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites。 TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 是 security.ssl.enabled 内部通信SSL总开关,按照集群的安装模式自动配置。 安全模式:true 普通模式:false 是 仅MRS 3.x及之后版本 security.ssl.keystore Java keystore文件。 - 是 security.ssl.keystore-password keystore文件解密密码。 - 是 security.ssl.key-password keystore文件中服务端key的解密密码。 - 是 security.ssl.truststore truststore文件包含公共CA证书。 - 是 security.ssl.truststore-password truststore文件解密密码。 - 是
  • Blob服务端 表2 Blob服务端参数说明 参数 描述 默认值 是否必选 blob.server.port blob服务器端口。 32456-32520 否 blob.service.ssl.enabled blob传输通道是否加密传输,仅在全局开关security.ssl开启时有。 true 是 blob.fetch.retries TaskManager从JobManager下载blob文件的重试次数。 50 否 blob.fetch.num-concurrent JobManager支持的下载blob的并发数。 50 否 blob.fetch.backlog JobManager支持的blob下载队列大小,比如下载Jar包等。单位:个。 1000 否 library-cache-manager.cleanup.interval 当用户取消flink job后,jobmanager删除HDFS上存放用户jar包的时间,单位为s。 仅适用于MRS 3.x及之后版本。 3600 否
  • Distributed Coordination (via Akka) 表3 Distributed Coordination参数说明 参数 描述 默认值 是否必选 备注 akka.ask.timeout akka所有异步请求和阻塞请求的超时时间。如果Flink发生超时失败,可以增大这个值。当机器处理速度慢或者网络阻塞时会发生超时。单位:ms/s/m/h/d。 10s 否 适用于所有版本 akka.lookup.timeout 查找JobManager actor对象的超时时间。单位:ms/s/m/h/d。 10s 否 akka.framesize JobManager和TaskManager间最大消息传输大小。当Flink出现消息大小超过限制的错误时,可以增大这个值。单位:b/B/KB/MB。 10485760b 否 akka.watch.heartbeat.interval Akka DeathWatch机制检测失联TaskManager的心跳间隔。如果TaskManager经常发生由于心跳消息丢失或延误而被错误标记为失联的情况,可以增大这个值。单位:ms/s/m/h/d。 10s 否 akka.watch.heartbeat.pause Akka DeathWatch可接受的心跳暂停时间,较小的数值表示不允许不规律的心跳。单位:ms/s/m/h/d。 60s 否 akka.watch.threshold DeathWath失败检测阈值,较小的数值容易把正常TaskManager标记为失败,较大的值增加了失败检测的时间。 12 否 akka.tcp.timeout 发送连接TCP超时时间,如果经常发生满网络环境下连接TaskManager超时,可以增大这个值。单位:ms/s/m/h/d。 20s 否 akka.throughput Akka批量处理消息的数量,一次操作完后把处理线程归还线程池。较小的数值代表actor消息处理的公平调度,较大的值以牺牲调度公平的代价提高整体性能。 15 否 akka.log.lifecycle.events Akka远程时间日志开关,当需要调试时可打开此开关。 false 否 akka.startup-timeout 远程组件启动失败前的超时时间。该值需带一个时间单位(ms/s/min/h/d) 与akka.ask.timeout的值一致 否 akka.ssl.enabled Akka通信SSL开关,仅在全局开关security.ssl开启时有。 true 是 akka.client-socket-worker-pool.pool-size-factor 计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 1.0 否 仅适用于MRS 3.x及之后版本 akka.client-socket-worker-pool.pool-size-max 基于因子计算的线程数上限。 2 否 akka.client-socket-worker-pool.pool-size-min 基于因子计算的线程数下限。 1 否 akka.client.timeout 【说明】客户端超时时间。该值需带一个时间单位(ms/s/min/h/d)。 60s 否 akka.server-socket-worker-pool.pool-size-factor 【说明】计算线程池大小的因子,计算公式:ceil(可用处理器*因子),计算结果限制在pool-size-min和pool-size-max之间。 1.0 否 akka.server-socket-worker-pool.pool-size-max 基于因子计算的线程数上限。 2 否 akka.server-socket-worker-pool.pool-size-min 基于因子计算的线程数下限。 1 否
  • 配置详情 本章节为你介绍如下参数配置: JobManager & TaskManager: JobManager和TaskManager是Flink的主要组件,针对各种安全场景和性能场景,配置项包括通信端口,内存管理,连接重试等。 Blob服务端: JobManager节点上的Blob服务端是用于接收用户在客户端上传的Jar包,或将Jar包发送给TaskManager,传输log文件等,配置项包括端口,SSL,重试次数,并发等。 Distributed Coordination (via Akka): Flink客户端与JobManager的通信,JobManager与TaskManager的通信和TaskManager与TaskManager的通信都基于Akka actor模型。相关参数可以根据网络环境或调优策略进行配置,配置项包括消息发送和等待的超时设置,akka监听机制Deathwatch等。 SSL: 当需要配置安全Flink集群时,需要配置SSL相关配置项,配置项包括SSL开关,证书,密码,加密算法等。 Network communication (via Netty): Flink运行Job时,Task之间的数据传输和反压检测都依赖Netty,某些环境下可能需要对Netty参数进行配置。对于高级调优,可调整部分Netty配置项,默认配置已可满足大规模集群并发高吞吐量的任务。 JobManager Web Frontend: JobManager启动时,会在同一进程内启动Web服务器,访问Web服务器可以获取当前Flink集群的信息,包括JobManager,TaskManager及集群内运行的Job。Web服务器参数的配置项包括端口,临时目录,显示项目,错误重定向,安全相关等。 File Systems: Task运行中会创建结果文件,支持对文件创建行为进行配置,配置项包括文件覆盖策略,目录创建等。 State Backend: Flink提供了HA和作业的异常恢复,并且提供版本升级时作业的暂停恢复。对于作业状态的存储,Flink依赖于state backend,作业的重启依赖于重启策略,用户可以对这两部分进行配置。配置项包括state backend类型,存储路径,重启策略等。 Kerberos-based Security: Flink安全模式下必须配置Kerberos相关配置项,配置项包括kerberos的keytab、principal等。 HA: Flink的HA模式依赖于ZooKeeper,所以必须配置ZooKeeper相关配置,配置项包括ZooKeeper地址,路径,安全认证等。 Environment: 对于JVM配置有特定要求的场景,可以通过配置项传递JVM参数到客户端,JobMananger,TaskManager等。 Yarn: Flink运行在Yarn集群上时,JobManager运行在Application Master上。JobManager的一些配置参数依赖于Yarn,通过配置YARN相关的配置,使Flink更好的运行在Yarn上,配置项包括yarn container的内存,虚拟内核,端口等。 Pipeline: 为适应某些场景对降低时延的需求,设计多个Job间采用Netty直接相连的方式传递数据,即分别使用NettySink用于Server端、NettySource用于Client端进行数据传输。配置项包括NettySink的信息存放路径、NettySink的端口监听范围、连接是否通过SSL加密以及NettySink监听所使用的网络所在域等。
  • JobManager & TaskManager 表1 JobManager & TaskManager参数说明 参数 描述 默认值 是否必选 备注 taskmanager.memory.size TaskManager在JVM堆内存中保留空间的大小,此内存用于排序,哈希表和中间状态的缓存。如果未指定,则会使用JVM堆内存乘以比例taskmanager.memory.fraction。单位:MB。 0 否 仅MRS 3.x之前版本 taskmanager.registration.initial-backoff 两次连续注册的初始间隔时间。单位:ms/s/m/h/d。 时间数值和单位之间有半角字符空格。ms/s/m/h/d表示毫秒、秒、分钟、小时、天。 500 ms 否 taskmanager.registration.refused-backoff JobManager拒绝注册后到允许再次注册的间隔时间。 5 min 否 taskmanager.rpc.port TaskManager的IPC端口范围。 32326-32390 否 适用于所有版本 taskmanager.memory.segment-size 内存管理器和网络堆栈使用的内存缓冲区大小。单位:bytes。 32768 否 taskmanager.data.port TaskManager数据交换端口范围。 32391-32455 否 taskmanager.data.ssl.enabled TaskManager之间数据传输是否使用SSL加密,仅在全局开关security.ssl开启时有效。 false 否 taskmanager.numberOfTaskSlots TaskManager占用的slot数,一般配置成物理机的核数,yarn-session模式下只能使用-s参数传递,yarn-cluster模式下只能使用-ys参数传递。 1 否 parallelism.default 默认并行度,用于未指定并行度的作业。 1 否 taskmanager.memory.fraction TaskManager在JVM堆内存中保留空间的比例,此内存用于排序,哈希表和中间状态的缓存。 0.7 否 taskmanager.memory.off-heap TaskManager是否使用堆外内存,此内存用于排序,哈希表和中间状态的缓存。建议对于大内存,开启此配置提高内存操作的效率。 false 是 taskmanager.memory.preallocate TaskManager是否在启动时分配保留内存空间。当开启堆外内存时,建议开启此配置项。 false 否 task.cancellation.interval 两次连续任务取消操作的间隔时间。单位:ms。 30000 否 client.rpc.port Flink client端Akka system监听端口。 32651-32720 否 仅MRS 3.x及之后版本 jobmanager.heap.size JobManager堆内存大小,yarn-session模式下只能使用-jm参数传递,yarn-cluster模式下只能使用-yjm参数传递,如果小于YARN配置文件中yarn.scheduler.minimum-allocation-mb大小,则使用YARN配置中的值。单位:B/KB/MB/GB/TB。 1024mb 否 taskmanager.heap.size TaskManager堆内存大小,yarn-session模式下只能使用-tm参数传递,yarn-cluster模式下只能使用-ytm参数传递,如果小于YARN配置文件中yarn.scheduler.minimum-allocation-mb大小,则使用YARN配置中的值。单位:B/KB/MB/GB/TB。 1024mb 否 taskmanager.network.numberOfBuffers TaskManager网络传输缓冲栈数量,如果作业运行中出错提示系统中可用缓冲不足,可以增加这个配置项的值。 2048 否 taskmanager.debug.memory.startLogThread 调试Flink内存和GC相关问题时可开启,TaskManager会定时采集内存和GC的统计信息,包括当前堆内,堆外,内存池的使用率和GC时间。 false 否 taskmanager.debug.memory.logIntervalMs TaskManager定时采集内存和GC的统计信息的采集间隔。 0 否 taskmanager.maxRegistrationDuration TaskManager向JobManager注册自己的最长时间,如果超过时间,TaskManager会关闭。 5 min 否 taskmanager.initial-registration-pause 两次连续注册的初始间隔时间。该值需带一个时间单位(ms/s/min/h/d)(比如5秒)。 时间数值和单位之间有半角字符空格。ms/s/m/h/d表示毫秒、秒、分钟、小时、天。 500 ms 否 taskmanager.max-registration-pause TaskManager注册失败最大重试间隔。单位:ms/s/m/h/d。 30 s 否 taskmanager.refused-registration-pause TaskManager注册连接被JobManager拒绝后的重试间隔。单位:ms/s/m/h/d。 10 s 否 classloader.resolve-order 从用户代码加载类时定义类解析策略,这意味着是首先检查用户代码jar(“child-first”)还是应用程序类路径(“parent-first”)。默认设置指示首先从用户代码jar加载类,这意味着用户代码jar可以包含和加载不同于Flink使用的(依赖)依赖项。 child-first 否 slot.idle.timeout Slot Pool中空闲Slot的超时时间(以ms为单位)。 50000 否 slot.request.timeout 从Slot Pool请求Slot的超时(以ms为单位)。 300000 否 task.cancellation.timeout 取消任务超时时间(以ms为单位),超时后会触发TaskManager致命错误。设置为0,取消任务卡住则不会报错。 180000 否 taskmanager.network.detailed-metrics 启用网络队列长度的详细指标监控。 false 否 taskmanager.network.memory.buffers-per-channel 每个传出/传入通道(子分区/输入通道)使用的最大网络缓冲区数.在基于信用的流量控制模式下,这表示每个输入通道中有多少信用。它应配置至少2以获得良好的性能。1个缓冲区用于接收子分区中的飞行中数据,1个缓冲区用于并行序列化。 2 否 taskmanager.network.memory.floating-buffers-per-gate 每个输出/输入门(结果分区/输入门)使用的额外网络缓冲区数。在基于信用的流量控制模式中,这表示在所有输入通道之间共享多少浮动信用。浮动缓冲区基于积压(子分区中的实时输出缓冲区)反馈来分布,并且可以帮助减轻由子分区之间的不平衡数据分布引起的背压。如果节点之间的往返时间较长和/或群集中的机器数量较多,则应增加此值。 8 否 taskmanager.network.memory.fraction 用于网络缓冲区的JVM内存的占比。这决定了TaskManager可以同时拥有多少流数据交换通道以及通道缓冲的程度。如果作业被拒绝或者收到系统没有足够缓冲区的警告,请增加此值或“taskmanager.network.memory.min”和“taskmanager.network.memory.max”。另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此占比。 0.1 否 taskmanager.network.memory.max 网络缓冲区的最大内存大小。该值需带一个大小单位(B/KB/MB/GB/TB)。 1 GB 否 taskmanager.network.memory.min 网络缓冲区的最小内存大小。该值需带一个大小单位(B/KB/MB/GB/TB)。 64 MB 否 taskmanager.network.request-backoff.initial 输入通道的分区请求的最小退避(以ms为单位)。 100 否 taskmanager.network.request-backoff.max 输入通道的分区请求的最大退避(以ms为单位)。 10000 否 taskmanager.registration.timeout TaskManager注册的超时时间,在该时间内未成功注册,TaskManager将终止。该值需带一个时间单位(ms/s/min/h/d)。 5 min 否 resourcemanager.taskmanager-timeout 释放空闲TaskManager的超时(以ms为单位)。 30000 否
  • 配置说明 Flink所有的配置参数都可以在客户端侧进行配置,建议用户直接修改客户端的“flink-conf.yaml”配置文件进行配置,如果通过Manager界面修改Flink服务参数,配置完成之后需要重新下载安装客户端: 配置文件路径:客户端安装路径/Flink/flink/conf/flink-conf.yaml。 文件的配置格式为key: value。 例:taskmanager.heap.size: 1024mb 注意配置项key:与value之间需有空格分隔。
  • 注意事项 当主集群关闭时,此工具将从ZooKeeper节点(RS znode)获得WAL的处理进度以及WAL的处理队列,并将未复制的队列复制到备集群中。 每个主集群的RegionServer在备集群ZooKeeper上的replication节点下都有自己的znode。它包含每个对等集群的一个znode。 当RegionServer故障时,主集群的每个RegionServer都会通过watcher收到通知,并尝试锁定故障RegionServer的znode,包含它的队列。成功创建的RegionServer会将所有队列转移到自己队列的znode下。队列传输后,它们将从旧位置删除。 在主集群关闭期间,ReplicationSyncUp工具将使用来自ZooKeeper节点的信息同步主备集群的数据,并且RegionServer znode的wals将被移动到备集群下。
  • 前提条件 主备集群已经安装并且启动。 主备集群上的时间必须一致,而且主备集群上的NTP服务必须使用同一个时间源。 当主集群HBase服务关闭时,ZooKeeper和HDFS服务应该启动并运行。 该工具应该由启动HBase进程的系统用户运行。 如果处于安全模式,请确保备用集群的HBase系统用户具有主集群HDFS的读取权限。因为它将更新HBase系统ZooKeeper节点和HDFS文件。 主集群HBase故障后,主集群的ZooKeeper,文件系统和网络依然可用。
  • 工具使用 在主集群hbase shell中输入如下命令使用: hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp -Dreplication.sleep.before.failover=1 replication.sleep.before.failover是指在RegionServer启动失败时备份其剩余数据前需要的休眠时间。由于30秒(默认值)的睡眠时间没有任何意义,因此将其设置为1(s),使备份过程更快触发。
  • 场景介绍 HBase本身提供了ImportTsv&LoadIncremental工具来批量加载用户数据。当前提供了HIndexImportTsv来支持加载用户数据的同时可以完成对索引数据的批量加载。HIndexImportTsv继承了HBase批量加载数据工具ImportTsv的所有功能。此外,若在执行HIndexImportTsv工具之前未建表,直接运行该工具,将会在创建表时创建索引,并在生成用户数据的同时生成索引数据。
  • Loader连接配置说明 Loader支持以下多种连接: generic-jdbc-connector:参数配置请参见表1。 ftp-connector:参数配置请参见表2。 sftp-connector:参数配置请参见表3。 hdfs-connector:参数配置请参见表4。 oracle-connector:参数配置请参见表5。 mysql-fastpath-connector:参数配置请参见表7。 oracle-partition-connector:参数配置请参见表6。
  • 查看Consumers信息 登录KafkaManager的WebUI界面。 在集群列表页面单击对应集群名称进入集群Summary页面。 单击“Consumers”查看当前集群的Consumers列表及每个Consumer的消费信息。 图5 Consumers列表 单击Consumer的名称查看消费的Topic列表。 图6 Consumer消费的Topic列表 单击Consumer下Topic列表中的Topic名称,查看该Consumer对Topic的具体消费情况。 图7 Consumer对Topic的具体消费情况
  • KafkaManager介绍 KafkaManager是Apache Kafka的管理工具,提供Kafka集群界面化的Metric监控和集群管理。 通过KafkaManager可以: 支持管理多个Kafka集群 支持界面检查集群状态(主题,消费者,偏移量,分区,副本,节点) 支持界面执行副本的leader选举 使用选择生成分区分配以选择要使用的分区方案 支持界面执行分区重新分配(基于生成的分区方案) 支持界面选择配置创建主题(支持多种Kafka版本集群) 支持界面删除主题(仅支持0.8.2+并设置了delete.topic.enable = true) 支持批量生成多个主题的分区分配,并可选择要使用的分区方案 支持批量运行重新分配多个主题的分区 支持为已有主题增加分区 支持更新现有主题的配置 可以为分区级别和主题级别度量标准启用JMX查询 可以过滤掉zookeeper中没有ids / owner /&offsets /目录的使用者。 父主题: 使用KafkaManager
  • 操作步骤 将业务数据上传到用户的OBS文件系统。 获取用户的AK/SK信息,然后创建一个OBS连接和一个HDFS连接。 具体可参见Loader连接配置说明。 访问Loader页面。 如果是启用了Kerberos认证的分析集群,可参见访问Hue的WebUI。 单击“新建作业”。 在“基本信息”填写参数。 在“名称”填写一个作业的名称。例如“obs2hdfs”。 在“源连接”选择已创建的OBS连接。 “目的连接”选择已创建的HDFS连接。 在“自”填写源连接参数。 在“桶名”填写业务数据所保存的OBS文件系统名称。 在“源目录或文件”填写业务数据在文件系统的具体位置。 如果是单个文件,需要填写包含文件名的完整路径。如果是目录,填写目录的完整路径 “文件格式”填写业务数据文件的类型。 可参见obs-connector。 在“至”填写目的连接参数。 在“定入目录”填写业务数据在HDFS要保存的目录名称。 如果是启用Kerberos认证的集群,当前访问Loader的用户对保存数据的目录需要有写入权限。 在“文件格式”填写业务数据文件的类型。 需要与6.c的类型对应。 在“压缩格式”填写一种压缩的算法。例如选择不压缩“NONE”。 在“是否覆盖”选择已有文件的处理方式,选择“True”。 单击“显示高级属性”,在“换行符”填写业务数据保存时,系统填充的换行字符。 在“字段分割符”填写业务数据保存时,系统填充的分割字符。 可参见hdfs-connector。 在“任务配置”填写作业的运行参数。 在“抽取并发数”填写map任务的个数。 在“加载(写入)并发数”填写reduce任务的个数。 目的连接为HDFS连接时,不显示“加载(写入)并发数”参数。 “单个分片的最大错误记录数”填写错误记录阈值。 在“脏数据目录”填写一个脏数据的保存位置,例如“/user/sqoop/obs2hdfs-dd”。 单击“保存并运行”。 在“管理作业界面”,查看作业运行结果。可以单击“刷新列表”获取作业的最新状态。
  • 操作步骤 MRS 3.x之前版本: 从MySQL官网下载MySQL jdbc驱动程序“mysql-connector-java-5.1.21.jar”,具体MySQL jdbc驱动程序选择参见下表。 表1 版本信息 jdbc驱动程序版本 MySQL版本 Connector/J 5.1 MySQL 4.1、MySQL 5.0、MySQL 5.1、MySQL 6.0 alpha Connector/J 5.0 MySQL 4.1、MySQL 5.0 servers、distributed transaction (XA) Connector/J 3.1 MySQL 4.1、MySQL 5.0 servers、MySQL 5.0 except distributed transaction (XA) Connector/J 3.0 MySQL 3.x、MySQL 4.1 将“mysql-connector-java-5.1.21.jar”上传至MRS master 主备节点loader安装目录 针对MRS 3.x之前版本,上传至“/opt/Bigdata/MRS_XXX/install/FusionInsight-Sqoop-1.99.7/FusionInsight-Sqoop-1.99.7/server/jdbc/” 其中“XXX”为MRS版本号,请根据实际情况修改。 修改“mysql-connector-java-5.1.21.jar”包属主为“omm:wheel”。 修改配置文件“jdbc.properties”。 将“MYSQL”的键值修改为上传的jdbc驱动包名“mysql-connector-java-5.1.21.jar”,例如:MYSQL=mysql-connector-java-5.1.21.jar。 重启Loader服务。
  • 创建作业 访问Loader页面,单击“新建作业”。 在“基本信息”填写参数。 在“名称”填写一个作业的名称。 在“源连接”和“目的连接”选择对应的连接。 选择某个类型的连接,表示从指定的源获取数据,并保存到目的位置。 如果没有需要的连接,可单击“添加新连接”。 在“自”填写源连接的作业配置。 具体请参见Loader作业源连接配置说明。 在“至”填写目的连接的作业配置。 具体请参见Loader作业目的连接配置说明。 在“目的连接”是否选择了数据库类型的连接? 数据库类型的连接包含以下几种: generic-jdbc-connector hbase-connector hive-connector “目的连接”选择数据库类型的连接时,还需要配置业务数据与数据库表字段的对应关系: 是,请执行6。 否,请执行7。 在“字段映射”填写字段对应关系。然后执行7。 “字段映射”的对应关系,表示用户数据中每一列与数据库的表字段的匹配关系。 表1 “字段映射”属性 参数 说明 列号 表示业务数据的字段顺序。 样本 表示业务数据的第一行值样例。 列族 “目的连接”为hbase-connector类型时,支持定义保存数据的具体列族。 目的字段 配置保存数据的具体字段。 类型 显示用户选择字段的类型。 行键 “目的连接”为hbase-connector类型时,需要勾选作为行键的“目的字段”。 如果From是sftp/ftp/obs/hdfs等文件类型连接器,Field Mapping 样值取自文件第一行数据,需要保证第一行数据是完整的,Loader作业不会抽取没有Mapping上的列。 在“任务配置”填写作业的运行参数。 表2 Loader作业运行属性 参数 说明 抽取并发数 设置map任务的个数。 加载(写入)并发数 设置reduce任务的个数。 该参数只有在目的字段为Hbase和Hive时才会显示。 单个分片的最大错误记录数 设置一个错误阈值,如果单个map任务的错误记录超过设置阈值则任务自动结束,已经获取的数据不回退。 说明: “generic-jdbc-connector”的“MYSQL”和“MPPDB”默认批量读写数据,每一批次数据最多只记录一次错误记录。 脏数据目录 设置一个脏数据目录,在出现脏数据的场景中在该目录保存脏数据。如果不设置则不保存。 单击“保存”。
  • hbase-connector 表4 hbase-connector目的连接属性 参数 说明 表名 保存最终数据的HBase表名称,支持通过界面查询并选择。 导入方式 支持BULKLOAD、PUTLIST两种方式导入数据到HBase表。 导入前清空数据 标识是否需要清空目标HBase表中的数据,支持以下两种类型: True:清空表中的数据。 False:不清空表中的数据,选择False时如果表中存在数据,则作业运行会报错。
  • hdfs-connector 表5 hdfs-connector目的连接属性 参数 说明 写入目录 最终数据在HDFS保存时的具体目录。必须指定一个目录。 文件格式 Loader支持HDFS中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 压缩格式 文件在HDFS保存时的压缩行为。支持NONE、DEFLATE、GZIP、BZIP2、LZ4和SNAPPY。 是否覆盖 文件在导入HDFS时对写入目录中原有文件的处理行为,支持以下两种: True:默认清空目录中的文件并导入新文件。 False:不清空文件。如果写入目录中有文件,则作业运行失败。 换行符 最终数据的每行结束标识字符。 说明: hdfs作为目的连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“换行符”配置无效。 字段分割符 最终数据的每个字段分割标识字符。 说明: hdfs作为目的连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“字段分割符”配置无效
  • ftp-connector或sftp-connector 表3 ftp-connector或sftp-connector目的连接属性 参数 说明 写入目录 最终数据在文件服务器保存时的具体目录。必须指定一个目录。 文件格式 Loader支持文件服务器中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 换行符 最终数据的每行结束标识字符。 说明: ftp或sftp作为目的连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“换行符”配置无效。 字段分割符 最终数据的每个字段分割标识字符。 说明: ftp或sftp作为目的连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“字段分割符”配置无效 编码类型 最终数据的文本编码类型。只对文本类型文件有效。
  • obs-connector 表1 obs-connector目的连接属性 参数 说明 桶名 保存最终数据的OBS文件系统。 写入目录 最终数据在文件系统保存时的具体目录。必须指定一个目录。 文件格式 Loader支持OBS中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 换行符 最终数据的每行结束标识字符。 字段分割符 最终数据的每个字段分割标识字符。 编码类型 最终数据的文本编码类型。只对文本类型文件有效。
  • hdfs-connector 表5 hdfs-connector数据源连接属性 参数 说明 源目录或文件 源数据实际存储的形态,可能是HDFS包含一个目录中的全部数据文件,或者是单个数据文件。 文件格式 Loader支持HDFS中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 换行符 源数据的每行结束标识字符。 说明: hdfs作为源连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“换行符”配置无效。 字段分割符 源数据的每个字段分割标识字符。 说明: hdfs作为源连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“字段分割符”配置无效。 文件分割方式 支持以下两种: File:按总文件个数分配map任务处理的文件数量,计算规则为“文件总个数/抽取并发数”。 Size:按文件总大小分配map任务处理的文件大小,计算规则为“文件总大小/抽取并发数”。
  • obs-connector 表1 obs-connector数据源连接属性 参数 说明 桶名 保存源数据的OBS文件系统。 源目录或文件 源数据实际存储的形态,可能是文件系统包含一个目录中的全部数据文件,或者是文件系统包含的单个数据文件。 文件格式 Loader支持OBS中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 换行符 源数据的每行结束标识字符。 字段分割符 源数据的每个字段分割标识字符。 编码类型 源数据的文本编码类型。只对文本类型文件有效。 文件分割方式 支持以下两种: File:按总文件个数分配map任务处理的文件数量,计算规则为“文件总个数/抽取并发数”。 Size:按文件总大小分配map任务处理的文件大小,计算规则为“文件总大小/抽取并发数”。
  • ftp-connector或sftp-connector 表3 ftp-connector或sftp-connector数据源连接属性 参数 说明 源目录或文件 源数据实际存储的形态,可能是文件服务器包含一个目录中的全部数据文件,或者是单个数据文件。 文件格式 Loader支持文件服务器中存储数据的文件格式,默认支持以下两种: CSV_FILE:表示文本格式文件。目的连接为数据库型连接时,只支持文本格式。 BINARY_FILE:表示文本格式以外的二进制文件。 换行符 源数据的每行结束标识字符。 说明: ftp或sftp作为源连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“换行符”配置无效 字段分割符 源数据的每个字段分割标识字符。 说明: ftp或sftp作为源连接时,当“文件格式”配置为BINARY_FILE时,高级属性中的“字段分割符”配置无效 编码类型 源数据的文本编码类型。只对文本类型文件有效。 文件分割方式 支持以下两种: File:按总文件个数分配map任务处理的文件数量,计算规则为“文件总个数/抽取并发数”。 Size:按文件总大小分配map任务处理的文件大小,计算规则为“文件总大小/抽取并发数”。
  • 关系型数据库连接 关系型数据库连接是Loader与关系型数据库进行数据交换的通道,配置参数如表2所示。 部分参数需要单击“显示高级属性”后展开,否则默认隐藏。 表2 generic-jdbc-connector配置 参数 说明 名称 指定一个Loader连接的名称。 数据库类型 表示Loader连接支持的数据,可以选择“ORACLE”、“MYSQL”和“MPPDB”。 数据库服务器 表示数据库的访问地址,可以是IP地址或者域名。 端口 表示数据库的访问端口。 数据库名称 表示保存数据的具体数据库名。 用户名 表示连接数据库使用的用户名称。 密码 表示此用户对应的密码。需要与实际密码保持一致。
  • OBS连接 OBS连接是Loader与OBS进行数据交换的通道,配置参数如表1所示。 表1 obs-connector配置 参数 说明 名称 指定一个Loader连接的名称。 OBS服务器 输入OBS endpoint地址,一般格式为OBS.Region.DomainName。 例如执行如下命令查看OBS endpoint地址: cat /opt/Bigdata/apache-tomcat-7.0.78/webapps/web/WEB-INF/classes/cloud-obs.properties 端口 访问OBS数据的端口。默认值为“443”。 访问标识(AK) 表示访问OBS的用户的访问密钥AK。 密钥(SK) 表示访问密钥对应的SK。
  • 文件服务器连接 文件服务器连接包含FTP连接和SFTP连接,是Loader与文件服务器进行数据交换的通道,配置参数如表4所示。 表4 ftp-connector或sftp-connector配置 参数 说明 名称 指定一个Loader连接的名称。 主机名或IP 输入文件服务器的访问地址,可以是服务器的主机名或者IP地址。 端口 访问文件服务器的端口。 FTP协议请使用端口“21”。 SFTP协议请使用端口“22”。 用户名 表示文件服务器的用户名称。 密码 表示此用户对应的密码。
  • 补充说明 Flume可靠性保障措施。 Source与Channel、Channel与Sink之间支持事务机制。 Sink Processor支持配置failover、load_balance机制。 例如load_balance示例如下: server.sinkgroups=g1server.sinkgroups.g1.sinks=k1 k2server.sinkgroups.g1.processor.type=load_balanceserver.sinkgroups.g1.processor.backoff=trueserver.sinkgroups.g1.processor.selector=random Flume多客户端聚合级联时的注意事项。 级联时需要走Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。 Flume客户端可以包含多个独立的数据流,即在一个配置文件properties.properties中配置多个Source、Channel、Sink。这些组件可以链接以形成多个流。 例如在一个配置中配置两个数据流,示例如下: server.sources = source1 source2server.sinks = sink1 sink2server.channels = channel1 channel2#dataflow1 server.sources.source1.channels = channel1server.sinks.sink1.channel = channel1#dataflow2server.sources.source2.channels = channel2server.sinks.sink2.channel = channel2
  • 操作步骤 连接到Spark CarbonData。 根据业务情况,准备好客户端,使用root用户登录安装客户端的节点。 例如在Master2节点更新客户端,则在该节点登录客户端,具体参见使用MRS客户端。 切换用户与配置环境变量。 sudo su - omm source /opt/client/bigdata_env 启用Kerberos认证的集群,执行以下命令认证用户身份。未启用Kerberos认证集群无需执行。 kinit Spark组件用户名 用户需要加入用户组hadoop、hive,主组hadoop。 执行以下命令,连接到Spark运行环境: spark-beeline 执行命令创建CarbonData表。 CarbonData表可用于加载数据和执行查询操作,例如执行以下命令创建CarbonData表: CREATE TABLE x1 (imei string, deviceInformationId int, mac string, productdate timestamp, updatetime timestamp, gamePointId double, contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_EXCLUDE'='mac','DICTIONARY_INCLUDE'='deviceInformationId'); 命令执行结果如下: +---------+--+| result |+---------+--++---------+--+No rows selected (1.551 seconds) 从CSV文件加载数据到CarbonData表。 根据所要求的参数运行命令从CSV文件加载数据,且仅支持CSV文件。LOAD命令中配置的CSV列名,需要和CarbonData表列名相同,顺序也要对应。CSV文件中的数据的列数,以及数据格式需要和CarbonData表匹配。 文件需要保存在HDFS中。用户可以将文件上传到OBS,并在MRS管理控制台“文件管理”将文件从OBS导入HDFS,具体请参考导入导出数据。 如果集群启用了Kerberos认证,则需要在工作环境准备CSV文件,然后可以使用开源HDFS命令,参考5将文件从工作环境导入HDFS,并设置Spark组件用户在HDFS中对文件有读取和执行的权限。 例如,HDFS的“tmp”目录有一个文件“data.csv”,内容如下: x123,111,dd,2017-04-20 08:51:27,2017-04-20 07:56:51,2222,33333 执行导入命令: LOAD DATA inpath 'hdfs://hacluster/tmp/data.csv' into table x1 options('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei, deviceinformationid,mac,productdate,updatetime,gamepointid,contractnumber'); 命令执行结果如下: +---------+--+| Result |+---------+--++---------+--+No rows selected (3.039 seconds) 在CarbonData中查询数据。 获取记录数 为了获取在CarbonData table中的记录数,可以执行以下命令。 select count(*) from x1; 使用Groupby查询 为了获取不重复的“deviceinformationid”记录数,可以执行以下命令。 select deviceinformationid,count (distinct deviceinformationid) from x1 group by deviceinformationid; 使用条件查询 为了获取特定deviceinformationid的记录,可以执行以下命令。 select * from x1 where deviceinformationid='111'; 在执行数据查询操作后,如果查询结果中某一列的结果含有中文字等其他非英文字符,会导致查询结果中的列不能对齐,这是由于不同语言的字符在显示时所占的字宽不尽相同。 执行以下命令退出Spark运行环境。 !quit
  • Flume客户端介绍 Flume客户端由Source、Channel、Sink组成,数据先进入Source然后传递到Channel,最后由Sink发送到客户端外部。各模块说明见表1。 表1 模块说明 名称 说明 Source Source负责接收数据或产生数据,并将数据批量放到一个或多个Channel。Source有两种类型:数据驱动和轮询。 典型的Source样例如下: 和系统集成并接收数据的Sources:Syslog、Netcat。 自动生成事件数据的Sources:Exec、SEQ。 用于Agent和Agent之间通信的IPC Sources:Avro。 Source必须至少和一个Channel关联。 Channel Channel位于Source和Sink之间,用于缓存Source传递的数据,当Sink成功将数据发送到下一跳的Channel或最终数据处理端,缓存数据将自动从Channel移除。 不同类型的Channel提供的持久化水平也是不一样的: Memory Channel:非持久化 File Channel:基于预写式日志(Write-Ahead Logging,简称WAL)的持久化实现 JDBC Channel:基于嵌入Database的持久化实现 Channel支持事务特性,可保证简易的顺序操作,同时可以配合任意数量的Source和Sink共同工作。 Sink Sink负责将数据传输到下一跳或最终目的,成功完成后将数据从Channel移除。 典型的Sink样例如下: 存储数据到最终目的终端Sink,比如:HDFS、Kafka 自动消耗的Sinks,比如:Null Sink 用于Agent和Agent之间通信的IPC sink:Avro Sink必须关联到一个Channel。 Flume客户端可以配置成多个Source、Channel、Sink,即一个Source将数据发送给多个Channel,再由多个Sink发送到客户端外部。 Flume还支持多个Flume客户端配置级联,即Sink将数据再发送给Source。
共100000条