华为云用户手册

  • 代码样例 如下是读文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 /** * 读文件 * * @throws IOException */ private void read() throws IOException { String strPath = DEST_PATH + File.separator + FILE_NAME; Path path = new Path(strPath); FSDataInputStream in = null; BufferedReader reader = null; StringBuffer strBuffer = new StringBuffer(); try { in = fSystem.open(path); reader = new BufferedReader(new InputStreamReader(in)); String sTempOneLine; // 写文件 while ((sTempOneLine = reader.readLine()) != null) { strBuffer.append(sTempOneLine); } System.out.println("result is : " + strBuffer.toString()); System.out.println("success to read."); } finally { //务必要关闭资源. close(reader); close(in); } }
  • 配置文件介绍 登录HDFS时会使用到如表1所示的配置文件。这些文件均已导入到“hdfs-example”工程的“conf”目录。 表1 配置文件 文件名称 作用 获取地址 core-site.xml 配置HDFS详细参数。 MRS_Services_ClientConfig\HDFS\config\core-site.xml hdfs-site.xml 配置HDFS详细参数。 MRS_Services_ClientConfig\HDFS\config\hdfs-site.xml user.keytab 对于Kerberos安全认证提供HDFS用户信息。 如果是安全模式集群,您可以联系管理员获取相应账号对应权限的keytab文件和krb5文件。 krb5.conf Kerberos server配置信息。 不同集群的“user.keytab”、“krb5.conf”不能共用。 “conf”目录下的“log4j.properties”文件客户根据自己的需要进行配置。
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 在Linux客户端运行应用的初始化代码,代码样例如下所示。 /** * 初始化,获取一个FileSystem实例 * * @throws IOException */ private void init() throws IOException { confLoad(); authentication(); instanceBuild(); } /** * * 如果程序运行在Linux上,则需要core-site.xml、hdfs-site.xml的路径, * 修改为在Linux下客户端文件的绝对路径。 * */ private void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * kerberos security authentication * 如果程序运行在Linux上,则需要krb5.conf和keytab文件的路径, * 修改为在Linux下客户端文件的绝对路径。并且需要将样例代码中的keytab文件和principal文件 * 分别修改为当前用户的keytab文件名和用户名。 * */ private void authentication() throws IOException { // 安全模式 if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } } /** * build HDFS instance */ private void instanceBuild() throws IOException { // get filesystem fSystem = FileSystem.get(conf); }
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表5 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表6 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testScanData方法中 public void testScanData() { LOG.info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the Caching size. scan.setCaching(1000);//注[1] // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed ", e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testScanData."); }
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount.WordCountTopology类中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在Eclipse中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 1、打包成Jar包,然后在客户端命令行上面进行提交 * 2、远程提交的时候,要先将该应用程序和其他外部依赖(非example工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 3、再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: *./storm jar ../example/example.jar com.huawei.streaming.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; }
  • 下载MRS客户端 登录MRS Manager,请参考登录MRS Manager。 选择“服务管理”。 单击“下载客户端”。 在“客户端类型”选择“完整客户端”。 在“下载路径”选择“远端主机”。 将“主机IP”设置为新申请的弹性云服务器的IP地址,设置“主机端口”为“22”,并将“存放路径”设置为“/tmp”。 如果使用SSH登录ECS的默认端口“22”被修改,请将“主机端口”设置为新端口。 “保存路径”最多可以包含256个字符。 “登录用户”设置为“root”。 如果使用其他用户,请确保该用户对保存目录拥有读取、写入和执行权限。 在“登录方式”选择“密码”或“SSH私钥”。 密码:输入创建集群时设置的root用户密码。 SSH私钥:选择并上传创建集群时使用的密钥文件。 图1 下载客户端 单击“确定”开始生成客户端文件。 若界面显示以下提示信息表示客户端包已经成功保存。单击“关闭”。客户端文件请到下载客户端时设置的远端主机的“存放路径”中获取。 下载客户端文件到远端主机成功。 若界面显示以下提示信息,请检查用户名密码及远端主机的安全组配置,确保用户名密码正确,及远端主机的安全组已增加SSH(22)端口的入方向规则。然后从2执行重新开始下载客户端。 连接到服务器失败,请检查网络连接或参数设置。 生成客户端会占用大量的磁盘IO,不建议在集群处于安装中、启动中、打补丁中等非稳态场景下载客户端。 父主题: HDFS应用开发常见问题
  • 准备本地应用开发环境 在进行应用开发时,要准备的开发环境如表1所示。 表1 开发环境 准备项 说明 Eclipse 开发环境的基本配置。版本要求:4.2或以上。 JDK JDK使用1.7或者1.8版本。 说明: 基于安全考虑,MRS集群服务端只支持TLS 1.1和TLS 1.2加密协议,IBM JDK默认TLS只支持1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS1.0/1.1/1.2。 详情请参见:https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 父主题: 准备HDFS应用开发环境
  • Alluxio应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Alluxio应用程序开发流程 表1 Alluxio应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Alluxio的基本概念。 Alluxio常用概念 准备开发和运行环境 Alluxio的客户端程序当前推荐使用java语言进行开发,并使用Maven工具构建工程。样例程序运行环境即MRS服务所VPC集群的节点。 Alluxio开发环境简介 根据场景开发工程 提供了Java语言的样例工程和数据查询的样例工程。 Alluxio样例程序开发思路 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 调测Alluxio应用 父主题: Alluxio应用开发概述
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Hive表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Hive数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Hive数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见分析Hive数据。
  • 安全认证 Flink整个系统有两种认证方式: 使用kerberos认证:Flink yarn client与Yarn Resource Manager、JobManager与Zookeeper、JobManager与HDFS、TaskManager与HDFS、Kafka与TaskManager、TaskManager和Zookeeper。 使用YARN内部的认证机制:Yarn Resource Manager与Application Master(简称AM)。 Flink的JobManager与YARN的AM是在同一个进程下。 表1 安全认证方式 安全认证方式 说明 配置方法 Kerberos认证 当前只支持keytab认证方式。 从KDC服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下(例如/home/flinkuser/keytab)。 在“${FLINK_HOME}/conf/flink-conf.yaml”上配置: keytab路径。 security.kerberos.login.keytab: /home/flinkuser/keytab/user.keytab 说明: “/home/flinkuser/keytab/”表示的是用户保存keytab的目录。 principal名(即开发用户名)。 security.kerberos.login.principal:flinkuser 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下: zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient YARN内部认证方式 该方式是YARN内部的认证方式,不需要用户配置。 - 当前一个Flink集群只支持一个用户,一个用户可以创建多个Flink集群。
  • 解决步骤 检查工程conf目录下“client.properties”中配置的“bootstrap.servers”配置值中访问的IP和端口是否正确。 如果IP与Kafka集群部署的业务IP不一致,那么需要修改为当前集群正确的IP地址。 如果配置中的端口为21007(开启kerberos认证模式端口),那么修改该端口为9092(没有开启kerberos认证模式端口)。 检查网络是否正常,确保当前机器能够正常访问Kafka集群。
  • Presto应用开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行是否正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 说明: 在Presto的开发环境中,基于安全考虑,MRS服务端只支持TLS 1.1和TLS 1.2加密协议。由于IBM JDK默认TLS只支持1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS1.0/1.1/1.2。详情请参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置Eclipse 用于开发Presto应用程序的工具。版本要求如下: JDK使用1.7版本,Eclipse使用3.7.1及以上版本。 JDK使用1.8版本,Eclipse使用4.3.2及以上版本。 说明: 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。 网络 确保客户端与Presto服务主机在网络上互通。 父主题: 准备Presto应用开发环境
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testDelete方法中 public void testDelete() { LOG.info("Entering testDelete."); byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Instantiate an HTable object. table = conn.getTable(tableName); // Instantiate a Delete object. Delete delete = new Delete(rowKey); // Submit a delete request. table.delete(delete); LOG.info("Delete table successfully."); } catch (IOException e) { LOG.error("Delete table failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testDelete."); }
  • 代码样例 以下代码片段是创建Connection的示例: private TableName tableName = null; private Configuration conf = null; private Connection conn = null; public static final String TABLE_NAME = "hbase_sample_table"; public HBaseExample(Configuration conf) throws IOException { this.conf = conf; this.tableName = TableName.valueOf(TABLE_NAME); this.conn = ConnectionFactory.createConnection(conf); }
  • 功能介绍 HBase通过ConnectionFactory.createConnection(configuration)方法创建Connection对象。传递的参数为上一步创建的Configuration。 Connection封装了底层与各实际服务器的连接以及与ZooKeeper的连接。Connection通过ConnectionFactory类实例化。创建Connection是重量级操作,Connection是线程安全的,因此,多个客户端线程可以共享一个Connection。 典型的用法,一个客户端程序共享一个单独的Connection,每一个线程获取自己的Admin或Table实例,然后调用Admin对象或Table对象提供的操作接口。不建议缓存或者池化Table、Admin。Connection的生命周期由调用者维护,调用者通过调用close(),释放资源。
  • 回答 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • 操作步骤 执行mvn package生成jar包,在工程目录target目录下获取,比如:hdfs-examples-1.0.jar。 将导出的Jar包拷贝上传至Linux客户端运行环境的任意目录下,例如“/opt/client”,然后在该目录下创建“conf”目录,将“user.keytab” 和 "krb5.conf"拷贝至“conf”目录。可参考6 。 配置环境变量。 source /opt/client/bigdata_env 执行如下命令,运行Jar包。 hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMain 运行命令时,需保持客户端“Yarn/config/hdfs-site.xml”中的kerberos相关信息和“HDFS/hadoop/etc/hadoop/hdfs-site.xml”中的kerberos相关信息一致。“hdfs-site.xml”中kerberos的配置mapred改为hdfs,需要修改的地方如图1所示。 图1 hdfs-site.xml
  • Spark SQL常用概念 DataFrame DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的Data Frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者RDD。 Spark SQL的程序入口是SQLContext类(或其子类),创建SQLContext时需要一个SparkContext对象作为其构造参数。SQLContext其中一个子类是HiveContext,相较于其父类,HiveContext添加了HiveQL的parser、UDF以及读取存量Hive数据的功能等。但注意,HiveContext并不依赖运行时的Hive,只是依赖Hive的类库。 由SQLContext及其子类可以方便的创建SparkSQL中的基本数据集DataFrame,DataFrame向上提供多种多样的编程接口,向下兼容多种不同的数据源,例如Parquet、JSON、Hive数据、Database、HBase等,这些数据源都可以使用统一的语法来读取。
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译
  • 功能介绍 该样例主要分为三个部分。 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。
  • 数据规划 创建HDFS数据文件。 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下。 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/ 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/ 创建HBase表并插入数据。 在Linux系统HBase客户端使用命令hbase shell。 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。 执行命令quit退出。 创建Hive表并载入数据。 在Linux系统Hive客户端使用命令beeline。 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。 执行命令!q退出。 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
  • 场景说明 该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 该样例逻辑过程如下。 以HDFS文本文件为输入数据 log1.txt:数据输入文件 YuanJing,male,10 GuoYijun,male,5 Map阶段 获取输入数据的一行并提取姓名信息。 查询HBase一条数据。 查询Hive一条数据。 将HBase查询结果与Hive查询结果进行拼接作为Map输出。 Reduce阶段 获取Map输出中的最后一条数据。 将数据输出到HBase。 将数据保存到HDFS。
  • Kafka应用开发流程介绍 Kafka客户端角色包括Producer和Consumer两个角色,其应用开发流程是相同的。 开发流程中各个阶段的说明如图1和表1所示。 图1 Kafka客户端程序开发流程 表1 Kafka客户端程序开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发客户端前,需要了解Kafka的基本概念,根据实际场景判断,需要开发的角色是Producer还是Consumer。 Kafka应用开发常用概念 准备开发环境 Kafka的客户端程序当前推荐使用java语言进行开发,并使用Maven工具构建工程。 准备Maven和JDK 准备运行环境 Kafka的样例程序运行环境即MRS服务所VPC集群的节点。 - 准备工程 Kafka提供了不同场景下的样例程序,您可以下载样例工程进行程序学习。或者您可以根据指导,新建一个Kafka工程。 导入并配置Kafka样例工程 根据场景开发工程 提供了Producer和Consumer相关API的使用样例,包含了新旧API和多线程的使用场景,帮助用户快速熟悉Kafka接口。 Kafka样例程序开发思路 编译并运行程序 指导用户将开发好的程序编译并打包,上传到VPC的Linux节点运行。 调测Kafka应用 查看程序运行结果 程序运行结果可以输出到Linux命令行页面。也可通过Linux客户端进行Topic数据消费的方式查看数据是否写入成功。 调测Kafka应用 父主题: Kafka应用开发概述
  • 使用场景 HBase文件存储模块(HBase FileStream,简称HFS)是HBase的独立模块,它作为对HBase与HDFS接口的封装,应用在MRS的上层应用,为上层应用提供文件的存储、读取、删除等功能。 在Hadoop生态系统中,无论是HDFS,还是HBase,在面对海量文件存储的时候,在某些场景下,都会存在一些很难解决的问题: 如果把海量小文件直接保存在HDFS中,会给NameNode带来极大的压力。 由于HBase接口以及内部机制的原因,一些较大的文件也不适合直接保存到HBase中。 HFS的出现,就是为了解决需要在Hadoop中存储海量小文件,同时也要存储一些大文件的混合场景。简单来说,就是在HBase表中,需要存放大量的小文件(10MB以下),同时又需要存放一些比较大的文件(10MB以上)。 HFS为以上场景提供了统一的操作接口,这些操作接口与HBase的函数接口类似。必须在HBase的配置参数“hbase.coprocessor.master.classes”中增加一个值:“org.apache.hadoop.hbase.filestream.coprocessor.FileStreamMasterObserver”。 如果只有小文件,确定不会有大文件的场景下,建议使用HBase的原始接口进行操作。 HFS接口需要同时对HBase和HDFS进行操作,所以客户端用户需要同时拥有这两个组件的操作权限。 直接存放在HDFS中的大文件,HFS在存储时会加入一些元数据信息,所以存储的文件不是直接等于原文件的。不能直接从HDFS中移动出来使用,而需要用HFS的接口进行读取。 使用HFS接口存储在HDFS中的数据,暂不支持备份与容灾。
  • 操作场景 SQL语句转化为具体执行计划是由SQL查询编译器决定的,同一个SQL语句可以转化成多种物理执行计划,如何指导编译器选择效率最高的执行计划,这就是优化器的主要作用。传统数据库(例如Oracle)的优化器有两种:基于规则的优化器(Rule-Based Optimization,RBO)和基于代价的优化器(Cost-Based Optimization,CBO)。 RBO RBO使用的规则是根据经验形成的,只要按照这个规则去写SQL语句,无论数据表中的内容怎样、数据分布如何,都不会影响到执行计划。 CBO CBO是根据实际数据分布和组织情况,评估每个计划的执行代价,从而选择代价最小的执行计划。 目前Spark的优化器都是基于RBO的,已经有数十条优化规则,例如谓词下推、常量折叠、投影裁剪等,这些规则是有效的,但是它对数据是不敏感的。导致的问题是数据表中数据分布发生变化时,RBO是不感知的,基于RBO生成的执行计划不能确保是最优的。而CBO的重要作用就是能够根据实际数据分布估算出SQL语句,生成一组可能被使用的执行计划中代价最小的执行计划,从而提升性能。 目前CBO主要的优化点是Join算法选择。举个简单例子,当两个表做Join操作,如果其中一张原本很大的表经过Filter操作之后结果集小于BroadCast的阈值,在没有CBO情况下是无法感知大表过滤后变小的情况,采用的是SortMergeJoin算法,涉及到大量Shuffle操作,很耗费性能;在有CBO的情况下是可以感知到结果集的变化,采用的是BroadcastHashJoin算法,会将过滤后的小表BroadCast到每个节点,转变为非Shuffle操作,从而大大提高性能。
  • 操作步骤 Spark CBO的设计思路是,基于表和列的统计信息,对各个操作算子(Operator)产生的中间结果集大小进行估算,最后根据估算的结果来选择最优的执行计划。 设置配置项。 在“spark-defaults.conf”配置文件中增加配置项“spark.sql.cbo”,将其设置为true,默认为false。 在客户端执行SQL语句set spark.sql.cbo=true进行配置。 执行统计信息生成命令,得到统计信息。 此步骤只需在运行所有SQL前执行一次。如果数据集发生了变化(插入、更新或删除),为保证CBO的优化效果,需要对有变化的表或者列再次执行统计信息生成命令重新生成统计信息,以得到最新的数据分布情况。 表:执行COMPUTE STATS FOR TABLE src命令计算表的统计信息,统计信息包括记录条数、文件数和物理存储总大小。 列: 执行COMPUTE STATS FOR TABLE src ON COLUMNS命令计算所有列的统计信息。 执行COMPUTE STATS FOR TABLE src ON COLUMNS name,age命令计算表中name和age两个字段的统计信息。 当前列的统计信息支持四种类型:数值类型、日期类型、时间类型和字符串类型。对于数值类型、日期类型和时间类型,统计信息包括:Max、Min、不同值个数(Number of Distinct Value,NDV)、空值个数(Number of Null)和Histogram(支持等宽、等高直方图);对于字符串类型,统计信息包括:Max、Min、Max Length、Average Length、不同值个数(Number of Distinct Value,NDV)、空值个数(Number of Null)和Histogram(支持等宽直方图)。 CBO调优 自动优化:用户根据自己的业务场景,输入SQL语句查询,程序会自动去判断输入的SQL语句是否符合优化的场景,从而自动选择Join优化算法。 手动优化:用户可以通过DESC FORMATTED src命令查看统计信息,根据统计信息的分布,人工优化SQL语句。
  • 场景说明 假定用户开发一个应用程序,用于记录和查询城市的气象信息,记录数据如下表表1,表2和表3所示。 表1 原始数据 城市 区域 时间 温度 湿度 Shenzhen Longgang 2017/7/1 00:00:00 28 54 Shenzhen Longgang 2017/7/1 01:00:00 27 53 Shenzhen Longgang 2017/7/1 02:00:00 27 52 Shenzhen Longgang 2017/7/1 03:00:00 27 51 Shenzhen Longgang 2017/7/1 04:00:00 27 50 Shenzhen Longgang 2017/7/1 05:00:00 27 49 Shenzhen Longgang 2017/7/1 06:00:00 27 48 Shenzhen Longgang 2017/7/1 07:00:00 27 46 Shenzhen Longgang 2017/7/1 08:00:00 29 46 Shenzhen Longgang 2017/7/1 09:00:00 30 48 Shenzhen Longgang 2017/7/1 10:00:00 32 48 Shenzhen Longgang 2017/7/1 11:00:00 32 49 Shenzhen Longgang 2017/7/1 12:00:00 33 49 Shenzhen Longgang 2017/7/1 13:00:00 33 50 Shenzhen Longgang 2017/7/1 14:00:00 32 50 Shenzhen Longgang 2017/7/1 15:00:00 32 50 Shenzhen Longgang 2017/7/1 16:00:00 31 51 Shenzhen Longgang 2017/7/1 17:00:00 30 51 Shenzhen Longgang 2017/7/1 18:00:00 30 51 Shenzhen Longgang 2017/7/1 19:00:00 29 51 Shenzhen Longgang 2017/7/1 20:00:00 29 52 Shenzhen Longgang 2017/7/1 21:00:00 29 53 Shenzhen Longgang 2017/7/1 22:00:00 28 54 Shenzhen Longgang 2017/7/1 23:00:00 28 54 该场景里记录了深圳市龙岗区在2017年7月1日零时的温度和湿度数据,这里通过OpenTSDB的方式建模实质上是两组数据点。 表2 指标数据点1 指标项(metric) 城市(city) 区域(region) Unix timestamp 指标数值(value) city.temp Shenzhen Longgang 1498838400 28 city.temp Shenzhen Longgang 1498842000 27 city.temp Shenzhen Longgang 1498845600 27 city.temp Shenzhen Longgang 1498849200 27 city.temp Shenzhen Longgang 1498852800 27 city.temp Shenzhen Longgang 1498856400 27 city.temp Shenzhen Longgang 1498860000 27 city.temp Shenzhen Longgang 1498863600 27 city.temp Shenzhen Longgang 1498867200 29 city.temp Shenzhen Longgang 1498870800 30 city.temp Shenzhen Longgang 1498874400 32 city.temp Shenzhen Longgang 1498878000 32 city.temp Shenzhen Longgang 1498881600 33 city.temp Shenzhen Longgang 1498885200 33 city.temp Shenzhen Longgang 1498888800 32 city.temp Shenzhen Longgang 1498892400 32 city.temp Shenzhen Longgang 1498896000 31 city.temp Shenzhen Longgang 1498899600 30 city.temp Shenzhen Longgang 1498903200 30 city.temp Shenzhen Longgang 1498906800 29 city.temp Shenzhen Longgang 1498910400 29 city.temp Shenzhen Longgang 1498914000 29 city.temp Shenzhen Longgang 1498917600 28 city.temp Shenzhen Longgang 1498921200 28 表3 指标数据点2 指标项(metric) 城市(city) 区域(region) Unix timestamp 指标数值(value) city.hum Shenzhen Longgang 1498838400 54 city.hum Shenzhen Longgang 1498842000 53 city.hum Shenzhen Longgang 1498845600 52 city.hum Shenzhen Longgang 1498849200 51 city.hum Shenzhen Longgang 1498852800 50 city.hum Shenzhen Longgang 1498856400 49 city.hum Shenzhen Longgang 1498860000 48 city.hum Shenzhen Longgang 1498863600 46 city.hum Shenzhen Longgang 1498867200 46 city.hum Shenzhen Longgang 1498870800 48 city.hum Shenzhen Longgang 1498874400 48 city.hum Shenzhen Longgang 1498878000 49 city.hum Shenzhen Longgang 1498881600 49 city.hum Shenzhen Longgang 1498885200 50 city.hum Shenzhen Longgang 1498888800 50 city.hum Shenzhen Longgang 1498892400 50 city.hum Shenzhen Longgang 1498896000 51 city.hum Shenzhen Longgang 1498899600 51 city.hum Shenzhen Longgang 1498903200 51 city.hum Shenzhen Longgang 1498906800 51 city.hum Shenzhen Longgang 1498910400 52 city.hum Shenzhen Longgang 1498914000 53 city.hum Shenzhen Longgang 1498917600 54 city.hum Shenzhen Longgang 1498921200 54 其中这两组指标数据点都有2个标签: 标签(tag):城市city、区域region 标签值(tag value):ShenZhen、Longgang 用户可以执行以下数据操作: 获取每天的监控数据,通过OpenTSDB的put接口将两个组数据点写入数据库中。 对已有的数据使用OpenTSDB的query接口进行数据查询和分析。
  • Impala简介 Impala直接对存储在HDFS,HBase 或对象存储服务(OBS)中的Hadoop数据提供快速,交互式SQL查询。除了使用相同的统一存储平台之外,Impala还使用与Apache Hive相同的元数据,SQL语法(Hive SQL),ODBC驱动程序和用户界面(Hue中的Impala查询UI)。这为实时或面向批处理的查询提供了一个熟悉且统一的平台。作为查询大数据的工具补充,Impala不会替代基于MapReduce构建的批处理框架,例如Hive。基于MapReduce构建的Hive和其他框架最适合长时间运行的批处理作业。 Impala主要特点如下: 支持Hive查询语言(HiveQL)中大多数的SQL-92功能,包括 SELECT,JOIN和聚合函数。 HDFS,HBase 和对象存储服务(OBS)存储,包括: HDFS文件格式:基于分隔符的text file,Parquet,Avro,SequenceFile和RCFile。 压缩编解码器:Snappy,GZIP,Deflate,BZIP。 常见的数据访问接口包括: JDBC驱动程序。 ODBC驱动程序。 HUE beeswax和Impala查询UI。 impala-shell命令行接口。 支持Kerberos身份认证。 Impala主要应用于实时查询数据的离线分析(如日志分析,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testModifyTable方法中 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName).build(); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(familyName); TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName)).setColumnFamily(cfd).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(td); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed ", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed ", e); } } } LOG.info("Exiting testModifyTable."); }
共100000条