华为云用户手册

  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink开发接口简介 Flink DataStream API提供Scala和Java两种语言的开发方式,如表1所示。 表1 Flink DataStream API接口 功能 说明 Scala API 提供Scala语言的API,提供过滤、join、窗口、聚合等数据处理能力。由于Scala语言的简洁易懂,推荐用户使用Scala接口进行程序开发。 Java API 提供Java语言的API,提供过滤、join、窗口、聚合等数据处理能力。
  • 基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 基本概念 ResourceManager(RM) RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。 ApplicationMaster(AM) 用户提交的每个应用程序均包含一个AM,主要功能包括: 与RM调度器协商以获取资源(用Container表示)。 将得到的资源进一步分配给内部任务。 与NM通信以启动/停止任务。 监控所有任务的运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 NodeManager(NM) NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它会接收并处理来自AM的Container启动/停止等各种请求。 Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。
  • 简介 Yarn是一个分布式的资源管理系统,用于提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率,以及能支持除了MapReduce计算框架外的更多的计算框架。
  • 部署运行及结果查看 导出本地jar包,请参见打包IntelliJ IDEA代码。 将1中导出的本地Jar包,5中获取的配置文件和6中获取的jar包合并统一打出完整的业务jar包,请参见打包业务。 执行命令提交拓扑。 keytab方式下,若用户修改了keytab文件名,如修改为“huawei.keytab”,则需要在命令中增加第二个参数进行说明,提交命令示例(拓扑名为hbase-test): storm jar /opt/jartarget/source.jar com.huawei.storm.example.hbase.SimpleHBaseTopology hbase-test huawei.keytab 安全模式下在提交source.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 因为示例中的HBaseBolt并没有建表功能,在提交之前确保hbase中存在相应的表,若不存在需要手动建表,hbase shell建表语句如下create 'WordCount', 'cf'。 安全模式下hbase需要用户有相应表甚至列族和列的访问权限,因此首先需要在hbase所在集群上使用hbase管理员用户登录,之后在hbase shell中使用grant命令给提交用户申请相应表的权限,如示例中的WordCount,成功之后再使用提交用户登录并提交拓扑。 拓扑提交成功后请自行登录HBase集群查看。 如果使用票据登录,则需要使用命令行定期上传票据,具体周期由票据刷新截止时间而定,步骤如下: 在安装好的storm客户端目录的“Storm/storm-1.2.1/conf/storm.yaml”文件尾部新起一行添加如下内容: topology.auto-credentials: - org.apache.storm.security.auth.kerberos.AutoTGT 执行命令./storm upload-credentials hbase-test。
  • 部署运行及结果查看 导出本地jar包,请参见打包IntelliJ IDEA代码。 将1导出的本地Jar包,5中获取的配置文件和6中获取的jar包合并统一打出完整的业务jar包,请参见打包业务。 执行命令提交拓扑。 keytab方式下,若用户修改了keytab文件名,如修改为“huawei.keytab”,则需要在命令中增加第二个参数进行说明,提交命令示例(拓扑名为hdfs-test): storm jar /opt/jartarget/source.jar com.huawei.storm.example.hdfs.SimpleHDFSTopology hdfs-test huawei.keytab 安全模式下在提交source.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 拓扑提交成功后请登录HDFS集群查看。 如果使用票据登录,则需要使用命令行定期上传票据,具体周期由票据刷新截止时间而定,步骤如下: 在安装好的storm客户端目录的“Storm/storm-1.2.1/conf/storm.yaml”文件尾部新起一行添加如下内容: topology.auto-credentials: - org.apache.storm.security.auth.kerberos.AutoTGT 执行命令:./storm upload-credentials hdfs-test
  • 对外接口 Storm-HDFS采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-hdfs。 Storm-HBase采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-hbase。 Storm-Kafka采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-kafka。 Storm-JDBC采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-jdbc。 父主题: 更多信息
  • 操作步骤 修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换用户keytab文件名称,用户principal名称,和Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 根据实际情况修改userJarFilePath为实际的拓扑Jar包地址 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "D:\\example.jar"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } 安全模式下需要执行安全准备,根据实际情况修改userKeyTablePath和userPrincipal为配置并导入样例工程章节的步骤2中所获取用户的keytab文件路径和principal private static void securityPrepare(Config config) throws IOException { String userKeyTablePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "user.keytab"; String userPrincipal = "StreamingDeveloper"; String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator +"krb5.conf"; //windows路径下分隔符替换 userKeyTablePath = userKeyTablePath.replace("\\", "\\\\"); krbFilePath = krbFilePath.replace("\\", "\\\\"); String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE)); LoginUtil.setKrb5Config(krbFilePath); LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance); LoginUtil.setJaasFile(userPrincipal, userKeyTablePath); } 执行WordCountTopology.java类的Main方法提交应用程序。
  • 操作步骤 将从IntelliJ IDEA中导出的jar包复制到Linux客户端指定目录(例如“/opt/jarsource”)。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在Storm客户端安装目录“Storm/storm-1.2.1/bin”下执行打包命令,将上述jar包打成一个完整的业务jar包放入指定目录/opt/jartarget(可为任意空目录)。执行sh storm-jartool.sh /opt/jarsource/ /opt/jartarget命令后,会在“/opt/jartarget”下生成source.jar。
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 打包成Jar包,然后在客户端命令行上面进行提交 * 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; } 如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
  • 操作步骤 将从IntelliJ IDEA打包出来的jar包放入指定文件夹(例如“D:\source”)。 在样例代码目录“src/storm-examples/storm-examples”下创建“lib”目录,将IntelliJ IDEA中导出的jar包复制到“lib”目录下,并解压。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在IntelliJ IDEA样例工程的“tools”目录下找到打包工具:“storm-jartool.cmd”。 双击打包工具,输入要打包的jar包所在目录(“D:\source”)并回车,再输入打出包存放的目录(“D:\target”),在“D:\target”中,会生成“source.jar”文件。
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的RandomSentenceSpout类的nextTuple方法中,作用在于将收到的字符串拆分成单词。 /** * {@inheritDoc} */ @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[random.nextInt(sentences.length)]; collector.emit(new Values(sentence)); }
  • 操作步骤 安全模式下,请先进行安全认证。 初始化客户端环境变量。 进入客户端安装目录“/opt/Storm_client”执行以下命令,导入环境变量信息。 source bigdata_env 使用在“准备开发用户”章节创建的开发用户进行安全登录。 执行kinit命令进行“人机”用户的安全登录。 kinit用户名 例如: kinit developuser 然后按照提示输入密码,无异常提示返回,则完成了用户的kerberos认证。 提交拓扑(以wordcount为例,其它拓扑请参照相关开发指引),进入Storm客户端“Storm/storm-1.2.1/bin”目录,将刚打出的source.jar提交(如果在Windows上进行的打包,则需要将Windows上的source.jar上传到Linux服务器,假定上传到“/opt/jartarget”目录),执行命令:storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology。 执行storm list命令,查看已经提交的应用程序,如果发现名称为word-count的应用程序,则说明任务提交成功。 如果业务设置为本地模式,且使用命令行方式提交时,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。 /** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } } 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountBolt”类的execute方法中,作用在于统计收到的每个单词的数量。 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }
  • Spark2x样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Spark2x相关样例工程: 表1 Spark2x相关样例工程 样例工程位置 描述 sparksecurity-examples/SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 sparksecurity-examples/SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 sparksecurity-examples/SparkHbasetoHbasePythonExample sparksecurity-examples/SparkHbasetoHbaseScalaExample sparksecurity-examples/SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 sparksecurity-examples/SparkHivetoHbasePythonExample sparksecurity-examples/SparkHivetoHbaseScalaExample sparksecurity-examples/SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 sparksecurity-examples/SparkPythonExample sparksecurity-examples/SparkRExample sparksecurity-examples/SparkScalaExample sparksecurity-examples/SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 sparksecurity-examples/SparkLauncherScalaExample sparksecurity-examples/SparkOnClickHouseJavaExample Spark通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作样例代码。 sparksecurity-examples/SparkOnClickHousePythonExample sparksecurity-examples/SparkOnClickHouseScalaExample sparksecurity-examples/SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 sparksecurity-examples/SparkOnHbasePythonExample sparksecurity-examples/SparkOnHbaseScalaExample sparksecurity-examples/SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 sparksecurity-examples/SparkOnHudiPythonExample sparksecurity-examples/SparkOnHudiScalaExample sparksecurity-examples/SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 sparksecurity-examples/SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 sparksecurity-examples/SparkSQLPythonExample sparksecurity-examples/SparkSQLScalaExample sparksecurity-examples/SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 sparksecurity-examples/SparkStreamingKafka010PythonExample sparksecurity-examples/SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 sparksecurity-examples/SparkStreamingtoHbasePythonExample010 sparksecurity-examples/SparkStreamingtoHbaseScalaExample010 sparksecurity-examples/SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 sparksecurity-examples/SparkStructuredStreamingPythonExample sparksecurity-examples/SparkStructuredStreamingScalaExample sparksecurity-examples/SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JDBCServer的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 sparksecurity-examples/SparkThriftServerScalaExample sparksecurity-examples/StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 sparksecurity-examples/StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 父主题: 概述
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Bolt 4 创建topology 请参见创建Topology 部分代码请参考代码样例说明,完整代码请参考Strom-examples示例工程。
  • 开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 配置并导入样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发程序 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包IntelliJ IDEA代码 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包业务 提交拓扑 指导用户将开发好的程序提交运行。 提交拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看结果 父主题: 概述
  • 简介 Storm是一个分布式的、可靠的、容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm的目标是提供对大数据流的实时处理,可以可靠地处理无限的数据流。 Storm有很多适用的场景:实时分析、在线机器学习、持续计算和分布式ETL等,易扩展、支持容错,可确保数据得到处理,易于构建和操控。 Storm有如下几个特点: 适用场景广泛 易扩展,可伸缩性高 保证无数据丢失 容错性好 多语言 易于构建和操控
  • 问题现象 Spark能对接很多的第三方工具,因此在使用过程中经常会依赖一堆的三方包。而有一些包MRS已经自带,这样就有可能造成代码使用的jar包版本和集群自带的jar包版本不一致,在使用过程中就有可能出现jar包冲突的情况。 常见的jar包冲突报错有: 1、报错类找不到:java.lang.NoClassDefFoundError 2、报错方法找不到:java.lang.NoSuchMethodError
  • 原因分析 以自定义UDF为例: 报错信息显示是找不到类。 首先需要确认的是这个类属于的jar包是否在jvm的classpath里面, spark自带的jar都在“spark客户端目录/jars/”。 确认是否存在多个jar包拥有这个类。 如果是其他依赖包,可能是没有使用--jars添加到任务里面。 如果是已经添加到任务里面,但是依旧没有取到,可能是因为配置文件的driver或者executor的classpath配置不正确,可以查看日志确认是否加载到环境。 另外可能报错是类初始化失败导致后面使用这个类的时候出现上述报错,需要确认是否在之前就有初始化失败或者其他报错的情况发生。 报错信息显示找不到方法。 确认这个方法对应的类所在的jar包是否加载到jvm的classpath里面,spark自带的类都在“spark客户端目录/jars/”。 确认是否有多个jar包包含这个类(尤其注意相同工具的不同版本)。 如果报错是Hadoop相关的包,有可能是因为使用的Hadoop版本不一致导致部分方法已经更改。 如果报错的是三方包里面的类,可能是因为Spark已经自带了相关的jar包,但是和代码中使用的版本不一致。
  • 查看调试结果 SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.---- Begin executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ----Result---- Done executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -------- Begin executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ----Result---- Done executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD -------- Begin executing sql: SELECT * FROM child ----NAMEAGEMiranda32Karlie23Candice27---- Done executing sql: SELECT * FROM child -------- Begin executing sql: DROP TABLE child ----Result---- Done executing sql: DROP TABLE child ----Process finished with exit code 0 父主题: 在Windows中调测程序
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/user.keytab”,“/opt/krb5.conf”。 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/” )下。
  • 数据规划 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。 示例:进入hbase shell,执行如下命令: create 'hbase_table','key','info' put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','1','info:valid','1' put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','2','info:valid','1' put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','3','info:valid','0' put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','4','info:valid','1' 上述数据的modify_time列可设置为当前时间之前的值。 put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','5','info:valid','1' put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','6','info:valid','1' put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','7','info:valid','0' put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','8','info:valid','1' put 'hbase_table','4','info:valid','0' put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39' 上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。 put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','9','info:valid','1' put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','10','info:valid','1' put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','11','info:valid','0' put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','12','info:valid','1' 上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。 在sparksql中创建HBase的hive外表,命令如下: create table external_hbase_table(key string ,modify_time STRING, valid STRING) using org.apache.spark.sql.hbase.HBaseSource options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid"); 在sparksql中创建CarbonData表: create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata; 初始化加载当前hbase表中所有数据到CarbonData表; insert into table carbon01 select * from external_hbase_table where valid='1'; 用spark-submit提交命令: spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar
  • 回答 第三方jar包(例如自定义udf)区分x86和TaiShan版本时,混合使用方案: 进入到服务端Spark2x SparkResource的安装目录(集群安装时,SparkResource可能会安装在多个节点上,登录任意一个SparkResource节点,进入到SparkResource的安装目录)。 准备好自己的jar包,例如xx.jar的x86版本和TaiShan版本。将x86版本和TaiShan版本的xx.jar分别复制到当前目录的x86文件夹和TaiShan文件夹里面。 在当前目录下执行以下命令将jar包打包: zip -qDj spark-archive-2x-x86.zip x86/* zip -qDj spark-archive-2x-arm.zip arm/* 执行以下命令查看hdfs上的spark2x依赖的jar包: hdfs dfs -ls /user/spark2x/jars/8.1.0.1 8.1.0.1是版本号,不同版本不同。 执行以下命令移动hdfs上旧的jar包文件到其他目录,例如移动到“tmp”目录。 hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-arm.zip /tmp hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-x86.zip /tmp 上传3中打包的spark-archive-2x-arm.zip和spark-archive-2x-x86.zip到hdfs的/user/spark2x/jars/8.1.0.1目录下,上传命令如下: hdfs dfs -put spark-archive-2x-arm.zip /user/spark2x/jars/8.1.0.1/ hdfs dfs -put spark-archive-2x-x86.zip /user/spark2x/jars/8.1.0.1/ 上传完毕后删除本地的spark-archive-2x-arm.zip,spark-archive-2x-x86.zip文件。 对其他的sparkResource安装节点执行1~2。 进入webUI重启spark2x的jdbcServer实例。 重启后,需要更新客户端配置。按照客户端所在的机器类型(x86、TaiShan)复制xx.jar的相应版本到客户端的Spark2x安装目录“${install_home}/Spark2x/spark/jars”文件夹中。${install_home}是用户的客户端安装路径,用户需要填写实际的安装目录;若本地的安装目录为“/opt/hadoopclient”,那么就复制相应版本xx.jar到“/opt/hadoopclient/Spark2x/spark/jars”文件夹里。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。 运行Python样例代码无需通过Maven打包,只需要上传user.keytab、krb5.conf 文件到客户端所在服务器上。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/example/” )下。
  • 回答 Spark任务在运行过程中,driver会创建一个spark-开头的本地临时目录,用于存放业务jar包,配置文件等,同时在本地创建一个blockmgr-开头的本地临时目录,用于存放block data。此两个目录会在Spark应用运行结束时自动删除。 此两个目录的存放路径优先通过SPARK_LOCAL_DIRS环境变量指定,若不存在该环境变量,则设置为spark.local.dir的值,若此配置还不存在,则使用java.io.tmpdir的值。客户端默认配置中spark.local.dir被设置为/tmp,因此默认使用系统/tmp目录。 但存在一些特殊情况,如driver进程未正常退出,比如被kill -9命令结束进程,或者Java虚拟机直接崩溃等场景,导致driver的退出流程未正常执行,则可能导致该部分目录无法被正常清理,残留在系统中。 当前只有yarn-client模式和local模式的driver进程会产生上述问题,在yarn-cluster模式中,已将container内进程的临时目录设置为container临时目录,当container退出时,由container自动清理该目录,因此yarn-cluster模式不存在此问题。
  • 回答 问题原因: 在IBM JDK下建立的JDBC connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用relogin也无法得到刷新。 解决措施: 通常情况下,在发现JDBC connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 解决方案 提交yarn-client模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.driver.extraClassPath配置复制出来,并将Kafka相关jar包路径追加到该配置项之后,提交结构流任务时需要通过--conf将该配置项给加上。例如:Kafka相关jar包路径为“/kafkadir”,提交任务需要增加--conf spark.driver.extraClassPath=/opt/client/Spark2x/spark/conf/:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/x86/*:/kafkadir/*。 提交yarn-cluster模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.yarn.cluster.driver.extraClassPath配置给复制出来,并将Kafka相关jar包相对路径追加到该配置项之后,提交结构流任务时需要通过--conf 将该配置项给加上。例如:kafka相关包为kafka-clients-x.x.x.jar,kafka_2.11-x.x.x.jar,提交任务需要增加--conf spark.yarn.cluster.driver.extraClassPath=/home/huawei/Bigdata/common/runtime/security:./kafka-clients-x.x.x.jar:./kafka_2.11-x.x.x.jar。 当前版本Spark结构流部分不再支持kafka2.x之前的版本,对于升级场景请继续使用旧的客户端。
  • 查看调试结果 SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.---- Begin executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ----Result---- Done executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -------- Begin executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ----Result---- Done executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD -------- Begin executing sql: SELECT * FROM child ----NAMEAGEMiranda32Karlie23Candice27---- Done executing sql: SELECT * FROM child -------- Begin executing sql: DROP TABLE child ----Result---- Done executing sql: DROP TABLE child ----Process finished with exit code 0 父主题: 在Windows中调测程序
共100000条