华为云用户手册

  • 简介 Storm是一个分布式的、可靠的、容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm的目标是提供对大数据流的实时处理,可以可靠地处理无限的数据流。 Storm有很多适用的场景:实时分析、在线机器学习、持续计算和分布式ETL等,易扩展、支持容错,可确保数据得到处理,易于构建和操控。 Storm有如下几个特点: 适用场景广泛 易扩展,可伸缩性高 保证无数据丢失 容错性好 多语言 易于构建和操控
  • Spark2x样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Spark2x相关样例工程: 表1 Spark2x相关样例工程 样例工程位置 描述 sparknormal-examples/SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的应用开发样例代码。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 sparknormal-examples/SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 sparknormal-examples/SparkHbasetoHbasePythonExample sparknormal-examples/SparkHbasetoHbaseScalaExample sparknormal-examples/SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的应用开发样例代码。 sparknormal-examples/SparkHivetoHbasePythonExample sparknormal-examples/SparkHivetoHbaseScalaExample sparknormal-examples/SparkJavaExample Spark Core任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 sparknormal-examples/SparkPythonExample sparknormal-examples/SparkSQLJavaExample sparknormal-examples/SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 sparknormal-examples/SparkLauncherScalaExample sparknormal-examples/SparkOnClickHouseJavaExample Spark通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作样例代码。 sparknormal-examples/SparkOnClickHousePythonExample sparknormal-examples/SparkOnClickHouseScalaExample sparknormal-examples/SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 sparknormal-examples/SparkOnHbasePythonExample sparknormal-examples/SparkOnHbaseScalaExample sparknormal-examples/SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 sparknormal-examples/SparkOnHudiPythonExample sparknormal-examples/SparkOnHudiScalaExample sparknormal-examples/SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 sparknormal-examples/SparkSQLPythonExample sparknormal-examples/SparkSQLScalaExample sparknormal-examples/SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 sparknormal-examples/SparkStreamingKafka010PythonExample sparknormal-examples/SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 sparknormal-examples/SparkStreamingtoHbasePythonExample010 sparknormal-examples/SparkStreamingtoHbaseScalaExample010 sparknormal-examples/SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 sparknormal-examples/SparkStructuredStreamingPythonExample sparknormal-examples/SparkStructuredStreamingScalaExample sparknormal-examples/SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JDBCServer的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 sparknormal-examples/SparkThriftServerScalaExample sparknormal-examples/StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 sparknormal-examples/StructuredStreamingStateScalaExample Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 父主题: 概述
  • 开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 配置并导入样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发程序 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包IntelliJ IDEA代码 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包业务 提交拓扑 指导用户将开发好的程序提交运行。 提交拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看结果 父主题: 概述
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的“RandomSentenceSpout”类的“nextTuple”方法中,作用在于将收到的字符串拆分成单词。 /** * {@inheritDoc} */ @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[random.nextInt(sentences.length)]; collector.emit(new Values(sentence)); }
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。 /** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } } 下面代码片段在com.huawei.storm.example.wordcount.WordCountBolt类中,作用在于统计收到的每个单词的数量。 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }
  • 操作步骤 将从IntelliJ IDEA中导出的jar包复制到Linux客户端指定目录(例如“/opt/jarsource”)。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在Storm客户端安装目录“Storm/storm-1.2.1/bin”下执行打包命令,将上述jar包打成一个完整的业务jar包放入指定目录/opt/jartarget(可为任意空目录)。执行sh storm-jartool.sh /opt/jarsource/ /opt/jartarget命令后,会在“/opt/jartarget”下生成source.jar。
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Bolt 4 创建topology 请参见创建Topology 部分代码请参考代码样例说明,完整代码请参考Strom-examples示例工程。
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 打包成Jar包,然后在客户端命令行上面进行提交 * 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; } 如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
  • 操作步骤 将从IntelliJ IDEA打包出来的jar包放入指定文件夹(例如“D:\source”)。 在样例代码目录“src/storm-examples/storm-examples”下创建“lib”目录,将IntelliJ IDEA中导出的jar包复制到“lib”目录下,并解压。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在IntelliJ IDEA样例工程的“tools”目录下找到打包工具:“storm-jartool.cmd”。 双击打包工具,输入要打包的jar包所在目录(“D:\source”)并回车,再输入打出包存放的目录(“D:\target”),在“D:\target”中,会生成“source.jar”文件。
  • 操作步骤 提交拓扑(以wordcount为例,其它拓扑请参照相关开发指引),进入Storm客户端“Storm/storm-1.2.1/bin”目录,将刚打出的source.jar提交(如果在Windows上进行的打包,则需要将Windows上的source.jar上传到Linux服务器,假定上传到“/opt/jartarget”目录),执行命令:storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology 执行storm list命令,查看已经提交的应用程序,如果发现名称为word-count的应用程序,则说明任务提交成功。 如果业务设置为本地模式,且使用命令行方式提交时,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
  • 操作步骤 修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 根据实际情况修改userJarFilePath为实际的拓扑Jar包地址 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "D:\\example.jar"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } 执行WordCountTopology.java类的Main方法提交应用程序。
  • 部署运行及结果查看 导出本地jar包,请参见打包IntelliJ IDEA代码。 将1导出的本地Jar包,4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包业务。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar com.huawei.storm.example.hdfs.SimpleHDFSTopology hdfs-test 拓扑提交成功后请登录HDFS集群查看。
  • 对外接口 Storm-HDFS采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-hdfs。 Storm-HBase采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-hbase。 Storm-Kafka采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-kafka。 Storm-JDBC采用的接口同开源社区版本保持一致,详情参见:https://github.com/apache/storm/tree/v1.2.1/external/storm-jdbc。 父主题: 更多信息
  • 回答 旧插件storm-kafka中的KafkaSpout使用的是Kafka的“SimpleConsumer”接口,需要自主管理offset,KafkaSpout中根据用户定义的字段将Topic中每个Patition的offset记录在ZooKeeper中,定义如下: public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id;} 其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”,“id”表示应用的标示,如:app1。读取offset会有以下两种场景: 场景1 当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”和“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”和“id”,就可以继承历史记录的offset,不用从头开始消费。 场景2 没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定: SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);spoutConfig.ignoreZkOffsets = true;spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 通过指定SpoutConfig中的“ignoreZkOffsets”和“startOffsetTime”来强制消费最新的数据。
  • 简介 Yarn是一个分布式的资源管理系统,用于提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率,以及能支持除了MapReduce计算框架外的更多的计算框架。
  • 基本概念 ResourceManager(RM) RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。 ApplicationMaster(AM) 用户提交的每个应用程序均包含一个AM,主要功能包括: 与RM调度器协商以获取资源(用Container表示)。 将得到的资源进一步分配给内部任务。 与NM通信以启动/停止任务。 监控所有任务的运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 NodeManager(NM) NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它会接收并处理来自AM的Container启动/停止等各种请求。 Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。
  • 部署运行及结果查看 导出本地jar包,请参见打包IntelliJ IDEA代码。 将1中导出的本地Jar包,4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包业务。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar com.huawei.storm.example.hbase.SimpleHBaseTopology hbase-test 因为示例中的HBaseBolt并没有建表功能,在提交之前确保HBase中存在相应的表,若不存在需要手动建表,HBase shell建表语句如下create 'WordCount', 'cf'。 拓扑提交成功后请自行登录HBase集群查看。
  • 基本认证(Basic Authentication) 在HTTP中,基本认证是一种用来允许Web浏览器或其他客户端程序在请求时提供用户名和密码形式的身份凭证的一种登录验证方式。 在请求发送之前,用Basic加一个空格标识基本认证,以用户名追加一个冒号然后串接上密码,再将此字符串用Base64算法编码。 例如: 用户名是admin、密码是password,则拼接后的字符串就是admin:password,然后进行Base64编码,得到YWRtaW46QWRtaW5AMTIz,加上基本认证标识,得到Basic YWRtaW46QWRtaW5AMTIz,最终将编码后的字符串发送出去,由接收者解码得到一个由冒号分隔的用户名和密码的字符串。
  • 常用接口 YARN常用的Java类有如下几个。 ApplicationClientProtocol 用于Client与ResourceManager之间。Client通过该协议可实现将应用程序提交到ResourceManager上,查询应用程序的运行状态或者中止应用程序等功能。 表1 ApplicationClientProtocol常用方法 方法 说明 forceKillApplication(KillApplicationRequest request) Client通过此接口请求RM中止一个已提交的任务。 getApplicationAttemptReport(GetApplicationAttemptReportRequest request) Client通过此接口从RM获取指定ApplicationAttempt的报告信息。 getApplicationAttempts(GetApplicationAttemptsRequest request) Client通过此接口从RM获取所有ApplicationAttempt的报告信息。 getApplicationReport(GetApplicationReportRequest request) Client通过此接口从RM获取某个应用的报告信息。 getApplications(GetApplicationsRequest request) Client通过此接口从RM获取满足一定过滤条件的应用的报告信息。 getClusterMetrics(GetClusterMetricsRequest request) Client通过此接口从RM获取集群的Metrics。 getClusterNodes(GetClusterNodesRequest request) Client通过此接口从RM获取集群中的所有节点信息。 getContainerReport(GetContainerReportRequest request) Client通过此接口从RM获取某个Container的报告信息。 getContainers(GetContainersRequest request) Client通过此接口从RM获取某个ApplicationAttemp的所有Container的报告信息。 getDelegationToken(GetDelegationTokenRequest request) Client通过此接口获取授权票据,用于container访问相应的service。 getNewApplication(GetNewApplicationRequest request) Client通过此接口获取一个新的应用ID号,用于提交新的应用。 getQueueInfo(GetQueueInfoRequest request) Client通过此接口从RM中获取队列的相关信息。 getQueueUserAcls(GetQueueUserAclsInfoRequest request) Client通过此接口从RM中获取当前用户的队列访问权限信息。 moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest request) 移动一个应用到新的队列。 submitApplication(SubmitApplicationRequest request) Client通过此接口提交一个新的应用到RM。 ApplicationMasterProtocol 用于ApplicationMaster与ResourceManager之间。ApplicationMaster使用该协议向ResourceManager注册、申请资源、获取各个任务的运行情况等。 表2 ApplicationMasterProtocol常用方法 方法 说明 allocate(AllocateRequest request) AM通过此接口提交资源分配申请。 finishApplicationMaster(FinishApplicationMasterRequest request) AM通过此接口通知RM其运行成功或者失败。 registerApplicationMaster(RegisterApplicationMasterRequest request) AM通过此接口向RM进行注册。 ContainerManagementProtocol 用于ApplicationMaster与NodeManager之间。ApplicationMaster使用该协议要求NodeManager启动/中止Container或者查询Container的运行状态。 表3 ContainerManagementProtocol常用方法 方法 说明 getContainerStatuses(GetContainerStatusesRequest request) AM通过此接口向NM请求Containers的当前状态信息。 startContainers(StartContainersRequest request) AM通过此接口向NM提供需要启动的containers列表的请求。 stopContainers(StopContainersRequest request) AM通过此接口请求NM停止一系列已分配的Containers。
  • 回答 Spark部署时,如下jar包存放在客户端的“${SPARK_HOME}/jars/streamingClient010”目录以及服务端的“${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010”目录: kafka-clients-xxx.jar kafka_2.12-xxx.jar spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 由于“$SPARK_HOME/jars/streamingClient010/*”默认没有添加到classpath,所以需要手动配置。 在提交应用程序运行时,在命令中添加如下参数即可,详细示例可参考编包并运行程序。 --jars $SPARK_CLIENT_HOME/jars/streamingClient010/kafka-client-2.4.0.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/kafka_2.12-*.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 用户自己开发的应用程序以及样例工程都可使用上述命令提交。 但是Spark开源社区提供的KafkaWordCount等样例程序,不仅需要添加--jars参数,还需要配置其他,否则会报“ClassNotFoundException”错误,yarn-client和yarn-cluster模式下稍有不同。 yarn-client模式下 在除--jars参数外,在客户端“spark-defaults.conf”配置文件中,将“spark.driver.extraClassPath”参数值中添加客户端依赖包路径,如“$SPARK_HOME/jars/streamingClient010/*”。 yarn-cluster模式下 除--jars参数外,还需要配置其他,有三种方法任选其一即可,具体如下: 在客户端spark-defaults.conf配置文件中,在“spark.yarn.cluster.driver.extraClassPath”参数值中添加服务端的依赖包路径,如“${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010/*”。 将各服务端节点的“original-spark-examples_2.12-3.1.1-xxx.jar”包删除。 在客户端“spark-defaults.conf”配置文件中,修改或增加配置选项“spark.driver.userClassPathFirst” = “true”。
  • 问题 Structured Streaming的cluster模式,在数据处理过程中终止ApplicationManager,执行应用时显示如下异常。 2017-05-09 20:46:02,393 | INFO | main | client token: Token { kind: YARN_CLIENT_TOKEN, service: } diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete hdfs://hacluster/structuredtest/checkpoint/offsets to start over.; ApplicationMaster host: 10.96.101.170 ApplicationMaster RPC port: 0 queue: default start time: 1494333891969 final status: FAILED tracking URL: https://9-96-101-191:8090/proxy/application_1493689105146_0052/ user: spark2x | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)Exception in thread "main" org.apache.spark.SparkException: Application application_1493689105146_0052 finished with failed status
  • 回答 原因分析:显示该异常是因为“recoverFromCheckpointLocation”的值判定为false,但却配置了checkpoint目录。 参数“recoverFromCheckpointLocation”的值为代码中“outputMode == OutputMode.Complete()”语句的判断结果(outputMode的默认输出方式为“append”)。 处理方法:编写应用时,用户可以根据具体情况修改数据的输出方式。 将输出方式修改为“complete”,“recoverFromCheckpointLocation”的值会判定为true。此时配置了checkpoint目录时就不会显示异常。
  • 回答 由于checkpoint中包含了spark应用的对象序列化信息、task执行状态信息、配置信息等,因此,当存在以下问题时,从checkpoint恢复spark应用将会失败。 业务代码变更且变更类未明确指定SerialVersionUID。 spark内部类变更,且变更类未明确指定SerialVersionUID。 另外,由于checkpoint保存了部分配置项,因此可能导致业务修改了部分配置项后,从checkpoint恢复时,配置项依然保持为旧值的情况。当前只有以下部分配置会在从checkpoint恢复时重新加载。 "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port", "spark.master", "spark.yarn.jars", "spark.yarn.keytab", "spark.yarn.principal", "spark.yarn.credentials.file", "spark.yarn.credentials.renewalTime", "spark.yarn.credentials.updateTime", "spark.ui.filters", "spark.mesos.driver.frameworkId", "spark.yarn.jars"
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 可以在命令后的url路径增加相应的参数设置,搜索对应的SQL语句。 例如,查看100条sql语句: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?limit=100" 查看正在运行的参数: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?completed=false" JobHistory命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications/application_1478570725074_0004/SQL" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }]} 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ]} 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 JDBC api增强通过beeline里面获取的executionID 取消当前正在执行的SQL 命令: curl -k -i --negotiate -X PUT -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/cancel/execution?executionId=8" 结果: 取消executionId 执行序号为8的job任务。 补充说明: spark-beeline里面执行SQL语句,如果该SQL语句产生spark任务,该SQL的executionId将会被打印在beeline里面,这个时候如果想取消这条sql的执行,可以用上述命令。 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: {"startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62} 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • REST接口 通过以下命令可跳过REST接口过滤器获取相应的应用信息。 安全模式下,JobHistory仅支持https协议,故在如下命令的url中请使用https协议。 安全模式下,需要设置spark.ui.customErrorPage=false并重启spark2x服务 (JobHistory2x、JDBCServer2x和SparkResource2x三个实例对应的参数都需要修改)。 升级更新节点环境上的curl版本。具体curl版本升级方法如下: 下载curl安装包(http://curl.haxx.se/download/)。 使用如下命令进行安装包解压: tar -xzvf curl-x.x.x.tar.gz 使用如下命令覆盖安装: cd curl-x.x.x ./configure make make install 使用如下命令更新curl的动态链接库: ldconfig 安装成功后,重新登录节点环境,使用如下命令查看curl版本是否更新成功: curl --version 获取JobHistory中所有应用信息: 命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号。 结果: [ { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]}, { "id" : "application_1517290848707_0145", "name" : "Spark shell", "attempts" : [ { "startTime" : "2018-01-31T15:20:31.286CST", "endTime" : "1970-01-01T07:59:59.999CST", "lastUpdated" : "2018-01-31T15:20:47.086CST", "duration" : 0, "sparkUser" : "admintest", "completed" : false, "startTimeEpoch" : 1517383231286, "endTimeEpoch" : -1, "lastUpdatedEpoch" : 1517383247086 } ]}] 结果分析: 通过这个命令,可以查询当前集群中所有的Spark应用(包括正在运行的应用和已经完成的应用),每个应用的信息如下表1。 表1 应用常用信息 参数 描述 id 应用的ID name 应用的Name attempts 应用的尝试,包含了开始时间、结束时间、执行用户、是否完成等信息 获取JobHistory中某个应用的信息: 命令: curl -k -i --negotiate -u: "https://192.168.227.16:18080/api/v1/applications/application_1517290848707_0008" 其中192.168.227.16为JobHistory节点的业务IP,18080为JobHistory的端口号,application_1517290848707_0008为应用的id。 结果: { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]} 结果分析: 通过这个命令,可以查询某个Spark应用的信息,显示的信息如表1所示。 获取正在执行的某个应用的Executor信息: 针对alive executor命令: curl -k -i --negotiate -u: "https://192.168.169.84:8090/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/executors" 针对全部executor(alive&dead)命令: curl -k -i --negotiate -u: "https://192.168.169.84:8090/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/allexecutors" 其中192.168.169.84为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1478570725074_0046为在YARN中的应用ID。 结果: [{ "id" : "driver", "hostPort" : "192.168.169.84:23886", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 278019440, "executorLogs" : { }}, { "id" : "1", "hostPort" : "192.168.169.84:23902", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 1, "maxTasks" : 1, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 139, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 555755765, "executorLogs" : { "stdout" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stdout?start=-4096", "stderr" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stderr?start=-4096" }} ] 结果分析: 通过这个命令,可以查询当前应用的所有Executor信息(包括Driver),每个Executor的信息包含如下表2所示的常用信息。 表2 Executor常用信息 参数 描述 id Executor的ID hostPort Executor所在节点的ip:端口 executorLogs Executor的日志查看路径
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询,FusionInsight版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api。
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 问题 执行Spark Core应用,尝试收集大量数据到Driver端,当Driver端内存不足时,应用挂起不退出,日志内容如下。 16/04/19 15:56:22 ERROR Utils: Uncaught exception in thread task-result-getter-2java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 回答 用户尝试收集大量数据到Driver端,如果Driver端的内存不足以存放这些数据,那么就会抛出OOM(OutOfMemory)的异常,然后Driver端一直在进行GC,尝试回收垃圾来存放返回的数据,导致应用长时间挂起。 解决措施: 如果用户需要在OOM场景下强制将应用退出,那么可以在启动Spark Core应用时,在客户端配置文件“$SPARK_HOME/conf/spark-defaults.conf”中的配置项“spark.driver.extraJavaOptions”中添加如下内容: -XX:OnOutOfMemoryError='kill -9 %p'
共100000条