华为云用户手册

  • 监控项类型 Agent会自动发现系统采集的插件类型,并且将采集器实例化,形成监控项。监控项是实例化在一个环境上的。 由于采集器种类较多,会导致用户区分困难。系统后台会定义一些类型,每种采集器都会归到一种类型下,这样方便用户查看数据。 根据采集器的作用可以将监控项分为以下几种类型: 拓扑:拓扑图展示服务之间一段时间的调用关系,可以是从调用方统计的,也可以是从被调用方统计的,并且可以查看这个调用关系的趋势图。 接口调用:指外部服务调用当前应用的监控类型。 基础监控:用来监控系统性能的基础监控指标的监控类型。 异常:用来监控应用的异常信息。 外部调用:是指当前应用调用外部服务的监控类型。 数据库: 对数据库的访问进行监控。 缓存:是对Redis等缓存系统的监控,会采集指令级别的细粒度的指标数据。 web容器:对tomcat等web容器的监控,一般会采集系统总的处理线程数,busy线程数,连接数等;用于衡量系统总的容量。 消息队列:对kafka、RabbitMq等消息系统的监控,包含发送端和接收端的监控。在接收端的处理函数,可以产生调用链信息。 通信协议:对websocket等通信协议的监控。 远程过程调用:对GRPCServer等远程过程调用的监控。
  • 关联日志服务设置 应用性能管理(APM)与云日志服务(LTS)关联,您可以在 LTS中关联调用链的 TraceID 信息,当应用出现故障时,可以通过调用链的 TraceID 快速关联到业务日志,及时定位分析并解决问题。 关联业务日志与TraceId开关,开启后业务日志中会自动生成调用链的TraceId。如果关闭关联业务日志与TraceId开关,则关联日志服务设置不生效。 关联业务日志支持Log4j/Log4j2/Logback日志组件。 自定义设置只支持java类型。 进入组件配置页,在“关联日志服务设置”表单中填写相关信息。 图4 关联日志服务设置 表1 关联日志服务设置参数说明 参数 说明 项目 在下拉菜单中选择项目。 日志组 日志组(LogGroup)是云日志服务进行日志管理的基本单位,可以创建日志流以及设置日志存储时间,每个账号下可以创建100个日志组。创建日志组详细操作参见日志组。 日志流 日志流(LogStream)是日志读写的基本单位,日志组中可以创建日志流,将不同类型的日志分类存储,方便对日志进一步分类管理。详细操作参见日志流。 单击“保存”,弹出“关联日志服务”提示框。 图5 关联日志服务 单击“确认”,关联成功。 单击“复用到其他组件”,弹出“复用到其他组件”选择框。 图6 复用到其他组件 选择一个或多个组件,单击“保存并复用到其他组件”,则当前“关联日志服务”的配置,成功的复制到被选择的组件中。
  • Profiler性能剖析配置 进入组件配置页,在“Profiler性能剖析配置”表单中填写相关信息。 图7 Profiler性能剖析配置 表2 Profiler性能剖析配置说明 参数 说明 Profiler启停 打开后将低开销的应用进行持续诊断,解决Java程序中因为CPU、内存和时延导致的瓶颈问题。 开关默认为关闭,开关置灰。总开关关闭时,所有子开关都处于关闭状态。 CPU 开启后将采集应用运行过程中CPU火焰图信息。 开关默认为关闭,开关置灰。 内存 开启后将采集应用运行过程中堆内存火焰图信息。 开关默认为关闭,开关置灰。 延时 开启后将采集应用运行过程中时延火焰图信息。 开关默认为关闭,开关置灰。 生效范围 组件:组件级范围生效。该组件下关联的所有实例生成Profiler。开关默认为开启,组件按钮标记为蓝色。 实例:实例级范围生效。仅所选实例生成Profiler,实例开启或离线后不会在新的实例生成profiler。开关默认为关闭,开关置灰。 单击实例开关,开启“实例”,实例按钮标记为蓝色。 单击“选择实例”,选择所需要的实例。 图8 选择实例 选择1个或多个实例后,单击“确定”,设置成功。 图9 选择需要的实例 注意: 如果选择了实例级范围生效,则不能复制到其他组件/环境。 “组件”和“实例”只能选择其中一项,不支持同时选择。 单击“保存”,提示保存成功。 单击“复用到其他组件”,弹出“复用到其他组件”选择框。 图10 复用到其他组件 选择一个或多个组件,单击“复用到其他组件”,则当前“Profiler性能剖析配置”信息,成功的复制到被选择的组件中。
  • 修订记录 表1 修订记录 发布日期 修订记录 2024-06-15 前端界面优化。 2024-04-30 新增支持.Net支持OpenTelemetry。 2024-04-15 为部署在CCE容器中的JAVA应用安装Agent新增说明“CCE容器安装Agent建议使用自有探针”。 2024-03-04 新增开始监控C++应用、开始监控Android应用以及开始监控iOS应用。 2024-01-24 新增JavaAgent安装脚本。 2024-01-17 新增支持Node.js支持OpenTelemetry。 2023-12-11 新增支持Agent 2.4.5版本以及下载。 2023-10-30 新增支持OpenTelemetry。 2023-08-08 新增JAVA监控手动接入javaagent支持代理功能。 2023-07-15 新增入门实践章节。 2023-07-05 新增开始监控.Net应用章节。 2023-05-15 新增开始监控Php应用章节。 2023-04-14 新增开始监控Node.js应用、开始监控GO应用和开始监控Python应用章节。 2022-07-15 第一次正式发布。
  • URI GET AstroZero域名/u-route/baas/sys/v1.1/connectors/{connector_type}/{connector_name}/viewobject?object=X&x-image-process=Y 表1 路径参数 参数 是否必选 参数类型 描述 connector_type 是 String 连接器的类型。 obs:与OBS对接的连接器 minio:与MINIO对接的连接器 objectstorageproxy:对象存储代理类型 connector_name 是 String 连接器实例的名称,即创建连接器时命名的连接器实例名称。 object 是 String 文件所在桶里的全路径,包含文件名。 x-image-process 否 String 在OBS中定义,表示图片处理服务,更多介绍参见《图片处理特性指南》。 示例: 命令方式:x-image-process=image/commands 样式方式:x-image-process=style/stylename
  • 基本概念 使用AstroZero API涉及的常用概念。 账号 用户注册账号时,账号对其所拥有的资源及云服务具有完全的访问权限,可以重置用户密码、分配用户权限等。由于账号是付费主体,为了确保账号安全,建议您不要直接使用账号进行日常管理工作,而是创建用户并使用他们进行日常管理工作。 用户 由账号在IAM中创建的用户,是云服务的使用人员,具有身份凭证(密码和访问密钥)。在我的凭证下,您可以查看账号ID和用户ID。通常在调用API的鉴权过程中,您需要用到账号、用户和密码等信息。 在AstroZero中,用户访问AstroZero来开发应用、管理配置应用和业务用户。 业务用户 业务用户是指访问在AstroZero中开发的一个业务应用的用户账号。例如,设备维修管理业务应用由某用户A开发,使用该业务应用的客服人员、派单员及维修人员都是业务用户,用户A是用户,不是业务用户。 区域 区域是指云资源所在的物理位置,同一区域内可用区间内网互通,不同区域间内网不互通。通过在不同地区创建云资源,可以将应用程序设计的更接近特定客户的要求,或满足不同地区的法律或其他要求。 您可以从地区和终端节点中,查询服务所在的区域。 可用区(AZ,Availability Zone) 一个AZ是一个或多个物理数据中心的集合,有独立的风火水电,AZ内逻辑上再将计算、网络、存储等资源划分成多个集群。一个Region中的多个AZ间通过高速光纤相连,以满足用户跨AZ构建高可用性系统的需求。 项目 区域默认对应一个项目,这个项目由系统预置,用来隔离物理区域间的资源(计算资源、存储资源和网络资源),以默认项目为单位进行授权,用户可以访问您账号中该区域的所有资源。如果您希望进行更加精细的权限控制,可以在区域默认的项目中创建子项目,并在子项目中创建资源,然后以子项目为单位进行授权,使得用户仅能访问特定子项目中资源,使得资源的权限控制更加精确。 图1 项目隔离模型 在我的凭证下,您可以查看项目ID。 企业项目 企业项目是项目的升级版,针对企业不同项目间资源的分组和管理,是逻辑隔离。企业项目中可以包含多个区域的资源,且项目中的资源可以迁入迁出。 父主题: 使用前必读
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 操作步骤 准备依赖的Jar包和配置文件。 在Linux环境新建目录,例如“/opt/test”,并创建子目录“lib”和“src/main/resources/”。将样例工程中“lib”文件夹下的Jar包上传Linux环境的“lib”目录。将样例工程中“src/main/resources”文件夹下的配置文件上传到Linux环境的“src/main/resources”目录。 在IntelliJ IDEA工程中修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 修改userJarFilePath为Linux环境指定路径“/opt/test/lib/example.jar”。 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "/opt/test/lib/example.jar "; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } 导出Jar包并上传到Linux环境。 参考打包IntelliJ IDEA代码执行打包,并将jar包命名为“example.jar”。 将导出的Jar包复制到Linux环境的“/opt/test/lib”目录下。 切换到“/opt/test”,执行以下命令,运行Jar包。 java -classpath /opt/test/lib/*:/opt/test/src/main/resources com.huawei.storm.example.wordcount.WordCountTopology
  • 场景说明 通过典型场景,用户可以快速学习和掌握Oozie的开发过程,并且对关键的接口函数有所了解。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,代码示例只涉及了MapReduce作业,其他作业的API调用代码是一样的,只是job配置“job.properties”与工作流配置“workflow.xml”不一样。 完成下载并导入样例工程操作后即可执行通过Java API提交MapReduce作业和查询作业状态。 父主题: JAVA开发
  • 操作场景 在程序代码完成开发后,您可以上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Spark客户端的运行步骤是一样的。 使用Python开发的Spark应用程序无需打包成jar,只需将样例工程复制到编译机器上即可。 用户需保证worker和driver的Python版本一致,否则将报错:"Python in worker has different version %s than that in driver %s."。 用户需保证Maven已配置华为镜像站中SDK的Maven镜像仓库,具体可参考配置华为开源镜像仓
  • 查看调测结果 ClickHouse应用程序运行完成后,可通过以下方式查看程序运行情况: 通过运行结果查看程序运行情况。 通过ClickHouse日志获取应用运行情况。 即查看当前jar文件所在目录的“logs/clickhouse-example.log”日志文件,例如“客户端安装目录/JDBC/logs/clickhouse-example.log”。 jar包运行结果如下: 2021-06-10 20:53:56,028 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.insertData(Util.java:128) 2021-06-10 20:53:58,247 | INFO | main | Inert batch time is 1442 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:53:59,649 | INFO | main | Inert batch time is 1313 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:05,872 | INFO | main | Inert batch time is 6132 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:10,223 | INFO | main | Inert batch time is 4272 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:11,614 | INFO | main | Inert batch time is 1300 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:12,871 | INFO | main | Inert batch time is 1200 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:14,589 | INFO | main | Inert batch time is 1663 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:16,141 | INFO | main | Inert batch time is 1500 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:17,690 | INFO | main | Inert batch time is 1498 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:19,206 | INFO | main | Inert batch time is 1468 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:145) 2021-06-10 20:54:19,207 | INFO | main | Inert all batch time is 22626 ms | com.huawei.clickhouse.examples.Util.insertData(Util.java:148) 2021-06-10 20:54:19,208 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:58) 2021-06-10 20:54:20,231 | INFO | main | Execute query:select * from mutong1.testtb_all order by age limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:63) 2021-06-10 20:54:21,266 | INFO | main | Execute time is 1035 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:67) 2021-06-10 20:54:21,267 | INFO | main | Current load balancer is 10.112.17.150:21426 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:58) 2021-06-10 20:54:21,815 | INFO | main | Execute query:select toYYYYMM(date),count(1) from mutong1.testtb_all group by toYYYYMM(date) order by count(1) DESC limit 10 | com.huawei.clickhouse.examples.Util.exeSql(Util.java:63) 2021-06-10 20:54:22,897 | INFO | main | Execute time is 1082 ms | com.huawei.clickhouse.examples.Util.exeSql(Util.java:67) 2021-06-10 20:54:22,898 | INFO | main | name age date | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,898 | INFO | main | huawei_266 0 2021-12-19 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_2500 0 2021-12-29 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_8980 0 2021-12-16 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_671 0 2021-12-29 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_2225 0 2021-12-12 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_6040 0 2021-12-14 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_7294 0 2021-12-10 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,899 | INFO | main | huawei_1133 0 2021-12-25 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | huawei_3161 0 2021-12-21 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | huawei_3992 0 2021-11-25 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | toYYYYMM(date) count() | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201910 2247 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 202105 2213 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201801 2208 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,900 | INFO | main | 201803 2204 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201810 2167 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201805 2166 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201901 2164 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201908 2145 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 201912 2143 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144) 2021-06-10 20:54:22,901 | INFO | main | 202107 2137 | com.huawei.clickhouse.examples.Demo.queryData(Demo.java:144)
  • 准备开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示: 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端: Oracle JDK:支持1.8版本 IBM JDK:支持1.8.5.11版本 TaiShan客户端: OpenJDK:支持1.8.0_272版本 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Storm应用程序的工具。版本要求:JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • REST API 功能简介 通过HTTP REST API来查看更多MapReduce任务的信息。目前Mapresuce的REST接口可以查询已完成任务的状态信息。完整和详细的接口请直接参考官网上的描述以了解其使用: http://hadoop.apache.org/docs/r3.1.1/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/HistoryServerRest.html 准备运行环境 在节点上安装客户端,例如安装到“/opt/client”目录,可参考“安装客户端”。 进入客户端安装目录“/opt/client”,执行下列命令初始化环境变量。 source bigdata_env 操作步骤 获取MapReduce上已完成任务的具体信息 命令: curl -k -i --negotiate -u : "http://10.120.85.2:19888/ws/v1/history/mapreduce/jobs" 其中10.120.85.2为MapReduce的“JHS_FLOAT_IP”参数的参数值,19888为JobHistoryServer的端口号。 在Red Hat 6.x以及CentOS 6.x版本,使用curl命令访问JobHistoryServer会有兼容性问题,导致无法返回正确结果。 用户能看到历史任务的状态信息(任务id,开始时间,结束时间,是否执行成功等信息) 运行结果 { "jobs":{ "job":[ { "submitTime":1525693184360, "startTime":1525693194840, "finishTime":1525693215540, "id":"job_1525686535456_0001", "name":"QuasiMonteCarlo", "queue":"default", "user":"mapred", "state":"SUCCEEDED", "mapsTotal":1, "mapsCompleted":1, "reducesTotal":1, "reducesCompleted":1 } ] } } 结果分析: 通过这个接口,可以查询当前集群中已完成的MapReduce任务,并且可以得到表1 表1 常用信息 参数 参数描述 submitTime 任务提交时间 startTime 任务开始执行时间 finishTime 任务执行完成时间 queue 任务队列 user 提交这个任务的用户 state 任务执行成功或失败 父主题: 常用API介绍
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接受到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • 准备开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示: 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端: Oracle JDK:支持1.8版本 IBM JDK:支持1.8.5.11版本 TaiShan客户端: OpenJDK:支持1.8.0_272版本 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Storm应用程序的工具。版本要求:JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接受到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • 操作步骤 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/optclient”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下,例如“/opt/client”(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample:时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 操作步骤 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package “{maven_setting_path}”为本地Maven的“settings.xml”文件路径,例如“C:\Users\Developer\settings.xml”。 打包成功之后,在工程根目录的“target”子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至集群客户端运行环境的任意目录下,例如“/opt/client”,然后在该目录下创建“conf”目录,将需要的配置文件复制至“conf”目录,具体操作请参考准备运行环境。 配置环境变量: cd /opt/client source bigdata_env 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html at-least-once 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 数据库配置—Derby数据库配置过程 首先应下载一个数据库,可根据具体场景选择最适合的数据库。 该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。 Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。 在Derby的安装目录下,进入bin目录,输入如下命令: export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:. export DERBY_HOME=/opt/db-derby-10.12.1.1-bin . setNetworkServerCP ./startNetworkServer -h 主机名 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定) CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT ); CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT ); INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
  • 操作场景 本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。 本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 应用开发操作步骤 确认华为MRS产品Storm组件已经安装,且正常运行。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见环境准备。 工程导入后,修改样例工程“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。 #配置JDBC服务端IP地址 JDBC_SERVER_NAME= #配置JDBC服务端端口 JDBC_PORT_NUM= #配置JDBC登录用户名 JDBC_USER_NAME= #配置JDBC登录用户密码 #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全 JDBC_PASSWORD= #配置database表名 JDBC_BASE_TBL= 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
  • 参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程依赖包 样例工程 依赖包 说明 DataStream程序样例工程(Java/Scala) flink-dist_*.jar 在Flink的客户端或者服务端安装路径的lib目录下获取。 异步Checkpoint机制程序样例工程(Java/Scala) 向Kafka生产并消费数据程序样例工程(Java/Scala) kafka-clients-*.jar 由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar 在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序样例工程(Java/Scala) flink-dist_*.jar 在Flink的客户端或者服务端安装路径的lib目录下获取。 flink-connector-netty_*.jar 在二次开发样例代码编译后产生的lib文件夹下获取。 curator-client-2.12.0.jar curator-framework-2.12.0.jar Stream SQL Join样例工程(Java) flink-dist_2.11*.jar 在Flink的客户端或者服务端安装路径的lib目录下获取。 kafka-clients-*.jar 由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_2.11*.jar 在Flink客户端或者服务端安装路径的opt目录下获取。 flink-table_2.11*.jar -
  • 操作步骤 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至Linux客户端运行环境的任意目录下,例如“/opt/client”。 配置环境变量: cd /opt/client source bigdata_env 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表4 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 从TCP源主机:端口创建一个输入流。 start():Unit 启动Spark Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Spark Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表5 Spark Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
共100000条