华为云用户手册

  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要将HDFS用户绑定supergroup用户组。 在运行Colocation工程时,HDFS的配置项fs.defaultFS不能配置为viewfs://ClusterX。 初始化 使用Colocation前需要进行kerberos安全认证。 private static void init() throws IOException { conf.set(KEYTAB, PATH_TO_KEYTAB); conf.set(PRINCIPAL, PRNCIPAL_NAME); LoginUtil.setJaasConf(LOGIN_CONTEXT_NAME, PRNCIPAL_NAME, PATH_TO_KEYTAB); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); 创建group 样例:创建一个gid01组,组中包含3个locator。 /** * 创建group * * @throws java.io.IOException */ private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 /** * 创建并写入文件 * * @throws java.io.IOException */ private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入HDFS的数据 byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 /** * 删除文件 * * @throws java.io.IOException */ @SuppressWarnings("deprecation") private static void delete() throws IOException { dfs.delete(new Path(TESTFILE_TXT)); } 删除group 样例:删除gid01。 /** * 删除group * * @throws java.io.IOException */ private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: Colocation分配节点原理 扩容与Colocation分配 Colocation与数据节点容量 Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略1来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locators所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个……)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 无 HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 Colocation提供了文件同分布的功能,执行集群Balancer或Mover操作时,会移动数据块,使Colocation功能失效。因此,使用Colocation功能时,建议将HDFS配置项dfs.datanode.block-pinning.enabled设置为true,此时执行集群Balancer或Mover操作时,使用Colocation写入的文件将不会被移动,从而保证了文件同分布。
  • 场景说明 该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 该样例逻辑过程如下: 以HDFS文本文件为输入数据: log1.txt:数据输入文件 YuanJing,male,10 GuoYijun,male,5 Map阶段: 获取输入数据的一行并提取姓名信息。 查询HBase一条数据。 查询Hive一条数据。 将HBase查询结果与Hive查询结果进行拼接作为Map输出。 Reduce阶段: 获取Map输出中的最后一条数据。 将数据输出到HBase。 将数据保存到HDFS。
  • 数据规划 创建HDFS数据文件。 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/ 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/ 创建HBase表并插入数据。 在Linux系统HBase客户端执行source bigdata_env,并使用命令hbase shell。 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。 执行命令quit退出。 创建Hive表并载入数据。 在Linux系统Hive客户端使用命令beeline。 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。 执行命令!q退出。 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
  • 问题 向YARN服务器提交MapReduce任务后,客户端提示如下信息后长时间无响应。 16/03/03 16:44:56 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 44 for admin on ha-hdfs:hacluster 16/03/03 16:44:56 INFO security.TokenCache: Got dt for hdfs://hacluster; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (HDFS_DELEGATION_TOKEN token 44 for admin) 16/03/03 16:44:56 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to 53 16/03/03 16:44:57 INFO input.FileInputFormat: Total input files to process : 200 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: number of splits:200 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1456738266914_0005 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (HDFS_DELEGATION_TOKEN token 44 for admin) 16/03/03 16:44:57 INFO impl.YarnClientImpl: Submitted application application_1456738266914_0005 16/03/03 16:44:57 INFO mapreduce.Job: The url to track the job: https://linux2:8090/proxy/application_1456738266914_0005/ 16/03/03 16:44:57 INFO mapreduce.Job: Running job: job_1456738266914_0005
  • Spark应用程序开发流程 Spark包含Spark Core、Spark SQL和Spark Streaming三个组件,其应用开发流程都是相同的。 开发流程中各阶段的说明如图1和表1所示。 图1 Spark应用程序开发流程 表1 Spark应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Spark的基本概念,根据实际场景选择需要了解的概念,分为Spark Core基本概念、Spark SQL基本概念和Spark Streaming基本概念。 基本概念 准备开发和运行环境 Spark的应用程序支持使用Scala、Java、Python三种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Spark的运行环境即Spark客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备开发用户 开发用户用于运行样例工程。用户需要有HDFS、YARN、Kafka和Hive权限,才能运行Spark样例工程。 准备MRS应用开发用户 准备工程 Spark提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Spark工程。 导入并配置Spark样例工程 新建Spark样例工程(可选) 准备安全认证 如果您使用的是安全集群,需要进行安全认证。 配置Spark应用安全认证 根据场景开发工程 提供了Scala、Java、Python三种不同语言的样例工程,还提供了Streaming、SQL、JDBC客户端程序以及Spark on HBase四种不同场景的样例工程。 帮助用户快速了解Spark各部件的编程接口。 开发Spark应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测Spark应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调优程序 您可以根据程序运行情况,对程序进行调优,是其性能满足业务场景诉求。 调优完成后,请重新进行编译和运行。 Spark2x性能调优
  • 操作步骤 客户端机器必须安装有Python3,其版本不低于3.6,最高不能超过3.8。 在客户端机器的命令行终端输入python3可查看Python版本号。如下显示Python版本为3.8.2。 Python 3.8.2 (default, Jun 23 2020, 10:26:03) [GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux Type "help", "copyright", "credits" or "license" for more information. 客户端机器必须安装有setuptools,版本可取47.3.1。 具体软件,请到对应的官方网站获取。 https://pypi.org/project/setuptools/#files 将下载的setuptools压缩文件复制到客户端机器上,解压后进入解压目录,在客户端机器的命令行终端执行python3 setup.py install。 如下内容表示安装setuptools的47.3.1版本成功。 Finished processing dependencies for setuptools==47.3.1 若提示setuptools的47.3.1版本安装不成功,则需要检查环境是否有问题或是Python自身原因导致的。 安装Python客户端到客户端机器。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\hive-examples”目录下的样例工程文件夹“python3-examples”或“dependency_python3.9”文件夹(MRS 3.3.0及之后版本支持)。 进入“python3-examples”文件夹。 根据python3的版本,选择进入“dependency_python3.6”或“dependency_python3.7”或“dependency_python3.8”文件夹。 执行whereis easy_install命令,找到easy_install程序路径。如果有多个路径,使用easy_install --version确认选择setuptools对应版本的easy_install,如/usr/local/bin/easy_install。 MRS 3.2.0之前版本,使用对应的easy_install命令,依次安装dependency_python3.x文件夹下的egg文件。如: /usr/local/bin/easy_install future-0.18.2-py3.8.egg 输出以下关键内容表示安装egg文件成功。 Finished processing dependencies for future==0.18.2 对于“dependency_python3.x”文件夹下同时存在aarch64与x86_64版本的“egg”文件,需要根据操作系统选取其中一个版本安装即可,使用uname -p命令确认当前操作系统架构。 MRS 3.2.0及之后版本,使用对应的easy_install命令,安装dependency_python3.x文件夹下的egg文件,egg文件存在依赖关系,可使用通配符安装,如: “dependency_python3.6”目录: /usr/local/bin/easy_install future*egg six*egg python*egg sasl-*linux-$(uname -p).egg thrift-*egg thrift_sasl*egg “dependency_python3.7”目录: /usr/local/bin/easy_install future*egg six*egg sasl-*linux-$(uname -p).egg thrift-*egg thrift_sasl*egg “dependency_python3.8”目录: /usr/local/bin/easy_install future*egg six*egg python*egg sasl-*linux-$(uname -p).egg thrift-*linux-$(uname -p).egg thrift_sasl*egg “dependency_python3.9”目录(MRS 3.3.0及之后版本): /usr/local/bin/easy_install future*egg six*egg sasl-*linux-$(uname -p).egg six-*.egg thrift-*linux-$(uname -p).egg thrift_sasl*egg 每个egg文件安装输出以下关键内容表示安装成功。 Finished processing dependencies for *** 安装成功后,“python3-examples/pyCLI_nosec.py”为Python客户端样例代码,“python3-examples/pyhive/hive.py”为Python客户端接口API。
  • 配置运行环境网络 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 场景一:配置本地Windows开发环境与MRS集群节点内网络互通。 登录FusionInsight Manager,在“主页”右上方单击“下载客户端”,“选择客户端类型”设置为“仅配置文件”,单击“确定”,等待客户端文件包生成后根据浏览器提示下载客户端到本地并解压。 例如,客户端配置文件压缩包为“FusionInsight_Cluster_1_Services_Client.tar”,解压后得到“FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles.tar”,继续解压该文件。 复制解压目录下的“hosts”文件中的内容到本地hosts文件中。 在应用开发过程中,如需在本地Windows系统中调测应用程序,需要确保本地节点能与“hosts”文件中所列出的各主机在网络上互通。 Windows本地hosts文件存放路径举例:“C:\WINDOWS\system32\drivers\etc\hosts”。 场景二:配置Linux环境与MRS集群节点内网络互通。 在节点中安装MRS集群客户端。 例如客户端安装目录为“/opt/client”。 获取配置文件: 登录FusionInsight Manager,在“主页”右上方单击“下载客户端”,“选择客户端类型”设置为“仅配置文件”,勾选“仅保存到如下路径”,单击“确定”,下载客户端配置文件至集群主OMS点。 以root登录主OMS节点,进入客户端配置文件所在路径(默认为“/tmp/FusionInsight-Client/”)。 例如客户端软件包为“FusionInsight_Cluster_1_Services_Client.tar”,下载路径为主管理节点的“/tmp/FusionInsight-Client”: cd /tmp/FusionInsight-Client tar -xvf FusionInsight_Cluster_1_Services_Client.tar tar -xvf FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles.tar cd FusionInsight_Cluster_1_Services_ClientConfig_ConfigFiles 检查客户端节点网络连接。 在安装客户端过程中,系统会自动配置客户端节点“hosts”文件,建议检查“/etc/hosts”文件内是否包含集群内节点的主机名信息,如未包含,需要手动复制解压目录下的“hosts”文件中的内容到客户端所在节点的hosts文件中,确保本地机器能与集群各主机在网络上互通。
  • Kafka开发应用时,需要准备的开发和运行环境如表1所示: 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装和配置IntelliJ IDEA 开发环境的基本配置。版本要求:JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端: Oracle JDK:支持1.8版本 IBM JDK:支持1.8.5.11版本 TaiShan客户端: OpenJDK:支持1.8.0_272版本 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 未安装客户端时编译并运行程序 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/optclient”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下,例如“/opt/client”(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample:时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 已安装客户端时编译并运行程序 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至Linux客户端运行环境的任意目录下,例如“/opt/client”。 配置环境变量: cd /opt/client source bigdata_env 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 前提条件 已安装客户端时: 已安装HDFS客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装客户端时: Linux环境已安装JDK,版本号需要和IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在Linux环境所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • 常用接口 YARN常用的Java类有如下几个。 ApplicationClientProtocol 用于Client与ResourceManager之间。Client通过该协议可实现将应用程序提交到ResourceManager上,查询应用程序的运行状态或者中止应用程序等功能。 表1 ApplicationClientProtocol常用方法 方法 说明 forceKillApplication(KillApplicationRequest request) Client通过此接口请求RM中止一个已提交的任务。 getApplicationAttemptReport(GetApplicationAttemptReportRequest request) Client通过此接口从RM获取指定ApplicationAttempt的报告信息。 getApplicationAttempts(GetApplicationAttemptsRequest request) Client通过此接口从RM获取所有ApplicationAttempt的报告信息。 getApplicationReport(GetApplicationReportRequest request) Client通过此接口从RM获取某个应用的报告信息。 getApplications(GetApplicationsRequest request) Client通过此接口从RM获取满足一定过滤条件的应用的报告信息。 getClusterMetrics(GetClusterMetricsRequest request) Client通过此接口从RM获取集群的Metrics。 getClusterNodes(GetClusterNodesRequest request) Client通过此接口从RM获取集群中的所有节点信息。 getContainerReport(GetContainerReportRequest request) Client通过此接口从RM获取某个Container的报告信息。 getContainers(GetContainersRequest request) Client通过此接口从RM获取某个ApplicationAttemp的所有Container的报告信息。 getDelegationToken(GetDelegationTokenRequest request) Client通过此接口获取授权票据,用于container访问相应的service。 getNewApplication(GetNewApplicationRequest request) Client通过此接口获取一个新的应用ID号,用于提交新的应用。 getQueueInfo(GetQueueInfoRequest request) Client通过此接口从RM中获取队列的相关信息。 getQueueUserAcls(GetQueueUserAclsInfoRequest request) Client通过此接口从RM中获取当前用户的队列访问权限信息。 moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest request) 移动一个应用到新的队列。 submitApplication(SubmitApplicationRequest request) Client通过此接口提交一个新的应用到RM。 ApplicationMasterProtocol 用于ApplicationMaster与ResourceManager之间。ApplicationMaster使用该协议向ResourceManager注册、申请资源、获取各个任务的运行情况等。 表2 ApplicationMasterProtocol常用方法 方法 说明 allocate(AllocateRequest request) AM通过此接口提交资源分配申请。 finishApplicationMaster(FinishApplicationMasterRequest request) AM通过此接口通知RM其运行成功或者失败。 registerApplicationMaster(RegisterApplicationMasterRequest request) AM通过此接口向RM进行注册。 ContainerManagementProtocol 用于ApplicationMaster与NodeManager之间。ApplicationMaster使用该协议要求NodeManager启动/中止Container或者查询Container的运行状态。 表3 ContainerManagementProtocol常用方法 方法 说明 getContainerStatuses(GetContainerStatusesRequest request) AM通过此接口向NM请求Containers的当前状态信息。 startContainers(StartContainersRequest request) AM通过此接口向NM提供需要启动的containers列表的请求。 stopContainers(StopContainersRequest request) AM通过此接口请求NM停止一系列已分配的Containers。
  • 代码样例 以下代码片段在“hbase-zk-example\src\main\java\com\huawei\hadoop\hbase\example”包的“TestZKSample”类中,用户主要需要关注“login”和“connectApacheZK”这两个方法。 private static void login(String keytabFile, String principal) throws IOException { conf = HBaseConfiguration.create(); //In Windows environment String confDirPath = TestZKSample.class.getClassLoader().getResource("").getPath() + File.separator;[1] //In Linux environment //String confDirPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; // Set zoo.cfg for hbase to connect to fi zookeeper. conf.set("hbase.client.zookeeper.config.path", confDirPath + "zoo.cfg"); if (User.isHBaseSecurityEnabled(conf)) { // jaas.conf file, it is included in the client pakcage file System.setProperty("java.security.auth.login.config", confDirPath + "jaas.conf"); // set the kerberos server info,point to the kerberosclient System.setProperty("java.security.krb5.conf", confDirPath + "krb5.conf"); // set the keytab file name conf.set("username.client.keytab.file", confDirPath + keytabFile); // set the user's principal try { conf.set("username.client.kerberos.principal", principal); User.login(conf, "username.client.keytab.file", "username.client.kerberos.principal", InetAddress.getLocalHost().getCanonicalHostName()); } catch (IOException e) { throw new IOException("Login failed.", e); } } } private void connectApacheZK() throws IOException, org.apache.zookeeper.KeeperException { try { // Create apache zookeeper connection. ZooKeeper digestZk = new ZooKeeper("127.0.0.1:2181", 60000, null); LOG.info("digest directory:{}", digestZk.getChildren("/", null)); LOG.info("Successfully connect to apache zookeeper."); } catch (InterruptedException e) { LOG.error("Found error when connect apache zookeeper ", e); } }
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG.error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH); } /** * create file path * * @param filePath * @return * @throws java.io.IOException */ private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true; }
  • 参数解释 MapReduce Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name map-reduce action的名称 resourceManager MapReduce ResourceManager地址 name-node HDFS NameNode地址 queueName 任务处理时使用的MapReduce队列名 mapred.mapper.class Mapper类名 mapred.reducer.class Reducer类名 mapred.input.dir MapReduce处理数据的输入目录 mapred.output.dir MapReduce处理后结果数据输出目录 mapred.map.tasks MapReduce map任务个数 “${变量名}”表示:该值来自job.properties所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见job.properties)
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程 安装Maven 开发环境基本配置。用于项目管理,贯穿软件开发生命周期。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 准备开发用户 参考获取MRS应用开发样例工程进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 查看调测结果 ClickHouse应用程序运行完成后,可通过以下方式查看程序运行情况: 通过运行结果查看程序运行情况。 通过ClickHouse日志获取应用运行情况,即“logs”目录下的日志文件:clickhouse-example.log。 运行clickhouse-examples的完整样例后,控制台显示部分运行结果如下: 2023-09-19 16:20:48,344 | INFO | main | loadBalancerIPList is 192.168.5.132, loadBalancerHttpPort is 21422, user is ck_user, clusterName is default_cluster, isSec is true, password is Admin12!. | com.huawei.clickhouse.examples.Demo.main(Demo.java:42) 2023-09-19 16:20:48,350 | INFO | main | ckLbServerList current member is 0, ClickhouseBalancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Demo.getCkLbServerList(Demo.java:110) 2023-09-19 16:20:48,436 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:20:50,781 | INFO | main | Execute query:drop table if exists testdb.testtb on cluster default_cluster no delay | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:20:51,504 | INFO | main | Execute time is 723 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:20:51,511 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:20:51,897 | INFO | main | Execute query:drop table if exists testdb.testtb_all on cluster default_cluster no delay | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:20:52,421 | INFO | main | Execute time is 524 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:20:52,422 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:20:52,946 | INFO | main | Execute query:create database if not exists testdb on cluster default_cluster | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:20:53,405 | INFO | main | Execute time is 458 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:20:53,406 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:20:53,757 | INFO | main | Execute query:create table testdb.testtb on cluster default_cluster (name String, age UInt8, date Date)engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/testdb.testtb','{replica}') partition by toYYYYMM(date) order by age | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:20:54,243 | INFO | main | Execute time is 485 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:20:54,244 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:20:54,640 | INFO | main | Execute query:create table testdb.testtb_all on cluster default_cluster as testdb.testtb ENGINE = Distributed(default_cluster,testdb,testtb, rand()); | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:20:55,175 | INFO | main | Execute time is 535 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:20:55,175 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.insertData(Util.java:143) 2023-09-19 16:20:58,868 | INFO | main | Insert batch time is 503 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:160) 2023-09-19 16:21:01,015 | INFO | main | Insert batch time is 631 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:160) 2023-09-19 16:21:02,521 | INFO | main | Inert all batch time is 4163 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:164) 2023-09-19 16:21:02,522 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:21:03,051 | INFO | main | Execute query:select * from testdb.testtb_all order by age limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:21:03,430 | INFO | main | Execute time is 379 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:21:03,433 | INFO | main | Current load balancer is 192.168.5.132:21422 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:68) 2023-09-19 16:21:03,760 | INFO | main | Execute query:select toYYYYMM(date),count(1) from testdb.testtb_all group by toYYYYMM(date) order by count(1) DESC limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:73) 2023-09-19 16:21:04,361 | INFO | main | Execute time is 600 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:77) 2023-09-19 16:21:04,362 | INFO | main | name age date | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,362 | INFO | main | huawei_9 12 2021-04-20 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,362 | INFO | main | huawei_17 15 2021-05-23 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_5 24 2021-04-15 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_13 39 2020-07-04 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_3 49 2021-06-27 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_15 50 2020-06-26 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_11 53 2020-08-14 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_12 56 2021-12-19 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_19 57 2021-10-31 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | huawei_0 57 2020-03-01 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,363 | INFO | main | toYYYYMM(date) count() | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202105 3 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202110 2 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202104 2 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202008 2 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202007 2 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202106 2 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202012 1 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202109 1 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,364 | INFO | main | 202003 1 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:04,365 | INFO | main | 202011 1 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:158) 2023-09-19 16:21:05,044 | INFO | main | Name is: huawei_9, age is: 12 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,044 | INFO | main | Name is: huawei_17, age is: 15 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_5, age is: 24 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_13, age is: 39 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_3, age is: 49 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_15, age is: 50 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_11, age is: 53 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_12, age is: 56 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,045 | INFO | main | Name is: huawei_19, age is: 57 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) 2023-09-19 16:21:05,046 | INFO | main | Name is: huawei_0, age is: 57 | com.huawei.clickhouse.examples.ClickhouseJDBCHaDemo.queryData(ClickhouseJDBCHaDemo.java:78) Process finished with exit code 0
  • 功能介绍 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 主要分为三个部分: 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
  • 参数解释 FS Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name FS活动的名称 delete 删除指定的文件和目录的标签 move 将文件从源目录移动到目标目录的标签 chmod 修改文件或目录权限的标签 path 当前文件路径 source 源文件路径 target 目标文件路径 permissions 权限字符串 “${变量名}”表示:该值来自job.properties所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见job.properties)
  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • 基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • Flink开发接口简介 Flink DataStream API提供Scala和Java两种语言的开发方式,如表1所示。 表1 Flink DataStream API接口 功能 说明 Scala API 提供Scala语言的API,提供过滤、join、窗口、聚合等数据处理能力。由于Scala语言的简洁易懂,推荐用户使用Scala接口进行程序开发。 Java API 提供Java语言的API,提供过滤、join、窗口、聚合等数据处理能力。
  • 样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • 样例工程运行依赖包参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程运行依赖包 样例工程 依赖包 依赖包获取地址 DataStream程序 异步Checkpoint机制程序 flink-dist_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 使用Flink Jar提交SQL作业程序 FlinkServer REST API程序 flink-dist_*.jar flink-table_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序 kafka-clients-*.jar flink-connector-kafka_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序 flink-connector-netty_*.jar flink-dist_*.jar flink-connector-netty_*.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Stream SQL Join程序 kafka-clients-*.jar flink-connector-kafka_*.jar flink-dist_*.jar flink-table_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Flink读写HBase程序 flink-connector-hbase*.jar flink-dist_*.jar flink-table_*.jar hbase-clients-*.jar flink-connector-hbase_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 hbase-clients-*.jar由HBase组件发布提供,可在HBase组件客户端或者服务端安装路径下的lib目录下获取。 Flink读写Hudi程序 hbase-unsafe-*.jar 可在二次开发样例代码编译后产生的lib文件夹下获取。
  • HDFS应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 HDFS应用程序开发流程 表1 HDFS应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HDFS的基本概念。 HDFS应用开发简介 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 HDFS的运行环境即HDFS客户端,请根据指导完成客户端的安装和配置。 准备HDFS应用开发和运行环境 准备工程 HDFS提供了不同场景下的样例程序,可以导入样例工程进行程序学习。 导入并配置HDFS样例工程 根据场景开发工程 提供样例工程,帮助用户快速了解HDFS各部件的编程接口。 开发HDFS应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HDFS应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HDFS应用 父主题: HDFS开发指南(普通模式)
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testGet方法中。 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = { Bytes.toBytes("name"), Bytes.toBytes("address") }; // Specify RowKey. byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Create the Table instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testGet."); }
  • 代码样例 以下代码片段是登录,创建Connection并创建表的示例,在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的HBaseSample方法中。 private TableName tableName = null; private Connection conn = null; public HBaseSample(Configuration conf) throws IOException { this.tableName = TableName.valueOf("hbase_sample_table"); this.conn = ConnectionFactory.createConnection(conf); }
  • 功能介绍 HBase通过ConnectionFactory.createConnection(configuration)方法创建Connection对象。传递的参数为上一步创建的Configuration。 Connection封装了底层与各实际服务器的连接以及与ZooKeeper的连接。Connection通过ConnectionFactory类实例化。创建Connection是重量级操作,Connection是线程安全的,因此,多个客户端线程可以共享一个Connection。 典型的用法,一个客户端程序共享一个单独的Connection,每一个线程获取自己的Admin或Table实例,然后调用Admin对象或Table对象提供的操作接口。不建议缓存或者池化Table、Admin。Connection的生命周期由调用者维护,调用者通过调用close(),释放资源。
共100000条