华为云用户手册

  • 已安装客户端时编译并运行程序 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package “{maven_setting_path}”为本地Maven的“settings.xml”文件路径,例如“C:\Users\Developer\settings.xml”。 打包成功之后,在工程根目录的“target”子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至集群客户端运行环境的任意目录下,例如“/opt/client”,然后在该目录下创建“conf”目录,将需要的配置文件复制至“conf”目录,具体操作请参考准备运行环境。 配置环境变量: cd /opt/client source bigdata_env 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseDistributedScanExample文件: # -*- coding:utf-8 -*- # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseDistributedScan")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseDistributedScan().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkGetExample文件: # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkGetExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkGetExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 数据规划 Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181,Topic名称为topic1的数据为例。 bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --partitions 5 --replication-factor 1 --topic topic1
  • 提供Join能力 表12 提供Join能力的相关接口 API 说明 def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2] 通过给定的key在一个窗口范围内join两条数据流。 join操作的key值通过where和eaualTo方法进行指定,代表两条流过滤出包含等值条件的数据。 def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2] 通过给定的key在一个窗口范围内co-group两条数据流。 coGroup操作的key值通过where和eaualTo方法进行指定,代表两条流通过该等值条件进行分区处理。
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用特别的类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,数据经过对设置的key值进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作,join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 提供设置eventtime属性的能力 表6 提供设置eventtime属性的能力的相关接口 API 说明 def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 为了能让event time窗口可以正常触发窗口计算操作,需要从记录中提取时间戳。 def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T]
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
  • Kerberos认证代码示例 package com.huawei.bigdata.hdfs.examples; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; public class KerberosTest { private static String PATH_TO_HDFS_SITE_XML = KerberosTest.class.getClassLoader().getResource("hdfs-site.xml") .getPath(); private static String PATH_TO_CORE_SITE_XML = KerberosTest.class.getClassLoader().getResource("core-site.xml") .getPath(); private static String PATH_TO_KEYTAB = KerberosTest.class.getClassLoader().getResource("user.keytab").getPath(); private static String PATH_TO_KRB5_CONF = KerberosTest.class.getClassLoader().getResource("krb5.conf").getPath(); private static String PRNCIPAL_NAME = "develop"; private FileSystem fs; private Configuration conf; /** * initialize Configuration */ private void initConf() { conf = new Configuration(); // add configuration files conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } /** * initialize FileSystem, and get ST from Kerberos * @throws IOException */ private void initFileSystem() throws IOException { fs = FileSystem.get(conf); } /** * An example to access the HDFS * @throws IOException */ private void doSth() throws IOException { Path path = new Path("/tmp"); FileStatus fStatus = fs.getFileStatus(path); System.out.println("Status of " + path + " is " + fStatus); //other thing } public static void main(String[] args) throws Exception { KerberosTest test = new KerberosTest(); test.initConf(); test.login(); test.initFileSystem(); test.doSth(); } } Kerberos认证时需要配置Kerberos认证所需要的文件参数,主要包含keytab文件路径、Kerberos认证的用户名称、Kerberos认证所需要的客户端配置“krb5.conf”文件。 login()方法为调用hadoop的接口执行Kerberos认证,生成TGT票据。 doSth()方法调用hadoop的接口访问文件系统,此时底层RPC会自动携带TGT去Kerberos认证,生成ST票据。 以上代码可在安全模式下的HDFS二次开发样例工程中创建KerberosTest.java,运行并查看调测结果,具体操作过程请参考HDFS开发指南(安全模式)。
  • 安全认证基本概念 本文以HDFS组件应用的安全认证为例介绍安全认证相关的常见基本概念,可以帮助用户减少学习Kerberos框架所花费的时间,有助于更好的理解Kerberos业务。 TGT 票据授权票据(Ticket-Granting Ticket),由Kerberos服务生成,提供给应用程序与Kerberos服务器建立认证安全会话,该票据的默认有效期为24小时,24小时后该票据自动过期。 TGT申请方式(以HDFS为例): 通过HDFS提供的接口获取。 /** * login Kerberos to get TGT, if the cluster is in security mode * @throws IOException if login is failed */ private void login() throws IOException { // not security mode, just return if (! "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } //security mode System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } 通过MRS集群客户端以kinit方式获取。 登录MRS集群客户端所在节点,进入客户端安装目录。 cd {客户端安装目录} 执行以下命令配置环境变量。 source bigdata_env 执行以下命令进行用户认证。 kinit MRS集群业务用户 ST 服务票据(Server Ticket),由Kerberos服务生成,提供给应用程序与应用服务建立安全会话,该票据一次性有效。 ST的生成在MRS中,基于hadoop-rpc通信,由rpc底层自动向Kerberos服务端提交请求,由Kerberos服务端生成。
  • Kerberos认证说明 开启了Kerberos认证的安全模式集群,进行应用开发时需要进行安全认证。使用Kerberos的系统在设计上采用“客户端/服务器”结构与AES等加密技术,并且能够进行相互认证(即客户端和服务器端均可对对方进行身份认证)。可以用于防止窃听、防止replay攻击、保护数据完整性等场合,是一种应用对称密钥体制进行密钥管理的系统。 图1 Kerberos原理架构 表1 Kerberos模块说明 模块 说明 Application Client 应用客户端,通常是需要提交任务(或者作业)的应用程序。 Application Server 应用服务端,通常是应用客户端需要访问的应用程序。 Kerberos 提供安全认证的服务。 KerberosAdmin 提供认证用户管理的进程。 KerberosServer 提供认证票据分发的进程。 应用客户端(Application Client)可以是集群内某个服务,也可以是客户二次开发的一个应用程序,应用程序可以向应用服务提交任务或者作业。 应用程序在提交任务或者作业前,需要向Kerberos服务申请TGT(Ticket-Granting Ticket),用于建立和Kerberos服务器的安全会话。 Kerberos服务在收到TGT请求后,会解析其中的参数来生成对应的TGT,使用客户端指定的用户名的密钥进行加密响应消息。 应用客户端收到TGT响应消息后,解析获取TGT,此时,再由应用客户端(通常是rpc底层)向Kerberos服务获取应用服务端的ST(Server Ticket)。 Kerberos服务在收到ST请求后,校验其中的TGT合法后,生成对应的应用服务的ST,再使用应用服务密钥将响应消息进行加密处理。 应用客户端收到ST响应消息后,将ST打包到发给应用服务的消息里面传输给对应的应用服务端(Application Server)。 应用服务端收到请求后,使用本端应用服务对应的密钥解析其中的ST,并校验成功后,本次请求合法通过。
  • Kafka应用开发流程介绍 Kafka客户端角色包括Producer和Consumer两个角色,其应用开发流程是相同的。 开发流程中各个阶段的说明如图1和表1所示。 图1 Kafka客户端程序开发流程 表1 Kafka客户端开发的流程说明 阶段 说明 参考文档 准备开发环境 Kafka的客户端程序当前推荐使用java语言进行开发,可使用IntelliJ IDEA工具开发。 Kafka的运行环境即Kafka客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群组件信息文件以及用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接Kafka集群配置文件 配置并导入样例工程 Kafka提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Kafka样例工程 根据业务场景开发工程 提供了Producer和Consumer相关API的使用样例,包含了API和多线程的使用场景,帮助用户快速熟悉Kafka接口。 开发Kafka应用 编译与运行程序 指导用户将开发好的程序编译并提交运行。 调测Kafka应用 父主题: Kafka开发指南(普通模式)
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 问题 执行Spark Core应用,尝试收集大量数据到Driver端,当Driver端内存不足时,应用挂起不退出,日志内容如下。 16/04/19 15:56:22 ERROR Utils: Uncaught exception in thread task-result-getter-2 java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716) at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newArray(Native Method) at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57) at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716) at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
  • 回答 用户尝试收集大量数据到Driver端,如果Driver端的内存不足以存放这些数据,那么就会抛出OOM(OutOfMemory)的异常,然后Driver端一直在进行GC,尝试回收垃圾来存放返回的数据,导致应用长时间挂起。 解决措施: 如果用户需要在OOM场景下强制将应用退出,那么可以在启动Spark Core应用时,在客户端配置文件“$SPARK_HOME/conf/spark-defaults.conf”中的配置项“spark.driver.extraJavaOptions”中添加如下内容: -XX:OnOutOfMemoryError='kill -9 %p'
  • Hive WebHCat接口介绍 以下示例的IP为WebHCat的业务IP,端口为安装时设置的WebHCat HTTP端口。 需要在安装客户端的机器上进行kinit认证操作后才可执行示例操作。 以下示例均为https协议的示例,若要使用http协议,需要执行以下操作: 将REST接口切换成HTTP协议方式,请参见配置基于HTTPS/HTTP协议的REST接口。 将示例中的“--insecure”去掉,将https替换成http,例如 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/status' 更改为 curl -i -u : --negotiate 'http://10.64.35.144:9111/templeton/v1/status' 执行操作前需确保使用的curl版本在7.34.0以上。 可以使用以下命令查看curl版本: curl -V :version(GET) 描述 查询WebHCat支持的返回类型列表。 URL https://www.myserver.com/templeton/:version 参数 表1 :version接口请求参数说明 参数 描述 :version WebHCat版本号(当前必须是v1)。 返回结果 表2 :version接口返回结果参数说明 参数 描述 responseTypes WebHCat支持的返回类型列表。 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1' status (GET) 描述 获取当前服务器的状态 URL https://www.myserver.com/templeton/v1/status 参数 无 返回结果 表3 status接口返回结果参数说明 参数 描述 status WebHCat连接正常,返回OK。 version 字符串,包含版本号,比如v1。 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/status' version (GET) 描述 获取服务器WebHCat的版本 URL https://www.myserver.com/templeton/v1/version 参数 无 返回结果 表4 version接口返回结果参数说明 参数 描述 supportedVersions 所有支持的版本。 version 当前服务器WebHCat的版本。 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/version' version/hive (GET) 描述 获取服务器Hive的版本 URL https://www.myserver.com/templeton/v1/version/hive 参数 无 返回结果 表5 version/hive接口返回结果说明 参数 描述 module hive version Hive的版本 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/version/hive' version/hadoop (GET) 描述 获取服务器Hadoop的版本 URL https://www.myserver.com/templeton/v1/version/hadoop 参数 无 返回结果 表6 version/hadoop接口返回结果说明 参数 描述 module hadoop version Hadoop的版本 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/version/hadoop' ddl (POST) 描述 执行DDL语句 URL https://www.myserver.com/templeton/v1/ddl 参数 表7 ddl接口请求参数说明 参数 描述 exec 需要执行的HCatalog DDL语句。 group 当DDL是创建表时,创建表使用的用户组。 permissions 当DDL是创建表时,创建表使用的权限,格式为rwxr-xr-x。 返回结果 表8 ddl接口返回结果说明 参数 描述 stdout HCatalog执行时的标准输出值,可能为空。 stderr HCatalog执行时的错误输出,可能为空。 exitcode HCatalog的返回值。 例子 curl -i -u : --insecure --negotiate -d exec="show tables" 'https://10.64.35.144:9111/templeton/v1/ddl' ddl/database (GET) 描述 列出所有的数据库 URL https://www.myserver.com/templeton/v1/ddl/database 参数 表9 ddl/database接口请求参数说明 参数 描述 like 用来匹配数据库名的正则表达式。 返回结果 表10 ddl/database接口返回结果说明 参数 描述 databases 数据库名 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/ddl/database' ddl/database/:db (GET) 描述 获取指定数据库的详细信息 URL https://www.myserver.com/templeton/v1/ddl/database/:db 参数 参数 描述 :db 数据库名 返回结果 参数 描述 location 数据库位置 comment 数据库的备注,如果没有备注则不存在 database 数据库名 owner 数据库的所有者 owertype 数据库所有者的类型 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/ddl/database/default' ddl/database/:db (PUT) 描述 创建数据库 URL https://www.myserver.com/templeton/v1/ddl/database/:db 参数 参数 描述 :db 数据库名 group 创建数据库时使用的用户组 permission 创建数据库时使用的权限 location 数据库的位置 comment 数据库的备注,比如描述 properties 数据库属性 返回结果 参数 描述 database 新创建的数据库的名字 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{"location": "/tmp/a", "comment": "my db", "properties": {"a": "b"}}' 'https://10.64.35.144:9111/templeton/v1/ddl/database/db2' ddl/database/:db (DELETE) 描述 删除数据库 URL https://www.myserver.com/templeton/v1/ddl/database/:db 参数 参数 描述 :db 数据库名 ifExists 如果指定数据库不存在,Hive会返回错误,除非设置了ifExists为true。 option 将参数设置成cascade或者restrict。如果选择cascade,将清除一切,包括数据和定义。如果选择restrict,表格内容为空,模式也将不存在。 返回结果 参数 描述 database 删除的数据库名字 例子 curl -i -u : --insecure --negotiate -X DELETE 'https://10.64.35.144:9111/templeton/v1/ddl/database/db3?ifExists=true' ddl/database/:db/table (GET) 描述 列出数据库下的所有表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table 参数 参数 描述 :db 数据库名 like 用来匹配表名的正则表达式 返回结果 参数 描述 database 数据库名字 tables 数据库下表名列表 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/ddl/database/default/table' ddl/database/:db/table/:table (GET) 描述 获取表的详细信息 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table 参数 参数 描述 :db 数据库名 :table 表名 format 格式:format=extended,参考额外信息(“table extended like”)。 返回结果 参数 描述 columns 列名和类型 database 数据库名 table 表名 partitioned 是否分区表,只有extended下才会显示。 location 表的位置,只有extended下才会显示。 outputformat 输出形式,只有extended下才会显示。 inputformat 输入形式,只有extended下才会显示。 owner 表的属主,只有extended下才会显示。 partitionColumns 分区的列,只有extended下才会显示。 例子 curl -i -u : --insecure --negotiate 'https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1?format=extended' ddl/database/:db/table/:table (PUT) 描述 创建表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table 参数 参数 描述 :db 数据库名 :table 新建表名 group 创建表时使用的用户组 permissions 创建表时使用的权限 external 指定位置,hive不使用表的默认位置。 ifNotExists 设置为true,当表存在时不会报错。 comment 备注 columns 列描述,包括列名,类型和可选备注。 partitionedBy 分区列描述,用于划分表格。参数columns列出了列名,类型和可选备注。 clusteredBy 分桶列描述,参数包括columnNames、sortedBy、和numberOfBuckets。参数columnNames包括columnName和排列顺序(ASC为升序,DESC为降序)。 format 存储格式,参数包括rowFormat,storedAs,和storedBy。 location HDFS路径 tableProperties 表属性和属性值(name-value对) 返回结果 参数 描述 database 数据库名 table 表名 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{"columns": [{"name": "id", "type": "int"}, {"name": "name","type": "string"}], "comment": "hello","format": {"storedAs": "orc"} }' 'https://10.64.35.144:9111/templeton/v1/ddl/database/db3/table/tbl1' ddl/database/:db/table/:table (POST) 描述 重命名表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table 参数 参数 描述 :db 数据库名 :table 已有表名 rename 新表表名 返回结果 参数 描述 database 数据库名 table 新表表名 例子 curl -i -u : --insecure --negotiate -d rename=table1 'https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/tbl1' ddl/database/:db/table/:table (DELETE) 描述 删除表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table 参数 参数 描述 :db 数据库名 :table 表名 ifExists 当设置为true时,不报错。 返回结果 参数 描述 database 数据库名 table 表名 例子 curl -i -u : --insecure --negotiate -X DELETE 'https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/table2?ifExists=true' ddl/database/:db/table/:existingtable/like/:newtable (PUT) 描述 创建一张和已经存在的表一样的表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:existingtable/like/:newtable 参数 参数 描述 :db 数据库名 :existingtable 已有表名 :newtable 新表名 group 创建表时使用的用户组。 permissions 创建表时使用的权限。 external 指定位置,hive不使用表的默认位置。 ifNotExists 当设置为true时,如果表已经存在,Hive不报错。 location HDFS路径 返回结果 参数 描述 database 数据库名 table 表名 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{"ifNotExists": "true"}' 'https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/like/tt1' ddl/database/:db/table/:table/partition(GET) 描述 列出表的分区信息 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/partition 参数 参数 描述 :db 数据库名 :table 表名 返回结果 参数 描述 database 数据库名 table 表名 partitions 分区属性值和分区名 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/x1/partition ddl/database/:db/table/:table/partition/:partition(GET) 描述 列出表的某个具体分区的信息 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/partition/:partition 参数 参数 描述 :db 数据库名 :table 表名 :partition 分区名,解码http引用时,需当心。比如country=%27algeria%27。 返回结果 参数 描述 database 数据库名 table 表名 partition 分区名 partitioned 如果设置为true,为分区表 location 表的存储路径 outputFormat 输出格式 columns 列名,类型,备注 owner 所有者 partitionColumns 分区的列 inputFormat 输入格式 totalNumberFiles 分区下文件个数 totalFileSize 分区下文件总大小 maxFileSize 最大文件大小 minFileSize 最小文件大小 lastAccessTime 最后访问时间 lastUpdateTime 最后更新时间 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/x1/partition/dt=1 ddl/database/:db/table/:table/partition/:partition(PUT) 描述 增加一个表分区 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/partition/:partition 参数 参数 描述 :db 数据库名。 :table 表名。 group 创建新分区时使用的用户组。 permissions 创建新分区时用户的权限。 location 新分区的存放位置。 ifNotExists 如果设置为true, 当分区已经存在,系统报错。 返回结果 参数 描述 database 数据库名 table 表名 partitions 分区名 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{}' https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/x1/partition/dt=10 ddl/database/:db/table/:table/partition/:partition(DELETE) 描述 删除一个表分区 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/partition/:partition 参数 参数 描述 :db 数据库名。 :table 表名。 group 删除新分区时使用的用户组。 permissions 删除新分区时用户的权限, 格式为rwxrw-r-x。 ifExists 如果指定分区不存在,Hive报错。参数值设置为true除外。 返回结果 参数 描述 database 数据库名 table 表名 partitions 分区名 例子 curl -i -u : --insecure --negotiate -X DELETE -HContent-type:application/json -d '{}' https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/x1/partition/dt=10 ddl/database/:db/table/:table/column(GET) 描述 获取表的column列表 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/column 参数 参数 描述 :db 数据库名 :table 表名 返回结果 参数 描述 database 数据库名 table 表名 columns 列名字和类型 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/column ddl/database/:db/table/:table/column/:column(GET) 描述 获取表的某个具体的column的信息 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/column/:column 参数 参数 描述 :db 数据库名 :table 表名 :column 列名 返回结果 参数 描述 database 数据库名 table 表名 column 列名字和类型 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/column/id ddl/database/:db/table/:table/column/:column(PUT) 描述 增加表的一列 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/column/:column 参数 参数 描述 :db 数据库名 :table 表名 :column 列名 type 列类型,比如string和int comment 列备注,比如描述 返回结果 参数 描述 database 数据库名 table 表名 column 列名 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{"type": "string", "comment": "new column"}' https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/column/name ddl/database/:db/table/:table/property(GET) 描述 获取表的property URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/property 参数 参数 描述 :db 数据库名 :table 表名 返回结果 参数 描述 database 数据库名 table 表名 properties 属性列表 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/property ddl/database/:db/table/:table/property/:property(GET) 描述 获取表的某个具体的property的值 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/property/:property 参数 参数 描述 :db 数据库名 :table 表名 :property 属性名 返回结果 参数 描述 database 数据库名 table 表名 property 属性列表 例子 curl -i -u : --insecure --negotiate https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/property/last_modified_by ddl/database/:db/table/:table/property/:property(PUT) 描述 增加表的property的值 URL https://www.myserver.com/templeton/v1/ddl/database/:db/table/:table/property/:property 参数 参数 描述 :db 数据库名 :table 表名 :property 属性名 value 属性值 返回结果 参数 描述 database 数据库名 table 表名 property 属性名 例子 curl -i -u : --insecure --negotiate -X PUT -HContent-type:application/json -d '{"value": "my value"}' https://10.64.35.144:9111/templeton/v1/ddl/database/default/table/t1/property/mykey mapreduce/jar(POST) 描述 执行MR任务,在执行之前,需要将MR的jar包上传到HDFS中 URL https://www.myserver.com/templeton/v1/mapreduce/jar 参数 参数 描述 jar 需要执行的MR的jar包。 class 需要执行的MR的分类。 libjars 需要加入的classpath的jar包名,以逗号分隔。 files 需要复制到集群的文件名,以逗号分隔。 arg Main类接受的输入参数。 define 设置hadoop的配置,格式为:define=NAME=VALUE。 statusdir WebHCat会将执行的MR任务的状态写入到statusdir中。如果设置了这个值,那么需要用户手动进行删除。 enablelog 如果statusdir设置,enablelog设置为true,收集Hadoop任务配置和日志到$statusdir/logs。此后,成功和失败的尝试,都将记录进日志。$statusdir/logs下,子目录布局为: logs/$job_id (directory for $job_id) logs/$job_id/job.xml.html logs/$job_id/$attempt_id (directory for $attempt_id) logs/$job_id/$attempt_id/stderr logs/$job_id/$attempt_id/stdout logs/$job_id/$attempt_id/syslog 仅支持Hadoop 1.X。 callback 在MR任务执行完的回调地址,使用$jobId,将任务ID嵌入回调地址。在回调地址中,任务ID替换该$jobId。 返回结果 参数 描述 id 任务ID,类似“job_201110132141_0001” 例子 curl -i -u : --insecure --negotiate -d jar="/tmp/word.count-0.0.1-SNAPSHOT.jar" -d class=com.huawei.word.count.WD -d statusdir="/output" -d enablelog=true "https://10.64.35.144:9111/templeton/v1/mapreduce/jar" mapreduce/streaming(POST) 描述 以Streaming方式提交MR任务 URL https://www.myserver.com/templeton/v1/mapreduce/streaming 参数 参数 描述 input Hadoop中input的路径。 output 存储output的路径。如没有规定,WebHCat将output储存在使用队列资源可以发现到的路径。 mapper mapper程序位置。 reducer reducer程序位置。 files HDFS文件添加到分布式缓存中。 arg 设置argument。 define 设置hadoop的配置变量,格式:define=NAME=VALUE cmdenv 设置环境变量,格式:cmdenv=NAME=VALUE statusdir WebHCat会将执行的MR任务的状态写入到statusdir中。如果设置了这个值,那么需要用户手动进行删除。 enablelog 如果statusdir设置,enablelog设置为true,收集Hadoop任务配置和日志到$statusdir/logs。此后,成功和失败的尝试,都将记录进日志。$statusdir/logs下,子目录布局为: logs/$job_id (directory for $job_id) logs/$job_id/job.xml.html logs/$job_id/$attempt_id (directory for $attempt_id) logs/$job_id/$attempt_id/stderr logs/$job_id/$attempt_id/stdout logs/$job_id/$attempt_id/syslog 仅支持Hadoop 1.X。 callback 在MR任务执行完的回调地址,使用$jobId,将任务ID嵌入回调地址。在回调地址中,任务ID将替换该$jobId。 返回结果 参数 描述 id 任务ID,类似job_201110132141_0001 例子 curl -i -u : --insecure --negotiate -d input=/input -d output=/oooo -d mapper=/bin/cat -d reducer="/usr/bin/wc -w" -d statusdir="/output" 'https://10.64.35.144:9111/templeton/v1/mapreduce/streaming' 本接口的使用需要前置条件,请参阅“开发规范”中的Hive规则。 /hive(POST) 描述 执行Hive命令 URL https://www.myserver.com/templeton/v1/hive 参数 参数 描述 execute hive命令,包含整个和短的Hive命令。 file 包含hive命令的HDFS文件。 files 需要复制到集群的文件名,以逗号分隔。 arg 设置argument。 define 设置hive配置,格式:define=key=value。使用post语句时需要配置实例的scratch dir。WebHCat实例使用define=hive.exec.scratchdir=/tmp/hive-scratch,WebHCat1实例使用define=hive.exec.scratchdir=/tmp/hive1-scratch,以此类推。 statusdir WebHCat会将执行的MR任务的状态写入到statusdir中。如果设置了这个值,那么需要用户手动进行删除。 enablelog 如果statusdir设置,enablelog设置为true,收集Hadoop任务配置和日志到$statusdir/logs。此后,成功和失败的尝试,都将记录进日志。$statusdir/logs下,子目录布局为: logs/$job_id (directory for $job_id) logs/$job_id/job.xml.html logs/$job_id/$attempt_id (directory for $attempt_id) logs/$job_id/$attempt_id/stderr logs/$job_id/$attempt_id/stdout logs/$job_id/$attempt_id/syslog callback 在MR任务执行完的回调地址,使用$jobId,将任务ID嵌入回调地址。在回调地址中,任务ID将替换该$jobId。 返回结果 参数 描述 id 任务ID,类似job_201110132141_0001 例子 curl -i -u : --insecure --negotiate -d execute="select count(*) from t1" -d define=hive.exec.scratchdir=/tmp/hive-scratch -d statusdir="/output" "https://10.64.35.144:9111/templeton/v1/hive" jobs(GET) 描述 获取所有的job id URL https://www.myserver.com/templeton/v1/jobs 参数 参数 描述 fields 如果设置成*,那么会返回每个job的详细信息。如果没设置,只返回任务ID。现在只能设置成*,如设置成其他值,将出现异常。 jobid 如果设置了jobid,那么只有字典顺序比jobid大的job才会返回。比如,如果jobid为"job_201312091733_0001",只有大于该值的job才能返回。返回的job的个数,取决于numrecords。 numrecords 如果设置了numrecords和jobid,jobid列表按字典顺序排列,待jobid返回后,可以得到numrecords的最大值。如果jobid没有设置, 而numrecords设置了参数值,jobid按字典顺序排列后,可以得到numrecords的最大值。相反,如果numrecords没有设置,而jobid设置了参数值,所有大于jobid的job都将返回。 showall 如果设置为true,用户可以浏览的所有job都将返回。不仅仅是用户所拥有的job。 返回结果 参数 描述 id Job id detail 如果showall为true,那么显示detail信息,否则为null。 例子 curl -i -u : --insecure --negotiate "https://10.64.35.144:9111/templeton/v1/jobs" jobs/:jobid(GET) 描述 获取指定job的信息 URL https://www.myserver.com/templeton/v1/jobs/:jobid 参数 参数 描述 jobid Job创建后的Jobid 返回结果 参数 描述 status 包含job状态信息的json对象。 profile 包含job状态的json对象。WebHCat解析JobProfile对象中的信息,该对象因Hadoop版本不同而不同。 id Job的id。 percentComplete 完成百分比,比如75% complete,如果完成后则为null。 user 创建job的用户。 callback 回调URL(如果有)。 userargs 用户提交job时的argument参数和参数值。 exitValue job退出值。 例子 curl -i -u : --insecure --negotiate "https://10.64.35.144:9111/templeton/v1/jobs/job_1440386556001_0255" jobs/:jobid(DELETE) 描述 kill任务 URL https://www.myserver.com/templeton/v1/jobs/:jobid 参数 参数 描述 :jobid 删除的Job的ID 返回结果 参数 描述 user 提交Job的用户。 status 包含Job状态信息的JSON对象。 profile 包含Job信息的JSON对象。WebHCat解析JobProfile对象中的信息,该对象因Hadoop版本不同而不同。 id Job的ID。 callback 回调的URL(如果有)。 例子 curl -i -u : --insecure --negotiate -X DELETE "https://10.64.35.143:9111/templeton/v1/jobs/job_1440386556001_0265" 父主题: Hive对外接口介绍
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1
  • 数据规划 在客户端执行hbase shell进入HBase命令行。 在hbase命令执行下面的命令创建HBbase表: create 'streamingTable','cf1' 在客户端另外一个session通过linux命令构造一个端口进行接收数据(不同操作系统的机器,命令可能不同,suse尝试使用netcat -lk 9999): nc -lk 9999 提交任务命令执行之后,在该命令下输入要提交的数据,通过HBase表进行接收。 在构造一个端口进行接收数据时,需要在客户端所在服务器上安装netcat。
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseStreamingBulkPutExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseStreamingBulkPutExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseStreamingBulkPutExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 回答 建议将"blob.storage.directory"配置选项设置成“/tmp”或者“/opt/huawei/Bigdata/tmp”。 当用户将"blob.storage.directory"配置选项设置成自定义目录时,需要手动赋予用户该目录的owner权限。以下以FusionInsight的admin用户为例。 修改Flink客户端配置文件conf/flink-conf.yaml,配置blob.storage.directory: /home/testdir/testdirdir/xxx。 创建目录/home/testdir(创建一层目录即可),设置该目录为admin用户所属。 /home/testdir/下的testdirdir/xxx目录在启动Flink集群时会在每个节点下自动创建。 进入客户端路径,执行命令./bin/yarn-session.sh -jm 2048 -tm 3072,可以看到yarn-session正常启动并且成功创建目录。
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除目录 * * @throws java.io.IOException */ private void rmdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!deletePath(destPath)) { LOG.error("failed to delete destPath " + DEST_PATH); return; } LOG.info("success to delete path " + DEST_PATH); } /** * delete file path * * @param filePath * @return * @throws java.io.IOException */ private boolean deletePath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { return false; } // fSystem.delete(filePath, true); return fSystem.delete(filePath, true); }
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。 将commons-pool2-xxx.jar上传到“$SPARK_HOME/jars/streamingClient010/”目录下(jar包可从$SPARK_HOME/tool/carbonPrequery目录下获取)。
  • 数据规划 StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中调测Spark应用章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
共100000条