华为云用户手册

  • 代码样例 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。 下面代码片段仅为演示,具体代码参见SparkHivetoHbasePythonExample: # -*- coding:utf-8 -*- from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("SparkHivetoHbase") \ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.SparkHivetoHbase') # 创建类实例并调用方法 spark._jvm.SparkHivetoHbase().hivetohbase(spark._jsc) # 停止SparkSession spark.stop()
  • 带“*”查询 假定现在有时间序列“root.sg.d1.s1”和“root.sg.d1.s2”。 执行SELECT example(*) from root.sg.d1 那么结果集中将包括“example(root.sg.d1.s1)”和“example(root.sg.d1.s2)”的结果。 执行SELECT example(s1, *) from root.sg.d1 那么结果集中将包括“example(root.sg.d1.s1, root.sg.d1.s1)”和“example(root.sg.d1.s1, root.sg.d1.s2)”的结果。 执行SELECT example(*, *) from root.sg.d1 那么结果集中将包括“example(root.sg.d1.s1, root.sg.d1.s1)”,“example(root.sg.d1.s2, root.sg.d1.s1)”,“example(root.sg.d1.s1, root.sg.d1.s2)” 和“ example(root.sg.d1.s2, root.sg.d1.s2)”的结果。
  • 带自定义输入参数的查询 用户可以在进行UDF查询的时候,向UDF传入任意数量的键值对参数。键值对中的键和值都需要被单引号或者双引号引起来。 键值对参数只能在时间序列后传入。 例如: SELECT example(s1, 'key1'='value1', 'key2'='value2'), example(*, 'key3'='value3') FROM root.sg.d1; SELECT example(s1, s2, 'key1'='value1', 'key2'='value2') FROM root.sg.d1;
  • 在Linux环境中调测ClickHouse Springboot样例程序 ClickHouse springboot应用程序也支持在Linux环境中运行。在程序代码完成开发后,您可以上传Jar包至准备好的Linux运行环境中运行。 前提条件 Linux环境已安装JDK,版本号需要和IntelliJ IDEA导出Jar包使用的JDK版本一致,并设置好Java环境变量。 编译并运行程序 在IDEA中右侧单击“Maven”,展开“Lifecycle”,双击“package”,对当前工程进行打包。 使用root用户登录ClickHouse客户端节点,创建运行目录,例如“/opt/test”,在IDEA的“target”目录下获取带有“-with-dependencies”的jar包,并将jar包和idea中conf文件夹一同上传到“/opt/test”目录,如: 执行如下命令,配置环境变量并运行jar包: cd 客户端安装路径 source bigdata_env cd /opt/test java -jar clickhouse-examples-1.0-SNAPSHOT-jar-with-dependencies.jar 显示结果如下: 调用ClickHouse的SpringBoot样例接口触发样例代码运行: Windows环境运行方式: 打开浏览器,输入:http://ClickHouse客户端节点IP:8080/clickhouse/executeQuery,查看浏览器返回信息: ClickHouse springboot client runs normally. Linux环境下执行运行方式: 登录ClickHouse客户端节点,执行如下命令,查看linux下shell日志打印和日志文件打印: curl http://localhost:8080/clickhouse/executeQuery vi clickhouse-springboot-example.log
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。 用户在提交MapReduce应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交MapReduce的应用程序中需要写入安全认证代码,确保MapReduce程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交MapReduce应用程序运行前,在MapReduce客户端执行如下命令获得认证。 kinit 组件业务用户 代码认证: 通过获取客户端的principal和keytab文件在应用程序中进行认证。
  • 代码样例 下面代码片段在com.huawei.bigdata.hbase.examples包的“TestMain”类的init方法中。 private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator;[1] //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; conf.addResource(new Path(userdir + "core-site.xml"), false); conf.addResource(new Path(userdir + "hdfs-site.xml"), false); conf.addResource(new Path(userdir + "hbase-site.xml"), false); } [1]userdir获取的是编译后资源路径下conf目录的路径。初始化配置用到的core-site.xml、hdfs-site.xml、hbase-site.xml文件,需要放置到“src/main/resources/conf”的目录下。
  • HBase应用开发流程介绍 本文档主要基于Java API对HBase进行应用开发。 开发流程中各阶段的说明如图1和表1所示。 图1 HBase应用程序开发流程 表1 HBase应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HBase的基本概念,了解场景需求,设计表等。 HBase常用概念 准备开发和运行环境 HBase的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。HBase的运行环境即HBase客户端,根据指导完成客户端的安装和配置。 准备HBase应用开发和运行环境 准备工程 HBase提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置HBase样例工程 准备安全认证 如果您使用的是安全集群,需要进行安全认证。 配置HBase应用安全认证 根据场景开发工程 提供了Java语言的样例工程,包含从建表、写入到删除表全流程的样例工程。 开发HBase应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HBase应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HBase应用 父主题: HBase开发指南(安全模式)
  • 创建Doris数据库 以Java JDBC方式执行SQL语句在集群中创建dbName变量对应的数据库。 String createDatabaseSql = "create database if not exists " + dbName; public static void execDDL(Connection connection, String sql) throws Exception { try (PreparedStatement statement = connection.prepareStatement(sql)) { statement.execute(); } catch (Exception e) { logger.error("Execute sql {} failed.", sql, e); throw new Exception(e); } } 父主题: Doris JDBC接口调用样例程序
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testScanData方法中。 public void testScanData() { LOG.info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the cache size. scan.setCaching(1000); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testScanData."); }
  • 回答 bulkload是通过启动MapReduce任务直接生成HFile文件,再将HFile文件注册到HBase,因此错误的使用bulkload会因为启动MapReduce任务而占用更多的集群内存和CPU资源,也可能会生成大量很小的HFile文件频繁的触发Compaction,导致查询速度急剧下降。 错误的使用put,会造成数据加载慢,当分配给RegionServer内存不足时会造成RegionServer内存溢出从而导致进程退出。 下面给出bulkload和put适合的场景: bulkload适合的场景: 大量数据一次性加载到HBase。 对数据加载到HBase可靠性要求不高,不需要生成WAL文件。 使用put加载大量数据到HBase速度变慢,且查询速度变慢时。 加载到HBase新生成的单个HFile文件大小接近HDFS block大小。 put适合的场景: 每次加载到单个Region的数据大小小于HDFS block大小的一半。 数据需要实时加载。 加载数据过程不会造成用户查询速度急剧下降。
  • ClickHouse简介 ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。 ClickHouse的设计优点: 数据压缩比高 多核并行计算 向量化计算引擎 支持嵌套数据结构 支持稀疏索引 支持数据Insert和Update ClickHouse的应用场景: 实时数仓场景 使用流式计算引擎(如Flink)把实时数据写入ClickHouse,借助ClickHouse的优异查询性能,在亚秒级内响应多维度、多模式的实时查询分析请求。 离线查询场景 把规模庞大的业务数据导入到ClickHouse,构造数亿至数百亿记录规模、数百以上的维度的大宽表,随时进行个性化统计和持续探索式查询分析,辅助商业决策,具有非常好的查询体验。
  • 基本概念 cluster cluster(集群)在ClickHouse里是一种逻辑的概念,它可以由用户根据需要自由的定义,与通常理解的集群有一定的差异。多个ClickHouse节点之间是一种松耦合的关系,各自独立存在。 shards shard(分片)是对cluster的横向切分,1个cluster可以由多个shard组成。 replicas replica(副本),1个shard可以有多个replica组成。 partition partition(分区),针对的是本地replica而言的,可以理解为是一种纵向切分。 MergeTree ClickHouse拥有非常庞大的表引擎体系,MergeTree作为家族系统最基础的表引擎,提供了数据分区、一级索引和二级索引等功能。在创建表的时候需要指定表引擎,不同的表引擎会决定一张数据表的最终“性格”,比如数据表拥有何种特性、数据以何种形式被存储以及如何被加载。
  • 问题 Flink任务配置State Backend为RocksDB时,运行报如下错误: Caused by: java.lang.UnsatisfiedLinkError: /srv/BigData/hadoop/data1/nm/usercache/***/appcache/application_****/rocksdb-lib-****/librocksdbjni-linux64.so: /lib64/libpthread.so.0: version `GLIBC_2.12` not found (required by /srv/BigData/hadoop/***/librocksdbjni-linux64.so) at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1965) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1890) at java.lang.Runtime.load0(Runtime.java:795) at java.lang.System.load(System.java:1062) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:734) ... 11 more
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表6 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表7 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 通过python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import os import logging import sys from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableEnvironment, EnvironmentSettings def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10) ) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json' ); create TABLE datagen_source_table ( age int, name varchar(10) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink_table SELECT * FROM datagen_source_table;
  • 准备开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发HBase应用程序的工具,版本要求:2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Junit插件 开发环境的基本配置。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行配置。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-Zip 16.04版本。
  • 查看调测结果 ClickHouse应用程序运行完成后,可通过以下方式查看程序运行情况: 通过运行结果查看程序运行情况。 通过ClickHouse日志获取应用运行情况。 即查看当前jar文件所在目录的“logs/clickhouse-example.log”日志文件,例如“客户端安装目录/JDBC/logs/clickhouse-example.log”或“客户端安装目录/JDBCTransaction/logs/clickhouse-example.log”。 jar包运行结果如下: 2021-06-10 20:53:56,028 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.insertData(Util.java:128) 2021-06-10 20:53:58,247 | INFO | main | Inert batch time is 1442 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:53:59,649 | INFO | main | Inert batch time is 1313 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:05,872 | INFO | main | Inert batch time is 6132 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:10,223 | INFO | main | Inert batch time is 4272 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:11,614 | INFO | main | Inert batch time is 1300 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:12,871 | INFO | main | Inert batch time is 1200 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:14,589 | INFO | main | Inert batch time is 1663 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:16,141 | INFO | main | Inert batch time is 1500 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:17,690 | INFO | main | Inert batch time is 1498 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:19,206 | INFO | main | Inert batch time is 1468 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:19,207 | INFO | main | Inert all batch time is 22626 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:148) 2021-06-10 20:54:19,208 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:58) 2021-06-10 20:54:20,231 | INFO | main | Execute query:select * from mutong1.testtb_all order by age limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:63) 2021-06-10 20:54:21,266 | INFO | main | Execute time is 1035 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:67) 2021-06-10 20:54:21,267 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:58) 2021-06-10 20:54:21,815 | INFO | main | Execute query:select toYYYYMM(date),count(1) from mutong1.testtb_all group by toYYYYMM(date) order by count(1) DESC limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:63) 2021-06-10 20:54:22,897 | INFO | main | Execute time is 1082 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:67) 2021-06-10 20:54:22,898 | INFO | main | name age date | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,898 | INFO | main | huawei_266 0 2021-12-19 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_2500 0 2021-12-29 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_8980 0 2021-12-16 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_671 0 2021-12-29 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_2225 0 2021-12-12 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_6040 0 2021-12-14 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_7294 0 2021-12-10 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_1133 0 2021-12-25 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | huawei_3161 0 2021-12-21 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | huawei_3992 0 2021-11-25 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | toYYYYMM(date) count() | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201910 2247 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 202105 2213 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201801 2208 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201803 2204 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201810 2167 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201805 2166 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201901 2164 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201908 2145 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201912 2143 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 202107 2137 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144)
  • 操作场景 在程序代码完成开发后,您可以在Windows环境中运行应用。本地和集群业务平面网络互通时,您可以直接在本地进行调测。 MapReduce应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 在IntelliJ IDEA中查看应用程序运行情况。 通过MapReduce日志获取应用程序运行情况。 登录MapReduce WebUI查看应用程序运行情况。 登录Yarn WebUI查看应用程序运行情况。 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。
  • 运行多组件样例程序 将hive-site.xml、hbase-site.xml、hiveclient.properties放入工程的conf目录。 确保样例工程依赖的所有Hive、HBase相关jar包已正常获取。 打开MultiComponentLocalRunner.java,确认代码中System.setProperty("HADOOP_USER_NAME", "root");设置了用户为root,请确保场景说明中上传的数据的用户为root,或者在代码中将root修改为上传数据的用户名。 在IntelliJ IDEA开发环境中,选中“MultiComponentLocalRunner.java”工程,单击运行对应的应用程序工程。或者右键工程,选择“Run MultiComponentLocalRunner.main()”运行应用工程。
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询,FusionInsight版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api。
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 JobHistory命令: curl http://192.168.227.16:4040/api/v1/applications/application_1478570725074_0004/SQL?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }] } 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ] } 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl http://192.168.195.232:8088/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: { "startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62 } 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • REST接口 通过以下命令可跳过REST接口过滤器获取相应的应用信息。 普通模式下,JobHistory仅支持http协议,故在如下命令的url中请使用http协议。 获取JobHistory中所有应用信息: 命令: curl http://192.168.227.16:4040/api/v1/applications?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号。 结果: [ { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ] }, { " id" : "application_1517290848707_0145", "name" : "Spark shell", "attempts" : [ { "startTime" : "2018-01-31T15:20:31.286CST", "endTime" : "1970-01-01T07:59:59.999CST", "lastUpdated" : "2018-01-31T15:20:47.086CST", "duration" : 0, "sparkUser" : "admintest", "completed" : false, "startTimeEpoch" : 1517383231286, "endTimeEpoch" : -1, "lastUpdatedEpoch" : 1517383247086 } ] }] 结果分析: 通过这个命令,可以查询当前集群中所有的Spark应用(包括正在运行的应用和已经完成的应用),每个应用的信息如下表1。 表1 应用常用信息 参数 描述 id 应用的ID name 应用的Name attempts 应用的尝试,包含了开始时间、结束时间、执行用户、是否完成等信息 获取JobHistory中某个应用的信息: 命令: curl http://192.168.227.16:4040/api/v1/applications/application_1517290848707_0008?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1517290848707_0008为应用的id。 结果: { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ] } 结果分析: 通过这个命令,可以查询某个Spark应用的信息,显示的信息如表1所示。 获取正在执行的某个应用的Executor信息: 针对alive executor命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/executors?mode=monitoring --insecure 针对全部executor(alive&dead)命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/allexecutors?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1478570725074_0046为在YARN中的应用ID。 结果: [{ "id" : "driver", "hostPort" : "192.168.169.84:23886", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 278019440, "executorLogs" : { } }, { "id" : "1", "hostPort" : "192.168.169.84:23902", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 1, "maxTasks" : 1, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 139, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 555755765, "executorLogs" : { "stdout" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stdout?start=-4096", "stderr" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stderr?start=-4096" } } ] 结果分析: 通过这个命令,可以查询当前应用的所有Executor信息(包括Driver),每个Executor的信息包含如下表2所示的常用信息。 表2 Executor常用信息 参数 描述 id Executor的ID hostPort Executor所在节点的ip:端口 executorLogs Executor的日志查看路径
  • 准备Manager应用开发和运行环境 在进行开发时,要准备的开发和运行环境如表1所示。 表1 开发和运行环境 准备项 说明 操作系统 Windows系统,支持Windows 7以上版本。 本地开发环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: JDK版本号应该与用户要访问的FusionInsight Manager使用的版本号一致,具体的版本号可以查看对应版本文档或咨询系统管理员。 例如FusionInsight Manager 8.1.2.2支持的JDK是JDK 1.8.x。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 父主题: 准备Manager应用开发环境
  • MapReduce应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 MapReduce应用程序开发流程 表1 MapReduce应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解MapReduce的基本概念。 MapReduce应用开发简介 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 MapReduce的运行环境即MapReduce客户端,请根据指导完成客户端的安装和配置。 准备MapReduce开发和运行环境 准备工程 MapReduce提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个MapReduce工程。 导入并配置MapReduce样例工程 (可选)创建MapReduce样例工程 根据场景开发工程 提供了样例工程。 帮助用户快速了解MapReduce各部件的编程接口。 开发MapReduce应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测MapReduce应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测MapReduce应用 父主题: MapReduce开发指南(普通模式)
  • 基于API的Glob路径模式以获取LocatedFileStatus和从FileStatus打开文件 在DistributedFileSystem中添加了以下API,以获取具有块位置的FileStatus,并从FileStatus对象打开文件。这些API将减少从客户端到Namenode的RPC调用的数量。 表6 FileSystem API接口说明 Interface接口 Description说明 public LocatedFileStatus[] globLocatedStatus(Path, PathFilter, boolean) throws IOException 返回一个LocatedFileStatus对象数组,其对应文件路径符合路径过滤规则。 public FSDataInputStream open(FileStatus stat) throws IOException 如果stat对象是LocatedFileStatusHdfs的实例,该实例已具有位置信息,则直接创建InputStream而不联系Namenode。
  • HetuEngine连接方式说明 表1 HetuEngine连接方式说明 连接方式 是否支持用户名密码认证方式 是否支持Keytab认证方式 是否支持客户端跨网段访问 使用前提 HSFabric 是 是 是 确保业务侧和HetuServer服务端HSFabric所在业务节点网络互通 适用于双平面的网络场景 只需对外开放HSFabric固定的IP,端口 支持范围:MRS 3.1.3及之后版本 HSBroker 是 否 否 确保业务侧和HetuServer服务端HSBroker、Coordinator(随机分布在Yarn NodeManger)所在业务节点网络互通 需对外开放Coordinator的IP,端口 支持范围:MRS 3.1.0及之后版本
  • HetuEngine基本概念 HSBroker:HetuEngine的服务代理,用作用户租户管理校验,HetuEngine访问URL的获取等。 Coordinator:HetuEngine服务的资源协调者,负责SQL解析和优化等事务。 Worker:负责执行任务和处理数据。 Connector:HetuEngine访问数据库的接口,HetuEngine通过Connector的驱动连接数据源,读取数据源元数据和对数据进行增删改查等操作。 Catalog:HetuEngine中一个catalog配置文件对应一个数据源,一个数据源可以有多个不同catalog配置,可以通过数据源的properties文件进行配置。 Schema:对应数据库的Schema名称。 Table:对应数据库的表名。
  • Oozie Java接口介绍 Java API主要由org.apache.oozie.client.OozieClient提供。 表1 接口介绍 方法 说明 public String run(Properties conf) 运行job public void start(String jobId) 启动指定的job public String submit(Properties conf) 提交job public void kill(String jobId) 删除指定的job public void suspend(String jobId) 暂停指定的job public void resume(String jobId) 恢复指定的job public WorkflowJob getJobInfo(String jobId) 获取Job信息 父主题: 常用Oozie API接口介绍
  • 代码样例 如下是读文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 读文件 * * @throws java.io.IOException */ private void read() throws IOException { String strPath = DEST_PATH + File.separator + FILE_NAME; Path path = new Path(strPath); FSDataInputStream in = null; BufferedReader reader = null; StringBuffer strBuffer = new StringBuffer(); try { in = fSystem.open(path); reader = new BufferedReader(new InputStreamReader(in)); String sTempOneLine; // write file while ((sTempOneLine = reader.readLine()) != null) { strBuffer.append(sTempOneLine); } LOG.info("result is : " + strBuffer.toString()); LOG.info("success to read."); } finally { // make sure the streams are closed finally. IOUtils.closeStream(reader); IOUtils.closeStream(in); } }
共100000条