华为云用户手册

  • 安装MRS集群客户端 MRS集群创建成功后,用户可安装集群客户端用于连接集群内各组件服务,进行作业提交等操作。 客户端可以安装在集群内的节点上,也可以安装在集群外的节点上。本指南以在Master1节点上安装客户端为例进行介绍。 MRS集群创建成功后,在集群列表中单击MRS集群名称进入集群概览页面。 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中选择“EIP访问”并配置弹性IP信息。 首次访问时,可点击“管理弹性公网IP”,在弹性公网IP控制台购买一个弹性公网IP,购买成功后刷新弹性公网IP列表并选择。 勾选确认信息后,单击“确定”,登录集群的FusionInsight Manager管理界面。 Manager登录用户名为admin,密码为购买集群时配置的自定义密码。 在“主页”页签的集群名称后单击,单击“下载客户端”下载集群客户端。 图2 下载客户端 在“下载集群客户端”弹窗中,“选择平台类型”必须与待安装节点的架构匹配,例如“x86_64”。同时勾选“仅保存到如下路径”。 图3 下载集群客户端提示框 单击“确定”后,等待客户端软件生成成功。 在MRS服务管理控制台的集群列表中,单击集群名称,在集群的“节点管理”页签,单击包含“master1”的节点名称,在ECS详情页面可通过远程登录方式登录对应节点。 图4 查看Master1节点 使用root用户登录Master1节点,密码为购买集群时配置的自定义密码。 cd /tmp/FusionInsight-Client/ tar -xvf FusionInsight_Cluster_1_Services_Client.tar tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar 进入安装包所在目录,安装客户端。 cd FusionInsight_Cluster_1_Services_ClientConfig 执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/client”目录,等待客户端安装完成。 ./install.sh /opt/client ... ... component client is installed successfully ...
  • 准备hadoop应用程序及数据 wordcount是最经典的Hadoop作业,它用来统计海量文本的单词数量。 MRS集群创建成功后,可获取集群客户端内的wordcount样例程序进行运行,也可准备上层业务自行开发的大数据应用程序并上传至集群。 准备wordcount程序。 以root用户登录MRS客户端所在节点。 执行以下命令切换到客户端安装目录,配置环境变量并创建HDFS目录,例如“/user/example”。 cd /opt/client source bigdata_env hdfs dfs -mkdir /user/example hdfs dfs -mkdir /user/example/input 准备数据文件。 数据文件无格式要求,例如文件名为“wordcount1.txt”和“wordcount2.txt”,内容如下所示: vi /opt/wordcount1.txt hello word hello wordcount vi /opt/wordcount2.txt hello mapreduce hello hadoop 执行以下命令,将样例数据上传至HDFS。 hdfs dfs -put /opt/wordcount1.txt /user/example/input hdfs dfs -put /opt/wordcount2.txt /user/example/input
  • 购买集群 登录华为云控制台。 在服务列表中搜索“MapReduce服务”,进入MRS服务管理控制台。 单击“购买集群”,进入“购买集群”页面,选择“快速购买”页签。 根据实际业务规划情况填写集群配置信息(以下参数仅供参考,可根据实际情况调整)。 表1 MRS集群配置参数 参数项 取值 区域 华北-北京四 计费模式 按需计费 集群名称 填写“mrs_demo”或按命名规范命名。 集群类型 选择“自定义”即可。 版本类型 普通版 集群版本 MRS 3.1.5 组件选择 Hadoop分析集群 可用区 可用区1 企业项目 default 虚拟私有云 建议保持默认,例如“vpc-default”。 子网 建议保持默认,例如“subnet-default”。 集群节点 建议保持默认,可根据实际情况调整。 Kerberos认证 不开启 用户名 admin/root 密码 设置密码登录集群管理页面及ECS节点用户的密码,例如:Test!@12345。 确认密码 再次输入设置用户密码。 通信安全授权 勾选确认授权。 图1 购买Hadoop分析集群 单击“立即购买”,进入任务提交成功页面。 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。 集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。
  • 安装MRS集群客户端 MRS集群创建成功后,用户可安装集群客户端用于连接集群内各组件服务,进行作业提交等操作。 客户端可以安装在集群内的节点上,也可以安装在集群外的节点上。本指南以在Master1节点上安装客户端为例进行介绍。 MRS集群创建成功后,在集群列表中单击MRS集群名称进入集群概览页面。 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中选择“EIP访问”并配置弹性IP信息。 首次访问时,可点击“管理弹性公网IP”,在弹性公网IP控制台购买一个弹性公网IP,购买成功后刷新弹性公网IP列表并选择。 勾选确认信息后,单击“确定”,登录集群的FusionInsight Manager管理界面。 Manager登录用户名为admin,密码为购买集群时配置的自定义密码。 在“主页”页签的集群名称后单击,单击“下载客户端”下载集群客户端。 图2 下载客户端 在“下载集群客户端”弹窗中,“选择平台类型”必须与待安装节点的架构匹配,例如“x86_64”。同时勾选“仅保存到如下路径”。 图3 下载集群客户端提示框 单击“确定”后,等待客户端软件生成成功。 在MRS服务管理控制台的集群列表中,单击集群名称,在集群的“节点管理”页签,单击包含“master1”的节点名称,在ECS详情页面可通过远程登录方式登录对应节点。 图4 查看Master1节点 使用root用户登录Master1节点,密码为购买集群时配置的自定义密码。 cd /tmp/FusionInsight-Client/ tar -xvf FusionInsight_Cluster_1_Services_Client.tar tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar 进入安装包所在目录,安装客户端。 cd FusionInsight_Cluster_1_Services_ClientConfig 执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/client”目录,等待客户端安装完成。 ./install.sh /opt/client ... ... component client is installed successfully ...
  • 购买集群 登录华为云控制台。 在服务列表中搜索“MapReduce服务”,进入MRS服务管理控制台。 单击“购买集群”,进入“购买集群”页面,选择“快速购买”页签。 根据实际业务规划情况填写集群配置信息(以下参数仅供参考,可根据实际情况调整)。 表1 MRS集群配置参数 参数项 取值 区域 华北-北京四 计费模式 按需计费 集群名称 填写“mrs_demo”或按命名规范命名。 集群类型 选择“自定义”即可。 版本类型 LTS版 集群版本 MRS 3.2.0-LTS.1 组件选择 HBase查询集群 可用区 可用区1 企业项目 default 虚拟私有云 建议保持默认,例如“vpc-default”。 子网 建议保持默认,例如“subnet-default”。 集群节点 建议保持默认,可根据实际情况调整。 Kerberos认证 不开启 用户名 admin/root 密码 设置密码登录集群管理页面及ECS节点用户的密码,例如:Test!@12345。 确认密码 再次输入设置用户密码。 通信安全授权 勾选确认授权。 图1 购买流式分析集群 单击“立即购买”,进入任务提交成功页面。 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。 集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。
  • 执行Spark程序 本小节提供执行Spark程序的操作指导,旨在指导用户在安全集群模式下运行程序。 前提条件 已编译好待运行的程序及对应的数据文件,如FemaleInfoCollection.jar、input_data1.txt和input_data2.txt,Spark程序开发及数据准备请参见Spark应用开发。 操作步骤 采用远程登录软件(比如:MobaXterm)通过ssh登录(使用集群弹性IP登录)到安全集群的master节点。 登录成功后分别执行下列命令,在“/opt/Bigdata/client”目录下创建test文件夹,在test目录下创建conf文件夹: cd /opt/Bigdata/client mkdir test cd test mkdir conf 使用上传工具(比如:WinScp)将样FemaleInfoCollection.jar、input_data1.txt和input_data2.txt复制到test目录下,将“创建角色和用户”中的步骤5获得的keytab文件和krb5.conf文件复制到conf目录。 执行如下命令配置环境变量并认证已创建用户,例如test。 cd /opt/Bigdata/client source bigdata_env export YARN_USER_CLASSPATH=/opt/Bigdata/client/test/conf/ kinit test 然后按照提示输入密码,无异常提示返回,则完成了用户的kerberos认证。 执行如下命令将数据导入到HDFS中: cd test hdfs dfs -mkdir /tmp/input hdfs dfs -put input_data* /tmp/input 执行如下命令运行程序: cd /opt/Bigdata/client/Spark/spark bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/Bigdata/client/test/FemaleInfoCollection-1.0.jar /tmp/input 程序运行成功后,会显示如下: 图14 程序运行结果
  • 执行MapReduce程序 本小节提供执行MapReduce程序的操作指导,旨在指导用户在安全集群模式下运行程序。 前提条件 已编译好待运行的程序及对应的数据文件,如mapreduce-examples-1.0.jar、input_data1.txt和input_data2.txt,MapReduce程序开发及数据准备请参见MapReduce应用开发。 操作步骤 使用远程登录软件(例如:MobaXterm)通过ssh登录(使用集群弹性IP登录)到安全集群的Master节点。 登录成功后分别执行下列命令,在“/opt/Bigdata/client”目录下创建test文件夹,在test目录下创建conf文件夹: cd /opt/Bigdata/client mkdir test cd test mkdir conf 使用上传工具(比如:WinScp)将mapreduce-examples-1.0.jar、input_data1.txt和input_data2.txt复制到test目录下,将“创建角色和用户”中的步骤5获得的keytab文件和krb5.conf文件复制到conf目录。 执行如下命令配置环境变量并认证已创建用户,例如test。 cd /opt/Bigdata/client source bigdata_env export YARN_USER_CLASSPATH=/opt/Bigdata/client/test/conf/ kinit test 然后按照提示输入密码,无异常提示返回(首次登录需按照系统提示修改密码),则完成了用户的kerberos认证。 执行如下命令将数据导入到HDFS中: cd test hdfs dfs -mkdir /tmp/input hdfs dfs -put input_data* /tmp/input 执行如下命令运行程序: yarn jar mapreduce-examples-1.0.jar com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector /tmp/input /tmp/mapreduce_output 其中: /tmp/input指HDFS文件系统中input的路径。 /tmp/mapreduce_output指HDFS文件系统中output的路径,该目录必须不存在,否则会报错。 程序运行成功后,执行命令hdfs dfs -ls /tmp/mapreduce_output会显示如下: 图13 查看程序运行结果
  • 执行Hive程序 本小节提供执行Hive程序的操作指导,旨在指导用户在安全集群模式下运行程序。 前提条件 已编译好待运行的程序及对应的数据文件,如hive-examples-1.0.jar、input_data1.txt和input_data2.txt,Hive程序开发及数据准备请参见Hive应用开发。 操作步骤 采用远程登录软件(比如:MobaXterm)通过ssh登录(使用集群弹性IP登录)到安全集群的master节点。 登录成功后分别执行下列命令,在“/opt/Bigdata/client”目录下创建test文件夹,在test目录下创建conf文件夹: cd /opt/Bigdata/client mkdir test cd test mkdir conf 使用上传工具(比如:WinScp)将样FemaleInfoCollection.jar、input_data1.txt和input_data2.txt复制到test目录下,将“创建角色和用户”中的步骤5获得的keytab文件和krb5.conf文件复制到conf目录。 执行如下命令配置环境变量并认证已创建用户,例如test。 cd /opt/Bigdata/client source bigdata_env export YARN_USER_CLASSPATH=/opt/Bigdata/client/test/conf/ kinit test 然后按照提示输入密码,无异常提示返回,则完成了用户的kerberos认证。 执行如下命令运行程序: chmod +x /opt/hive_examples -R cd /opt/hive_examples java -cp .:hive-examples-1.0.jar:/opt/hive_examples/conf:/opt/Bigdata/client/Hive/Beeline/lib/*:/opt/Bigdata/client/HDFS/hadoop/lib/* com.huawei.bigdata.hive.example.ExampleMain 程序运行成功后,会显示如下: 图15 程序运行的结果
  • 创建安全集群并登录其Manager 创建安全集群,请参见创建集群页面,开启“Kerberos认证”参数开关,并配置“密码”、“确认密码”参数。该密码用于登录Manager,请妥善保管。 图1 安全集群参数配置 登录MRS管理控制台页面。 在“现有集群”列表,单击指定的集群名称,进入集群信息页面。 单击“集群管理页面”后的“前往Manager”,打开Manager页面。 若用户创建集群时已经绑定弹性公网IP,如图2所示。 添加安全组规则,默认填充的是用户访问公网IP地址9022端口的规则。如需对安全组规则进行查看,修改和删除操作,请单击“管理安全组规则”。 自动获取的访问公网IP与用户本机IP不一致,属于正常现象,无需处理。 9022端口为knox的端口,需要开启访问knox的9022端口权限,才能访问Manager服务。 勾选“我确认xx.xx.xx.xx为可信任的公网访问IP,并允许从该IP访问MRS Manager页面。” 图2 访问Manager页面 若用户创建集群时暂未绑定弹性公网IP,如图3所示。 在弹性公网IP下拉框中选择可用的弹性公网IP或单击“管理弹性公网IP”购买弹性公网IP。 添加安全组规则,默认填充的是用户访问公网IP地址9022端口的规则。如需对安全组规则进行查看,修改和删除操作,请单击“管理安全组规则”。 自动获取的访问公网IP与用户本机IP不一致,属于正常现象,无需处理。 9022端口为knox的端口,需要开启访问knox的9022端口权限,才能访问Manager服务。 勾选“我确认xx.xx.xx.xx为可信任的公网访问IP,并允许从该IP访问MRS Manager页面。” 图3 访问Manager页面设置 单击“确定”,进入Manager登录页面,如需给其他用户开通访问Manager的权限,请参见访问Manager章节,添加对应用户访问公网的IP地址为可信范围。 输入创建集群时默认的用户名“admin”及设置的密码,单击“登录”进入Manager页面。
  • 购买集群 登录华为云控制台。 在服务列表中搜索“MapReduce服务”,进入MRS服务管理控制台。 单击“购买集群”,进入“购买集群”页面,选择“快速购买”页签。 根据实际业务规划情况填写集群配置信息(以下参数仅供参考,可根据实际情况调整)。 表1 MRS集群配置参数 参数项 取值 区域 华北-北京四 计费模式 按需计费 集群名称 填写“mrs_demo”或按命名规范命名。 集群类型 选择“自定义”即可。 版本类型 LTS版 集群版本 MRS 3.2.0-LTS.1 组件选择 实时分析集群 可用区 可用区1 企业项目 default 虚拟私有云 建议保持默认,例如“vpc-default”。 子网 建议保持默认,例如“subnet-default”。 集群节点 建议保持默认,可根据实际情况调整。 Kerberos认证 不开启 用户名 admin/root 密码 设置密码登录集群管理页面及ECS节点用户的密码,例如:Test!@12345。 确认密码 再次输入设置用户密码。 通信安全授权 勾选确认授权。 图1 购买流式分析集群 单击“立即购买”,进入任务提交成功页面。 单击“返回集群列表”,在“现有集群”列表中可以查看到集群创建的状态。 集群创建需要时间,所创集群的初始状态为“启动中”,创建成功后状态更新为“运行中”,请您耐心等待。
  • 安装MRS集群客户端 MRS集群创建成功后,用户可安装集群客户端用于连接集群内各组件服务,进行作业提交等操作。 客户端可以安装在集群内的节点上,也可以安装在集群外的节点上。本指南以在Master1节点上安装客户端为例进行介绍。 MRS集群创建成功后,在集群列表中单击MRS集群名称进入集群概览页面。 单击“集群管理页面 ”后的“前往 Manager”,在弹出的窗口中选择“EIP访问”并配置弹性IP信息。 首次访问时,可点击“管理弹性公网IP”,在弹性公网IP控制台购买一个弹性公网IP,购买成功后刷新弹性公网IP列表并选择。 勾选确认信息后,单击“确定”,登录集群的FusionInsight Manager管理界面。 Manager登录用户名为admin,密码为购买集群时配置的自定义密码。 在“主页”页签的集群名称后单击,单击“下载客户端”下载集群客户端。 图2 下载客户端 在“下载集群客户端”弹窗中,“选择平台类型”必须与待安装节点的架构匹配,例如“x86_64”。同时勾选“仅保存到如下路径”。 图3 下载集群客户端提示框 单击“确定”后,等待客户端软件生成成功。 在MRS服务管理控制台的集群列表中,单击集群名称,在集群的“节点管理”页签,单击包含“master1”的节点名称,在ECS详情页面可通过远程登录方式登录对应节点。 图4 查看Master1节点 使用root用户登录Master1节点,密码为购买集群时配置的自定义密码。 cd /tmp/FusionInsight-Client/ tar -xvf FusionInsight_Cluster_1_Services_Client.tar tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar 进入安装包所在目录,安装客户端。 cd FusionInsight_Cluster_1_Services_ClientConfig 执行如下命令安装客户端到指定目录(绝对路径),例如安装到“/opt/client”目录,等待客户端安装完成。 ./install.sh /opt/client ... ... component client is installed successfully ...
  • 入门实践 当完成MRS集群部署后,可以根据自身的业务需求使用MRS提供的一系列常用实践。 表1 MRS常用最佳实践 实践 描述 集群管理 安装并使用MRS集群客户端 在MRS集群创建成功后集群用户需安装MRS集群组件的(不包含Flume)的客户端,通过客户端连接集群内组件。 提交Spark任务到新增Task节点 MRS自定义类型集群可以通过增加Task节点,提升计算能力。集群Task节点主要用于处理数据,不存放持久数据。 通过租户资源绑定新增的Task节点,可实现提交Spark任务到新增的Task节点。 配置MRS集群阈值类告警 MRS集群提供可视化、便捷的监控告警功能。用户可以快速获取集群关键性能指标,并评测集群健康状态。 如果部分阈值类监控告警经评估后对业务影响可忽略、或告警阈值可进行调整,用户也可以根据需要自定义集群监控指标,或屏蔽对应告警,使告警不再上报。 数据分析 使用Spark2x实现车联网车主驾驶行为分析 本实践指导使用Spark实现车主驾驶行为分析。用于了解MRS的基本功能,利用MRS服务的Spark2x组件,对车主的驾驶行为进行分析统计,得到用户驾驶行为的分析结果,分析统计指定时间段内,车主急加速、急减速、空挡滑行、超速、疲劳驾驶等违法行为的次数。 使用Hive加载HDFS数据并分析图书评分情况 本实践指导使用Hive对原始数据进行导入、分析等操作,展示了如何构建弹性、低成本的离线大数据分析。以某图书网站后台用户的点评数据为原始数据,导入Hive表后通过SQL命令筛选出最受欢迎的畅销图书。 使用Hive加载OBS数据并分析企业雇员信息 本实践指导使用Hive对OBS中存储的原始数据进行导入、分析等操作,展示了如何构建弹性、低成本的存算分离大数据分析。以用户开发一个Hive数据分析应用为例,通过客户端连接Hive后,执行HQL语句访问OBS中的Hive数据。进行企业雇员信息的管理、查询。 通过Flink作业处理OBS数据 本实践指导使用MRS集群内置的Flink WordCount作业程序,来分析OBS文件系统中保存的源数据,以统计源数据中的单词出现次数。 MRS支持在大数据存储容量大、计算资源需要弹性扩展的场景下,用户将数据存储在OBS服务中,使用MRS集群仅做数据计算处理的存算分离模式。 数据迁移 数据迁移方案介绍 本迁移指导适用于多种不同场景下的HDFS、HBase、Hive数据向MRS集群的迁移工作。 介绍数据迁移前的准备工作、元数据导出、数据拷贝、数据恢复等内容。 Hadoop数据迁移到华为云MRS服务 本实践使用华为云CDM服务将Hadoop集群中的数据(支持数据量在几十TB级别或以下的数据量级)迁移到华为云MRS服务。 HBase数据迁移到华为云MRS服务 本实践使用华为云CDM服务将HBase集群中的数据(支持数据量在几十TB级别或以下的数据量级)迁移到华为云MRS服务。HBase会把数据存储在HDFS上,主要包括HFile文件和WAL文件,由配置项“hbase.rootdir”指定在HDFS上的路径,华为云MRS的默认存储位置是“/hbase”文件夹下。 HBase自带的一些机制和工具命令也可以实现数据搬迁,例如:通过导出Snapshots快照、Export/Import、CopyTable方式等。 Hive数据迁移到华为云MRS服务 本实践使用华为云CDM服务将Hive集群中的数据(支持数据量在几十TB级别或以下的数据量级)迁移到华为云MRS服务。 Hive数据迁移分两部分内容: Hive的元数据信息,存储在MySQL等数据库中。MRS Hive集群的元数据会默认存储到MRS DBService(华为的Gaussdb数据库),也可以选择RDS(MySQL)作为外置元数据库。 Hive的业务数据,存储在HDFS文件系统或OBS对象存储中。 MySQL数据迁移到MRS集群Hive分区表 本实践使用CDM云服务将MySQL数据导入到MRS集群内的Hive分区表中。 Hive提供类SQL查询语言,帮助用户对大规模的数据进行提取、转换和加载,即通常所称的ETL(Extraction,Transformation,and Loading)操作。对庞大的数据集查询需要耗费大量的时间去处理,在许多场景下,可以通过建立Hive分区方法减少每一次扫描的总数据量,这种做法可以显著地改善性能。 MRS HDFS数据迁移到OBS 本实践以MRS HDFS数据迁移到OBS为例,介绍如何通过CDM将文件类数据迁移到文件中。 系统对接 使用DBeaver访问Phoenix 本实践介绍如何使用DBeaver访问Phoenix。 使用DBeaver访问HetuEngine 本实践介绍如何使用DBeaver访问HetuEngine。 Hive对接外置自建关系型数据库 本实践介绍如何使用Hive对接开源MySQL和Postgres数据库。 在已有Hive数据的集群上外置元数据库后,之前的元数据表不会自动同步。因此在安装Hive之初就要确认好元数据是外置数据库还是内置到DBService,如果是外置自建数据库,则需在安装Hive时或者暂无Hive数据时将元数据外置,安装后不允许修改,否则将会造成原有元数据丢失。 Hive对接CSS服务 本实践介绍如何使用Hive对接CSS的Elasticsearch服务。 利用Elasticsearch-Hadoop插件,完成Hive和CSS服务的Elasticsearch直接的数据交互,通过Hive外部表的方式,可以快速将Elasticsearch索引数据映射到Hive表中。
  • 前提条件 已安装客户端时: 已安装HBase客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装HBase客户端时: Linux环境已安装JDK,版本号需要和IntelliJ IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • 通过Java API提交Oozie作业开发思路 通过典型场景,用户可以快速学习和掌握Oozie的开发过程,并且对关键的接口函数有所了解。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,代码示例只涉及了MapReduce作业,其他作业的API调用代码是一样的,只是job配置“job.properties”与工作流配置“workflow.xml”不一样。 完成导入并配置Oozie样例工程操作后即可执行通过Java API提交MapReduce作业和查询作业状态。 父主题: 通过Java API提交Oozie作业
  • MapReduce简介 Hadoop MapReduce是一个使用简易的并行计算软件框架,基于它写出来的应用程序能够运行在由上千个服务器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个MapReduce作业(application/job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式来处理。框架会对map的输出先进行排序,然后把结果输入给reduce任务,最后返回给客户端。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 MapReduce主要特点如下: 大规模并行计算 适用于大型数据集 高容错性和高可靠性 合理的资源调度
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • MapReduce REST API接口介绍 功能简介 通过HTTP REST API来查看更多MapReduce任务的信息。目前Mapresuce的REST接口可以查询已完成任务的状态信息。完整和详细的接口请直接参考官网上的描述以了解其使用:http://hadoop.apache.org/docs/r3.1.1/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html 准备运行环境 在节点上安装客户端,例如安装到“/opt/client”目录,可参考“安装客户端”。 进入客户端安装目录“/opt/client”,执行下列命令初始化环境变量。 source bigdata_env 操作步骤 获取MapReduce上已完成任务的具体信息 命令: curl -k -i --negotiate -u : "http://10.120.85.2:19888/ws/v1/history/mapreduce/jobs" 其中10.120.85.2为MapReduce的“JHS_FLOAT_IP”参数的参数值,19888为JobHistoryServer的端口号。 在部分低版本操作系统中使用curl命令访问JobHistoryServer会有兼容性问题,导致无法返回正确结果。 用户能看到历史任务的状态信息(任务id,开始时间,结束时间,是否执行成功等信息) 运行结果 { "jobs":{ "job":[ { "submitTime":1525693184360, "startTime":1525693194840, "finishTime":1525693215540, "id":"job_1525686535456_0001", "name":"QuasiMonteCarlo", "queue":"default", "user":"mapred", "state":"SUCCEEDED", "mapsTotal":1, "mapsCompleted":1, "reducesTotal":1, "reducesCompleted":1 } ] } } 结果分析: 通过这个接口,可以查询当前集群中已完成的MapReduce任务,并且可以得到表1 表1 常用信息 参数 参数描述 submitTime 任务提交时间 startTime 任务开始执行时间 finishTime 任务执行完成时间 queue 任务队列 user 提交这个任务的用户 state 任务执行成功或失败 父主题: MapReduce接口介绍
  • 前提条件 已安装客户端时: 已安装HBase客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装HBase客户端时: Linux环境已安装JDK,版本号需要和IntelliJ IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • MapReduce简介 Hadoop MapReduce是一个使用简易的并行计算软件框架,基于它写出来的应用程序能够运行在由上千个服务器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个MapReduce作业(application/job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式来处理。框架会对map的输出先进行排序,然后把结果输入给reduce任务,最后返回给客户端。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 MapReduce主要特点如下: 大规模并行计算 适用于大型数据集 高容错性和高可靠性 合理的资源调度
  • 查看Windows调测结果 Doris应用程序运行完成后,可通过如下方式查看运行情况。 通过IntelliJ IDEA运行结果查看应用程序运行情况。 通过Doris日志获取应用程序运行情况。 各样例程序运行结果如下: “doris-jdbc-example”样例运行成功后,显示信息如下: 2023-08-17 23:13:13,473 | INFO | main | Start execute doris example. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:41) 2023-08-17 23:13:13,885 | INFO | main | Start create database. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:44) 2023-08-17 23:13:13,949 | INFO | main | Database created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:46) 2023-08-17 23:13:13,950 | INFO | main | Start create table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:49) 2023-08-17 23:13:14,132 | INFO | main | Table created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:51) 2023-08-17 23:13:14,133 | INFO | main | Start to insert data into the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:54) 2023-08-17 23:13:14,733 | INFO | main | Inserting data to the table succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:56) 2023-08-17 23:13:14,733 | INFO | main | Start to query table data. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:59) 2023-08-17 23:13:15,079 | INFO | main | Start to print query result. | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:121) 2023-08-17 23:13:15,079 | INFO | main | c1 c2 c3 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:126) 2023-08-17 23:13:15,079 | INFO | main | 0 0 0 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 1 10 100 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 2 20 200 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 3 30 300 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 4 40 400 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 5 50 500 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 6 60 600 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 7 70 700 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 8 80 800 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 9 90 900 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | Querying table data succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:61) 2023-08-17 23:13:15,081 | INFO | main | Start to delete the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:64) 2023-08-17 23:13:15,114 | INFO | main | Table deleted successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:66) 2023-08-17 23:13:15,124 | INFO | main | Doris example execution successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:71) Process finished with exit code 0 Doris对接SpringBoot运行结果 在浏览器中访问链接“http://样例运行节点IP地址:8080/doris/example/executesql”,IDEA正常打印日志,请求返回如下图所示: 图7 返回样例运行信息
  • 查看Linux调测结果 “doris-jdbc-example”样例运行成功后,显示信息如下: 2023-08-17 23:13:13,473 | INFO | main | Start execute doris example. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:41) 2023-08-17 23:13:13,885 | INFO | main | Start create database. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:44) 2023-08-17 23:13:13,949 | INFO | main | Database created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:46) 2023-08-17 23:13:13,950 | INFO | main | Start create table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:49) 2023-08-17 23:13:14,132 | INFO | main | Table created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:51) 2023-08-17 23:13:14,133 | INFO | main | Start to insert data into the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:54) 2023-08-17 23:13:14,733 | INFO | main | Inserting data to the table succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:56) 2023-08-17 23:13:14,733 | INFO | main | Start to query table data. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:59) 2023-08-17 23:13:15,079 | INFO | main | Start to print query result. | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:121) 2023-08-17 23:13:15,079 | INFO | main | c1 c2 c3 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:126) 2023-08-17 23:13:15,079 | INFO | main | 0 0 0 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 1 10 100 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 2 20 200 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 3 30 300 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 4 40 400 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 5 50 500 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 6 60 600 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 7 70 700 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 8 80 800 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 9 90 900 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | Querying table data succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:61) 2023-08-17 23:13:15,081 | INFO | main | Start to delete the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:64) 2023-08-17 23:13:15,114 | INFO | main | Table deleted successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:66) 2023-08-17 23:13:15,124 | INFO | main | Doris example execution successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:71) Doris对接SpringBoot运行结果 在浏览器中访问链接“http://样例运行节点IP地址:8080/doris/example/executesql”,IDEA正常打印日志,请求返回如下图所示: 图3 返回样例运行信息
  • 打印metric信息说明 表2 基本指标项 Metric名称 描述 日志级别 total_request_count 周期时间内查询总次数 INFO active_success_count 周期时间内主集群查询成功次数 INFO active_error_count 周期时间内主集群查询失败次数 INFO active_timeout_count 周期时间内主集群查询超时次数 INFO standby_success_count 周期时间内备集群查询成功次数 INFO standby_error_count 周期时间内备集群查询失败次数 INFO Active Thread pool 周期打印请求主集群的执行线程池信息 DEBUG Standby Thread pool 周期打印请求备集群的执行线程池信息 DEBUG Clear Thread pool 周期打印释放资源的执行线程池信息 DEBUG 表3 针对GET、BatchGET、SCAN请求,分别打印Histogram指标项 Metric名称 描述 日志级别 averageLatency(ms) 平均时延 INFO minLatency(ms) 最小时延 INFO maxLatency(ms) 最大时延 INFO 95thPercentileLatency(ms) 95%请求的最大时延 INFO 99thPercentileLatency(ms) 99%请求的最大时延 INFO 99.9PercentileLatency(ms) 99.9%请求的最大时延 INFO 99.99PercentileLatency(ms) 99.99%请求的最大时延 INFO
  • 将主备集群相关配置设置到HBaseMultiClusterConnection中 该操作仅适用于MRS 3.3.0及之后版本。 创建双读Configuration,取消“com.huawei.bigdata.hbase.examples”包的“TestMain”类main方法中的testHBaseDualReadSample注释,确保“com.huawei.bigdata.hbase.examples”包的“HBaseDualReadSample”类中的“IS_CREATE_CONNECTION_BY_XML”值为“false”。 在“HBaseDualReadSample”类的addHbaseDualXmlParam方法中添加相关配置,相关配置项可参考HBase双读操作相关配置项说明。 private void addHbaseDualXmlParam(Configuration conf) { // We need to set the optional parameters contained in hbase-dual.xml to conf // when we use configuration transfer solution conf.set(CONNECTION_IMPL_KEY, DUAL_READ_CONNECTION); // conf.set("", ""); } 在“HBaseDualReadSample”类的initActiveConf方法中添加主集群客户端相关配置: private void initActiveConf() { // The hbase-dual.xml configuration scheme is used to generate the client configuration of the active cluster. // In actual application development, you need to generate the client configuration of the active cluster. String activeDir = HBaseDualReadSample.class.getClassLoader().getResource(Utils.CONF_DIRECTORY).getPath() + File.separator + ACTIVE_DIRECTORY + File.separator; Configuration activeConf = Utils.createConfByUserDir(activeDir); HBaseMultiClusterConnection.setActiveConf(activeConf); } 在“HBaseDualReadSample”类initStandbyConf方法中添加备集群客户端相关配置: private void initStandbyConf() { // The hbase-dual.xml configuration scheme is used to generate the client configuration of the standby cluster. // In actual application development, you need to generate the client configuration of the standby cluster. String standbyDir = HBaseDualReadSample.class.getClassLoader().getResource(Utils.CONF_DIRECTORY).getPath() + File.separator + STANDBY_DIRECTORY + File.separator; Configuration standbyConf = Utils.createConfByUserDir(standbyDir); HBaseMultiClusterConnection.setStandbyConf(standbyConf); } 确定数据来源的集群。 GET请求,以下代码片段在“com.huawei.bigdata.hbase.examples”包的“HBaseSample”类的testGet方法中添加。 Result result = table.get(get); if (result instanceof DualResult) { LOG.info(((DualResult)result).getClusterId()); } Scan请求,以下代码片段在“com.huawei.bigdata.hbase.examples”包的“HBaseSample”类的testScanData方法中添加。 ResultScanner rScanner = table.getScanner(scan); if (rScanner instanceof HBaseMultiScanner) { LOG.info(((HBaseMultiScanner)rScanner).getClusterId()); } 客户端支持打印metric信息。 “log4j.properties”文件中增加如下内容,客户端将metric信息输出到指定文件。指标项信息可参考打印metric信息说明。 log4j.logger.DUAL=debug,DUAL log4j.appender.DUAL=org.apache.log4j.RollingFileAppender log4j.appender.DUAL.File=/var/log/dual.log //客户端本地双读日志路径,根据实际路径修改,但目录要有写入权限 log4j.additivity.DUAL=false log4j.appender.DUAL.MaxFileSize=${hbase.log.maxfilesize} log4j.appender.DUAL.MaxBackupIndex=${hbase.log.maxbackupindex} log4j.appender.DUAL.layout=org.apache.log4j.PatternLayout log4j.appender.DUAL.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n
  • 操作场景 HBase客户端应用通过自定义加载主备集群配置项,实现了双读能力。HBase双读作为提高HBase集群系统高可用性的一个关键特性,适用于使用Get读取数据、使用批量Get读取数据、使用Scan读取数据,以及基于二级索引查询。它能够同时读取主备集群数据,减少查询毛刺,具体表现为: 高成功率:双并发读机制,保证每一次读请求的成功率。 可用性:单集群故障时,查询业务不中断。短暂的网络抖动也不会导致查询时间变长。 通用性:双读特性不支持双写,但不影响原有的实时写场景。 易用性:客户端封装处理,业务侧不感知。 HBase双读使用约束: HBase双读特性基于Replication实现,备集群读取的数据可能和主集群存在差异,因此只能实现最终一致性。 目前HBase双读功能仅用于查询。主集群故障时,最新数据无法同步,备集群可能查询不到最新数据。 HBase的Scan操作可能分解为多次RPC。由于相关session信息在不同集群间不同步,数据不能保证完全一致,因此双读只在第一次RPC时生效,ResultScanner close之前的请求会固定访问第一次RPC时使用的集群。 HBase Admin接口、实时写入接口只会访问主集群。所以主集群故障后,不能提供Admin接口功能和实时写入接口功能,只能提供Get、Scan查询服务。
  • HBase双读操作相关配置项说明 表1 hbase-dual.xml配置项 配置项名称 配置项详解 默认值 级别 hbase.dualclient.active.cluster.configuration.path 主集群HBase客户端配置目录 无 必选配置 hbase.dualclient.standby.cluster.configuration.path 备集群HBase客户端配置目录 无 必选配置 dual.client.schedule.update.table.delay.second 更新开启容灾表列表的周期时间 5 可选配置 hbase.dualclient.glitchtimeout.ms 可以容忍主集群的最大毛刺时间 50 可选配置 hbase.dualclient.slow.query.timeout.ms 慢查询告警日志 180000 可选配置 hbase.dualclient.active.cluster.id 主集群id ACTIVE 可选配置 hbase.dualclient.standby.cluster.id 备集群id STANDBY 可选配置 hbase.dualclient.active.executor.thread.max 请求主集群的线程池max大小 100 可选配置 hbase.dualclient.active.executor.thread.core 请求主集群的线程池core大小 100 可选配置 hbase.dualclient.active.executor.queue 请求主集群的线程池queue大小 256 可选配置 hbase.dualclient.standby.executor.thread.max 请求备集群的线程池max大小 100 可选配置 hbase.dualclient.standby.executor.thread.core 请求备集群的线程池core大小 100 可选配置 hbase.dualclient.standby.executor.queue 请求备集群的线程池queue大小 256 可选配置 hbase.dualclient.clear.executor.thread.max 清理资源线程池max大小 30 可选配置 hbase.dualclient.clear.executor.thread.core 清理资源线程池core大小 30 可选配置 hbase.dualclient.clear.executor.queue 清理资源线程池queue大小 Integer. MAX_VALUE 可选配置 dual.client.metrics.enable 客户端metric信息是否打印 true 可选配置 dual.client.schedule.metrics.second 客户端metric信息打印周期 300 可选配置 dual.client.asynchronous.enable 是否异步请求主备集群 false 可选配置
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html at-least-once 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • SpringBoot样例工程的命令行形式运行 在IDEA界面左下方单击“Terminal”进入终端,执行命令mvn clean package进行编译。 当输出“BUILD SUCCESS”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“-with-dependencies”字段的Jar包。 在Windows或Linux上创建一个目录作为运行目录,如“D:\hive-rest-client-example”(Windows环境)或“/opt/hive-rest-client-example”(Linux环境),将1中生成的“target”目录下包名中含有“-with-dependencies”字段的Jar包放进该路径下,并在该目录下创建子目录“src/main/resources"。将hive-rest-client-example工程resources目录下的所有文件复制到“resources”下。 执行以下命令启动SpringBoot服务: 在Windows环境下执行: cd /d d:\hive-rest-client-example java -jar hive-rest-client-example-8.1.0.1-3.5.0-SNAPSHOT-jar-with-dependencies.jar 在Linux环境下执行: chmod +x /opt/hive-rest-client-example -R cd /opt/hive-rest-client-example java -jar hive-rest-client-example-8.1.0.1-3.5.0-SNAPSHOT-jar-with-dependencies.jar 以上Jar包名称仅供参考,具体名称以实际生成为主。 调用Hive的SpringBoot样例接口触发样例代码运行: Windows环境运行方式: 打开浏览器,输入:http://localhost:8080/hive/example/executesql。 Linux环境下执行运行方式: 在2中存放Jar的节点上执行curl http://localhost:8080/hive/example/executesql命令。 运行样例代码时日志中可能会打印以下日志信息,虽然日志级别显示ERROR,但是不影响执行结果: ERROR 51320 --- [c-8-EventThread] o.a.c.framework.imps.EnsembleTracker : Invalid config event received: {version=100000000, server.48=ZooKeeper节点IP地址:ZooKeeper端口号:ZooKeeper端口号:participant...} 查看样例代码中的HQL所查询出的结果。 Windows环境运行成功结果会有如下信息: =========================== Hive Example Start =========================== Start create table. Table created successfully. Start to insert data into the table. Inserting data to the table succeeded. Start to query table data. Query result : employees_infoa.id employees_infoa.age employees_infoa.name 1 31 SJK 2 25 HS 3 28 HT Querying table data succeeded. Start to delete the table. Table deleted successfully. =========================== Hive Example End =========================== Linux环境运行成功结果会有如下信息: =========================== Hive Example Start =========================== Start create table. Table created successfully. Start to insert data into the table. Inserting data to the table succeeded. Start to query table data. Query result : employees_infoa.id employees_infoa.age employees_infoa.name 1 31 SJK 2 25 HS 3 28 HT Querying table data succeeded. Start to delete the table. Table deleted successfully. =========================== Hive Example End ===========================
  • 打印metric信息说明 表2 基本指标项 Metric名称 描述 日志级别 total_request_count 周期时间内查询总次数 INFO active_success_count 周期时间内主集群查询成功次数 INFO active_error_count 周期时间内主集群查询失败次数 INFO active_timeout_count 周期时间内主集群查询超时次数 INFO standby_success_count 周期时间内备集群查询成功次数 INFO standby_error_count 周期时间内备集群查询失败次数 INFO Active Thread pool 周期打印请求主集群的执行线程池信息 DEBUG Standby Thread pool 周期打印请求备集群的执行线程池信息 DEBUG Clear Thread pool 周期打印释放资源的执行线程池信息 DEBUG 表3 针对GET、BatchGET、SCAN请求,分别打印Histogram指标项 Metric名称 描述 日志级别 averageLatency(ms) 平均时延 INFO minLatency(ms) 最小时延 INFO maxLatency(ms) 最大时延 INFO 95thPercentileLatency(ms) 95%请求的最大时延 INFO 99thPercentileLatency(ms) 99%请求的最大时延 INFO 99.9PercentileLatency(ms) 99.9%请求的最大时延 INFO 99.99PercentileLatency(ms) 99.99%请求的最大时延 INFO
共100000条