华为云用户手册

  • 回答 用户尝试收集大量数据到Driver端,如果Driver端的内存不足以存放这些数据,那么就会抛出OOM(OutOfMemory)的异常,然后Driver端一直在进行GC,尝试回收垃圾来存放返回的数据,导致应用长时间挂起。 解决措施: 如果用户需要在OOM场景下强制将应用退出,那么可以在启动Spark Core应用时,在客户端配置文件“$SPARK_HOME/conf/spark-defaults.conf”中的配置项“spark.driver.extraJavaOptions”中添加如下内容: -XX:OnOutOfMemoryError='kill -9 %p'
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Impala中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 开发思路 ClickHouse作为一款独立的DBMS系统,使用SQL语言就可以进行常见的操作。开发程序示例中,全部通过clickhouse-jdbc API接口来进行描述,开发流程主要分为以下几部分: 设置属性:设置连接ClickHouse服务实例的参数属性。 建立连接:建立和ClickHouse服务实例的连接。 创建库:创建ClickHouse数据库。 创建表:创建ClickHouse数据库下的表。 插入数据:插入数据到ClickHouse表中。 查询数据:查询ClickHouse表数据。 删除表:删除已创建的ClickHouse表。
  • 扩展使用 配置Hive中间过程的数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见创建Hive用户自定义函数。
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 功能介绍 本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式: 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。 目前表名长度最长为128,字段名长度最长为128,字段注解长度最长为4000,WITH SERDEPROPERTIES 中key长度最长为256,value长度最长为4000。以上的长度均表示字节长度。
  • 回答 问题原因: 在IBM JDK下建立的Hive connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用Hive relogin也无法得到刷新。 解决措施: 通常情况下,在发现Hive connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • Hive SQL Hive SQL支持Hive-3.1.0版本中的所有特性,详情请参见https://cwiki.apache.org/confluence/display/hive/languagemanual。 FusionInsight系统提供的扩展Hive语句如表1所示。 表1 扩展Hive语句 扩展语法 语法说明 语法示例 示例说明 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ...... [TBLPROPERTIES ("groupId"=" group1 ","locatorId"="locator1")] ...; 创建一个hive表,并指定表数据文件分布的locator信息。详细说明请参见使用HDFS Colocation存储Hive表。 CREATE TABLE tab1 (id INT, name STRING) row format delimited fields terminated by '\t' stored as RCFILE TBLPROPERTIES("groupId"=" group1 ","locatorId"="locator1"); 创建表tab1,并指定tab1的表数据分布在locator1节点上。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ... [TBLPROPERTIES ('column.encode.columns'='col_name1,col_name2'| 'column.encode.indices'='col_id1,col_id2', 'column.encode.classname'='encode_classname')]...; 创建一个hive表,并指定表的加密列和加密算法。详细说明请参见使用Hive列加密功能。 create table encode_test(id INT, name STRING, phone STRING, address STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('column.encode.indices'='2,3', 'column.encode.classname'='org.apache.hadoop.hive.serde2.SMS4Rewriter') STORED AS TEXTFILE; 创建表encode_test,并指定插入数据时对第2、3列加密,加密算法类为org.apache.hadoop.hive.serde2.SMS4Rewriter。 REMOVE TABLE hbase_tablename [WHERE where_condition]; 删除hive on hbase表中符合条件的数据。详细说明请参见删除Hive on HBase表中的单行记录。 remove table hbase_table1 where id = 1; 删除表中符合条件“id =1”的数据。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建hive表,并设定表可以指定自定义行分隔符。详细说明请参见自定义行分隔符。 create table blu(time string, num string, msg string) row format delimited fields terminated by ',' stored as inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建表blu,指定inputformat为SpecifiedDelimiterInputFormat,以便查询时可以指定表的查询行分隔符。 父主题: 对外接口
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见数据加载。 雇员信息数据如表1所示: 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 Country1:City1 2014 3 Tom D 12000.02 personal income tax&0.09 Country2:City2 2014 4 Jack D 24000.03 personal income tax&0.09 Country3:City3 2014 6 Linda D 36000.04 personal income tax&0.09 Country4:City4 2014 8 Zhang R 9000.05 personal income tax&0.05 Country5:City5 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示: 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 加载雇员扩展信息数据到雇员联络信息表“employees_info_extended”中。 雇员扩展信息数据如表3所示: 表3 雇员扩展信息数据 编号 姓名 电话号码 e-mail 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang 135 XXXX XXXX xxxx@xx.com R 8000.01 personal income tax&0.05 Country1:City1 2014 3 Tom 159 XXXX XXXX xxxxx@xx.com.cn D 12000.02 personal income tax&0.09 Country2:City2 2014 4 Jack 186 XXXX XXXX xxxx@xx.org D 24000.03 personal income tax&0.09 Country3:City3 2014 6 Linda 189 XXXX XXXX xxxx@xxx.cn D 36000.04 personal income tax&0.09 Country4:City4 2014 8 Zhang 134 XXXX XXXX xxxx@xxxx.cn R 9000.05 personal income tax&0.05 Country5:City5 2014 数据分析。 数据分析代码实现,请见数据查询。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请参见使用JDBC提交数据分析任务。
  • 开发流程 开发流程中各阶段的说明如图1和表1所示。 图1 Impala应用程序开发流程 表1 Impala应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Impala的基本概念。 常用概念 准备开发和运行环境 Impala的应用程序支持使用Java、Python两种语言进行开发。推荐使用IntelliJ IDEA工具,请根据指导完成不同语言的开发环境配置。 准备开发和运行环境 根据场景开发工程 提供了Java、Python两种不同语言的样例工程,还提供了从建表、数据加载到数据查询的样例工程。 典型场景说明 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 调测程序 父主题: 概述
  • 常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Impala的相关操作。本文中的Impala客户端特指Impala client的安装目录,里面包含通过Java API访问Impala的样例代码。 HiveQL语言 Hive Query Language,类SQL语句,与Hive类似。 Statestore Statestore管理Impala集群中所有的Impalad实例的健康状态,并将实例健康信息广播到所有实例上。当某一个Impalad实例发生故障,比如节点异常、网络异常等,Statestore将通知其他Impalad实例,后续的查询请求等将不会向该实例分发。 Catalog Catalog实例服务将每个Impalad实例上发生的元数据变动同步到集群内其他Impalad实例,从而避免在一个Impalad实例中更改元数据,其他各个实例需要执行REFRESH操作来更新。但是,在Hive中建表,修改表等,则需要执行REFRESH或者INVALIDATE METADATA操作。 父主题: 概述
  • 应用开发简介 Impala直接对存储在HDFS,HBase 或对象存储服务(OBS)中的Hadoop数据提供快速,交互式SQL查询。除了使用相同的统一存储平台之外,Impala还使用与Apache Hive相同的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue中的Impala查询UI)。这为实时或面向批处理的查询提供了一个熟悉且统一的平台。作为查询大数据的工具补充,Impala不会替代基于MapReduce构建的批处理框架,例如Hive。基于MapReduce构建的Hive和其他框架最适合长时间运行的批处理作业。 Impala主要特点如下: 支持Hive查询语言(HiveQL)中大多数的SQL-92功能,包括 SELECT,JOIN和聚合函数。 HDFS,HBase 和对象存储服务(OBS)存储,包括: HDFS文件格式:基于分隔符的text file,Parquet,Avro,SequenceFile和RCFile。 压缩编解码器:Snappy,GZIP,Deflate,BZIP。 常见的数据访问接口包括: JDBC驱动程序。 ODBC驱动程序。 HUE beeswax和Impala查询UI。 impala-shell命令行接口。 支持Kerberos身份认证。 Impala主要应用于实时查询数据的离线分析(如日志分析,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 父主题: 概述
  • 准备开发环境 在进行应用开发时,需要准备的本地开发环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Windows或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-zip 16.04版本。
  • 准备运行环境 进行应用开发时,需要同时准备代码的运行调测的环境,用于验证应用程序运行正常。 如果本地Windows开发环境和集群业务平面网络互通,然后直接在Windows中进行程序调测。 如果使用Linux环境调测程序,需准备安装集群客户端的Linux节点并获取相关配置文件。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Impala中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 功能简介 本小节介绍了如何使用Impala SQL建内部表、外部表的基本操作。创建表主要有以下三种方式。 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Impala完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。
  • 获取数据库连接 使用JDK的驱动管理类java.sql.DriverManager来获取一个Impalad的数据库连接。 Impalad的数据库URL为url="jdbc:hive2://xxx.xxx.xxx.xxx:21050;auth=KERBEROS;principal=impala/hadoop.hadoop.com@HADOOP.COM;user.principal=impala/hadoop.hadoop.com;user.keytab=conf/impala.keytab"; 以上已经经过安全认证,所以用户名和密码为null或者空。 // 建立连接 connection = DriverManager.getConnection(url, "", "");
  • 执行Impala SQL 执行Impala SQL,注意Impala SQL不能以";"结尾。 正确示例: String sql = "SELECT COUNT(*) FROM employees_info";Connection connection = DriverManager.getConnection(url, "", "");PreparedStatement statement = connection.prepareStatement(sql);resultSet = statement.executeQuery(); 错误示例: String sql = "SELECT COUNT(*) FROM employees_info;";Connection connection = DriverManager.getConnection(url, "", "");PreparedStatement statement = connection.prepareStatement(sql);resultSet = statement.executeQuery();
  • Impala SQL语法规则之判空 判断字段是否为“空”,即没有值,使用“is null”;判断不为空,即有值,使用“is not null”。 要注意的是,在Impala SQL中String类型的字段若是空字符串, 即长度为0,那么对它进行is null的判断结果是False。此时应该使用“col = '' ”来判断空字符串;使用“col != '' ”来判断非空字符串。 正确示例: select * from default.tbl_src where id is null;select * from default.tbl_src where id is not null;select * from default.tbl_src where name = '';select * from default.tbl_src where name != ''; 错误示例: select * from default.tbl_src where id = null;select * from default.tbl_src where id != null;select * from default.tbl_src where name is null;select * from default.tbl_src where name is not null;注:表tbl_src的id字段为Int类型,name字段为String类型。
  • Hive JDBC驱动的加载 客户端程序以JDBC的形式连接Impalad时,需要首先加载Hive的JDBC驱动类org.apache.hive.jdbc.HiveDriver。 所以在客户端程序开始前,必须先使用当前类加载器加载该驱动类。 如果classpath下没有相应的jar包,则客户端程序抛出Class Not Found异常并退出。 如下: Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; }
  • Impala SQL编写之不支持隐式类型转换 查询语句使用字段的值做过滤时,不支持使用Hive类似的隐式类型转换来编写Impala SQL: Impala示例: select * from default.tbl_src where id = 10001;select * from default.tbl_src where name = 'TestName'; Hive示例(支持隐式类型转换): select * from default.tbl_src where id = '10001';select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见数据加载。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见数据查询。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见样例程序指导。
  • 常用概念 Topic Kafka维护的同一类的消息称为一个Topic。 Partition 每一个Topic可以被分为多个Partition,每个Partition对应一个可持续追加的、有序不可变的log文件。 Producer 将消息发往Kafka topic中的角色称为Producer。 Consumer 从Kafka topic中获取消息的角色称为Consumer。 Broker Kafka集群中的每一个节点服务器称为Broker。 父主题: 概述
  • 在Windows中调测程序 运行样例。 导入和修改样例后,即可在开发环境中,右击“ExampleMain.java”,选择“ExampleMain.main()”运行对应的应用程序工程。 使用Windows访问MRS集群来操作Impala,有如下两种方式。 方法一:申请一台Windows的ECS访问MRS集群操作Impala,在安装开发环境后可直接运行样例代码。 在“现有集群”列表中,单击已创建的集群名称。 记录集群的“可用分区”、“虚拟私有云”,以及Master节点的“默认安全组”。 在弹性云服务管理控制台,创建一个新的弹性云服务器。 弹性云服务器的“可用分区”、“虚拟私有云”、“安全组”,需要和待访问集群的配置相同。 选择一个Windows系统的公共镜像。 其他配置参数详细信息,请参见自定义购买弹性云服务器。 方法二:使用本机访问MRS集群操作Impala,在安装开发环境后并完成以下步骤后再运行样例代码。 为任意一个Core节点绑定弹性公网IP,完成后将该IP地址配置在开发样例的client.properties下的impala-server配置项中,用于访问Impala服务、提交SQL语句。 在弹性云服务器页面申请并绑定弹性云服务器IP,具体请参考为弹性云服务器申请和绑定弹性公网IP。 为MRS集群开放安全组规则。在集群Master节点和Core节点的安全组添加安全组规则使弹性云服务器可以访问集群,具体请参考配置安全组规则。 样例中的client.properties配置如下: impala-server = XX.XX.XX.XX:21050 ##指定要连接的impalad实例所在Core节点绑定的服务地址,方式二需要填写步骤1中绑定的弹性公网IP 查看结果。 查看样例代码中的Impala SQL所查询出的结果,运行成功结果会有如下信息。 ExampleMain运行及结果查看。 Create table success!_c00Delete table success! 运行样例出错,出现如下提示: Error running 'ExampleMain': Command line is too long. Shorten command line for ServiceStarter or also for Application default configuration. 解决办法: 在Intellij中的配置Edit Configurations 中设置shorten command line 即可。 父主题: 调测程序
  • 注意事项 如果yarn-session.sh使用-z配置特定的zookeeper的namespace,则在使用flink run时必须使用-yid指出applicationID,使用-yz指出zookeeper的namespace,前后namespace保持一致。 举例: bin/yarn-session.sh -z YARN101 bin/flink run -yid application_****_**** -yz YARN101 examples/streaming/WindowJoin.jar
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据。 数据包含两个属性:分别是Int和String类型。 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.ssl.enabled:设置NettySink与NettySource之间通信是否SSL加密(默认为false),例如: nettyconnector.ssl.enabled: true nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 安全认证配置: Zookeeper的SASL认证,依赖“flink-conf.yaml”中有关HA的相关配置,具体配置请参见配置管理Flink。 SSL的keystore、truststore、keystore password、truststore password以及password等也使用“flink-conf.yaml”的相关配置,具体配置请参见加密传输。 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查寻信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySink的并发度相同,否则无法正常创建连接。
  • 场景说明 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: DataStream应用程序可以在Windows环境和Linux环境中运行。 实时统计总计网购时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 DataStream样例工程的数据存储在文本中。 将log1.txt和log2.txt放置在指定路径下,例如"/opt/log1.txt"和"/opt/log2.txt"。 数据文件若存放在本地文件系统,需在所有部署Yarn NodeManager的节点指定目录放置,并设置运行用户访问权限。 若将数据文件放置于HDFS,需指定程序中读取文件路径HDFS路径,例如"hdfs://hacluster/path/to/file"。
共100000条