华为云用户手册

  • 准备开发环境 Hive组件可以使用JDBC/Python/Python3接口进行应用开发,要准备的开发和运行环境分别如下表所示。 表1 JDBC开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 表2 Python开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python 用于开发Hive应用程序的工具,版本要求不低于2.6.6,最高不超过2.7.13。 安装setuptools Python开发环境的基本配置,版本要求5.0以上。 Python开发工具的详细安装配置可参见配置Hive Python样例工程。 表3 Python3开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python3 用于开发Hive应用程序的工具,版本要求不低于3.6,最高不超过3.8。 安装setuptools Python3开发环境的基本配置,版本要求为47.3.1。 Python3开发工具的详细安装配置可参见配置Hive Python3样例工程。
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath应包含Spark客户端Kafka jar包的绝对路径,如/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*
  • 场景说明 在同一个客户端进程内同时访问FusionInsight ZooKeeper和第三方的ZooKeeper时,为了避免访问连接ZooKeeper认证冲突,提供了样例代码使HBase客户端访问FusionInsight ZooKeeper和客户应用访问第三方ZooKeeper。 以下为“src/main/resources”目录下提供的与认证相关的配置文件。 zoo.cfg # The configuration in jaas.conf used to connect fi zookeeper.zookeeper.sasl.clientconfig=Client_new[1] # Principal of fi zookeeper server side. zookeeper.server.principal=zookeeper/hadoop.hadoop.com[2] # Set true if the fi cluster is security mode. # The other two parameters doesn't work if the value is false. zookeeper.sasl.client=true[3] [1] zookeeper.sasl.clientconfig:指定使用jaas.conf文件中的对应配置访问FusionInsight ZooKeeper; [2] zookeeper.server.principal:指定ZooKeeper服务端使用principal; [3] zookeeper.sasl.client:如果MRS集群是安全模式,该值设置为“true”,否则设置为“false”,设置为“false”的情况下,“zookeeper.sasl.clientconfig”和“zookeeper.server.principal”参数不生效。 jaas.conf Client_new { [4] com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="D:\\work\\sample_project\\src\\hbase-examples\\hbase-zk-example\\target\\classes\\conf\\user.keytab" [5] principal="hbaseuser1" useTicketCache=false storeKey=true debug=true; }; Client { [6] org.apache.zookeeper.server.auth.DigestLoginModule required username="bob" password="xxxxxx"; [7] }; [4] Client_new:zoo.cfg中指定的读取配置,当该名称修改时,需要同步修改zoo.cfg中对应配置。 [5] keyTab :指明工程使用的“user.keytab”在运行样例的主机上的保存路径,使用绝对路径便于更好定位文件位置。在Windows环境和Linux环境下配置时需注意区分不同操作系统路径书写方式,即“\\”与“\”差异。 [6] Client:第三方ZooKeeper使用该配置进行访问连接,具体连接认证配置由第三方ZooKeeper版本决定。 [7] password:密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
  • 代码样例 下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中: 样例:使用二级索引查找数据 public void testScanDataByIndex() { LOG.info("Entering testScanDataByIndex."); Table table = null; ResultScanner scanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, "Li Gang".getBytes()); Scan scan = new Scan(); scan.setFilter(filter); scanner = table.getScanner(scan); LOG.info("Scan indexed data."); for (Result result : scanner) { for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data by index successfully."); } catch (IOException e) { LOG.error("Scan data by index failed."); } finally { if (scanner != null) { // Close the scanner object. scanner.close(); } try { if (table != null) { table.close(); } } catch (IOException e) { LOG.error("Close table failed."); } } LOG.info("Exiting testScanDataByIndex."); }
  • 功能介绍 针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。 HIndex支持的Filter类型为“SingleColumnValueFilter”,“SingleColumnValueExcludeFilter”以及“SingleColumnValuePartitionFilter”。 HIndex支持的Comparator为“BinaryComparator”,“BitComparator”,“LongComparator”,“DecimalComparator”,“DoubleComparator”,“FloatComparator”,“IntComparator”,“NullComparator”。 二级索引的使用规则如下: 针对某一列或者多列创建了单索引的场景下: 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1) 针对多个列创建的联合索引场景下: 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1)) 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。
  • 样例代码 nameNode=hdfs://hacluster resourceManager=10.1.130.10:26004 queueName=QueueA dataLoadRoot=examples oozie.coord.application.path=${nameNode}/user/oozie_cli/${dataLoadRoot}/apps/dataLoad start=2013-04-02T00:00Z end=2014-04-02T00:00Z workflowAppUri=${nameNode}/user/oozie_cli/${dataLoadRoot}/apps/dataLoad
  • 参数解释 “job.properties”文件中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 nameNode HDFS NameNode集群地址 resourceManager Yarn ResourceManager地址 queueName 流程任务处理时使用的MapReduce队列名 dataLoadRoot 流程任务所在目录名 oozie.coord.application.path Coordinator流程任务在HDFS上的存放路径 start 定时流程任务启动时间 end 定时流程任务终止时间 workflowAppUri Workflow流程任务在HDFS上的存放路径 可以根据业务需要,以“key=values”的格式自定义参数及值。
  • 查看调测结果 Spark应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 通过运行结果数据查看应用程序运行情况。 结果数据存储路径和格式已经由Spark应用程序指定,可通过指定文件获取。 登录Spark WebUI查看应用程序运行情况。 Spark主要有两个Web页面。 Spark UI页面,用于展示正在执行的应用的运行情况。 页面主要包括了Jobs、Stages、Storage、Environment和Executors五个部分。Streaming应用会多一个Streaming标签页。 页面入口:在YARN的Web UI界面,查找到对应的Spark应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面。 History Server页面,用于展示已经完成的和未完成的Spark应用的运行情况。 页面包括了应用ID、应用名称、开始时间、结束时间、执行时间、所属用户等信息。单击应用ID,页面将跳转到该应用的SparkUI页面。 通过Spark日志获取应用程序运行情况。 您可以查看Spark日志了解应用运行情况,并根据日志信息调整应用程序。相关日志信息可参考Spark2x日志介绍。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testPut方法中。 /** * Put data */ public void testPut() { LOG.info("Entering testPut."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Insert String upsertSQL = "UPSERT INTO TEST VALUES(1,'John','100000', TO_DATE('1980-01-01','yyyy-MM-dd'))"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()){ // Execute Update SQL stat.executeUpdate(upsertSQL); conn.commit(); LOG.info("Put successfully."); } catch (Exception e) { LOG.error("Put failed.", e); } LOG.info("Exiting testPut."); }
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 运行Python样例代码无需通过Maven打包,只需要上传user.keytab、krb5.conf 文件到客户端所在服务器上。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • HBase介绍 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。HBase设计目标是用来解决关系型数据库在处理海量数据时的局限性。 HBase使用场景有如下几个特点: 处理海量数据(TB或PB级别以上)。 具有高吞吐量。 在海量数据中实现高效的随机读取。 具有很好的伸缩能力。 能够同时处理结构化和非结构化的数据。 不需要完全拥有传统关系型数据库所具备的ACID特性。ACID特性指原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。 HBase中的表具有如下特点: 大:一个表可以有上亿行,上百万列。 面向列:面向列(族)的存储和权限控制,列(族)独立检索。 稀疏:对于为空(null)的列,并不占用存储空间,因此,表可以设计的非常稀疏。
  • HBase常用概念 过滤器 过滤器提供了非常强大的特性来帮助用户提高HBase处理表中数据的效率。用户不仅可以使用HBase中预定义好的过滤器,而且可以实现自定义的过滤器。 协处理器 允许用户执行region级的操作,并且可以使用与RDBMS中触发器类似的功能。 keytab文件 存放用户信息的密钥文件。在安全模式下,应用程序采用此密钥文件进行API方式认证。 Client 客户端直接面向用户,可通过Java API、HBase Shell或者Web UI访问服务端,对HBase的表进行读写操作。本文中的HBase客户端特指HBase client的安装包,可参考HBase对外接口介绍。
  • HBase接口类型介绍 由于HBase本身是由java语言开发出来的,且java语言具有简洁通用易懂的特性,推荐用户使用java语言进行HBase应用程序开发。 HBase采用的接口与Apache HBase保持一致。 HBase通过接口调用,可提供的功能如表1所示。 表1 HBase接口提供的功能 功能 说明 CRUD数据读写功能 增查改删 高级特性 过滤器、二级索引,协处理器 管理功能 表管理、集群管理
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要设置运行用户,此用户需绑定supergroup用户组。 在运行Colocation工程时,HDFS的配置项fs.defaultFS不能配置为viewfs://ClusterX。 初始化 使用Colocation前需要设置运行用户。 private static void init() throws IOException { // 设置用户,若用户没有设置HADOOP_USER_NAME,则使用USER if (System.getenv("HADOOP_USER_NAME") == null && System.getProperty("HADOOP_USER_NAME") == null) { System.setProperty("HADOOP_USER_NAME", USER); } } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); 创建group 样例:创建一个gid01组,组中包含3个locator。 /** * 创建group * * @throws java.io.IOException */ private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 /** * 创建并写入文件 * * @throws java.io.IOException */ private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01"); // 代写入HDFS的数据 byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 /** * 删除文件 * * @throws java.io.IOException */ @SuppressWarnings("deprecation") private static void delete() throws IOException { dfs.delete(new Path(TESTFILE_TXT)); } 删除group 样例:删除gid01。 /** * 删除group * * @throws java.io.IOException */ private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: • Colocation分配节点原理 • 扩容与Colocation分配 • Colocation与数据节点容量 Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略1来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locators所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个……)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 无 HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 Colocation提供了文件同分布的功能,执行集群balancer或mover操作时,会移动数据块,使Colocation功能失效。因此,使用Colocation功能时,建议将HDFS配置项dfs.datanode.block-pinning.enabled设置为true,此时执行集群Balancer或Mover操作时,使用Colocation写入的文件将不会被移动,从而保证了文件同分布。
  • 回答 Spark任务在运行过程中,driver会创建一个spark-开头的本地临时目录,用于存放业务jar包,配置文件等,同时在本地创建一个blockmgr-开头的本地临时目录,用于存放block data。此两个目录会在Spark应用运行结束时自动删除。 此两个目录的存放路径优先通过SPARK_LOCAL_DIRS环境变量指定,若不存在该环境变量,则设置为spark.local.dir的值,若此配置还不存在,则使用java.io.tmpdir的值。客户端默认配置中spark.local.dir被设置为/tmp,因此默认使用系统/tmp目录。 但存在一些特殊情况,如driver进程未正常退出,比如被kill -9命令结束进程,或者Java虚拟机直接崩溃等场景,导致driver的退出流程未正常执行,则可能导致该部分目录无法被正常清理,残留在系统中。 当前只有yarn-client模式和local模式的driver进程会产生上述问题,在yarn-cluster模式中,已将container内进程的临时目录设置为container临时目录,当container退出时,由container自动清理该目录,因此yarn-cluster模式不存在此问题。
  • 创建Doris表 以Java JDBC方式执行SQL语句在集群中dbName变量对应的数据库下创建tableName对应的表。 String createTableSql = "create table if not exists " + dbName + "." + tableName + " (\n" + "c1 int not null,\n" + "c2 int not null,\n" + "c3 string not null\n" + ") engine=olap\n" + "unique key(c1, c2)\n" + "distributed by hash(c1) buckets 1"; public static void execDDL(Connection connection, String sql) throws Exception { try (PreparedStatement statement = connection.prepareStatement(sql)) { statement.execute(); } catch (Exception e) { logger.error("Execute sql {} failed.", sql, e); throw new Exception(e); } } 父主题: Doris JDBC接口调用样例程序
  • 通过HSFabric实现KeyTab文件认证 KeyTab文件认证,需要“jaas-zk.conf”、“krb5.conf”和“user.keytab”文件。 “krb5.conf”和“user.keytab”文件参考MRS组件应用安全认证说明章节获得。 “jaas-zk.conf”文件如下定义,“principal”为MRS组件应用安全认证说明中添加的认证用户名称+@+域名(域名为“krb5.conf”文件中的“default_realm”字段值,例如“HADOOP.COM”),“keyTab”为“user.keytab”文件的路径。 Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/opt/client/user.keytab" principal="hivetest@系统域名" useTicketCache=false storeKey=true debug=true; }; “jaas-zk.conf”配置文件中“keyTab”的路径需根据实际的路径修改。 例如: Windows路径:“D:\\hetu-examples\\hetu-examples-security\\src\\main\\resources\\user.keytab”。 Linux路径:“/opt/client/user.keytab”。
  • 通过python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import os import logging import sys from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableEnvironment, EnvironmentSettings def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10) ) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json' ); create TABLE datagen_source_table ( age int, name varchar(10) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink_table SELECT * FROM datagen_source_table;
  • 提供Join能力 表12 提供Join能力的相关接口 API 说明 def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] 通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] 通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
  • 提供设置eventtime属性的能力 表6 提供设置eventtime属性的能力的相关接口 API 说明 def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。 def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
  • 访问开源 ZooKeeper 使用“testConnectApacheZk”连接开源ZooKeeper的代码,只需要将以下代码中的“xxx.xxx.xxx.xxx”修改为需要连接的开源的ZooKeeper的IP,端口号按照实际情况修改。如果仅需运行访问第三方Zookeeper的样例,需注释掉main函数中的“testConnectHive”方法。 digestZK = new org.apache.zookeeper.ZooKeeper("xxx.xxx.xxx.xxx:端口号", 60000, null); ZooKeeper连接使用完后需要关闭连接,否则可能导致连接泄露。可根据业务实际情况进行处理,代码如下: //使用try-with-resources方式,try语句执行完后会自动关闭ZooKeeper连接。 try (org.apache.zookeeper.ZooKeeper digestZk = new org.apache.zookeeper.ZooKeeper("xxx.xxx.xxx.xxx:端口号", 600000, null)) { ... }
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。 用户在开发Oozie应用程序时,某些场景下需要Oozie与Hadoop、Hive等之间进行通信。那么Oozie应用程序中需要写入安全认证代码,确保Oozie程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交Oozie应用程序运行前,在Oozie客户端执行如下命令获得认证。 kinit 组件业务用户 代码认证(Kerberos安全认证): 通过获取客户端的principal和keytab文件在应用程序中进行认证,用于Kerberos安全认证的keytab文件和principal文件您可以联系管理员创建并获取,具体使用方法在样例代码中会有详细说明。 目前样例代码统一调用LoginUtil类进行安全认证,支持Oracle JAVA平台和IBM JAVA平台。 代码示例中请根据实际情况,修改“USERNAME”为实际用户名,例如“developuser”。 private static void login(String keytabFilePath, String krb5FilePath, String user) throws IOException { Configuration conf = new Configuration(); conf.set(KERBEROS_PRINCIPAL, user); conf.set(KEYTAB_FILE, keytabFilePath); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); conf.set(HADOOP_SECURITY_AUTHORIZATION, "true"); /* * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, user, keytabFilePath); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(user, keytabFilePath, krb5FilePath, conf); }
  • 回答 由于checkpoint中包含了spark应用的对象序列化信息、task执行状态信息、配置信息等,因此,当存在以下问题时,从checkpoint恢复spark应用将会失败。 业务代码变更且变更类未明确指定SerialVersionUID。 spark内部类变更,且变更类未明确指定SerialVersionUID。 另外,由于checkpoint保存了部分配置项,因此可能导致业务修改了部分配置项后,从checkpoint恢复时,配置项依然保持为旧值的情况。当前只有以下部分配置会在从checkpoint恢复时重新加载。 "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port", "spark.master", "spark.yarn.jars", "spark.yarn.keytab", "spark.yarn.principal", "spark.yarn.credentials.file", "spark.yarn.credentials.renewalTime", "spark.yarn.credentials.updateTime", "spark.ui.filters", "spark.mesos.driver.frameworkId", "spark.yarn.jars"
  • 简介 Yarn是一个分布式的资源管理系统,用于提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率,以及能支持除了MapReduce计算框架外的更多的计算框架。
  • 基本概念 ResourceManager(RM) RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。 ApplicationMaster(AM) 用户提交的每个应用程序均包含一个AM,主要功能包括: 与RM调度器协商以获取资源(用Container表示)。 将得到的资源进一步分配给内部任务。 与NM通信以启动/停止任务。 监控所有任务的运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 NodeManager(NM) NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它会接收并处理来自AM的Container启动/停止等各种请求。 Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。
共100000条