华为云用户手册

  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Linux系统 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 安装和配置IDEA 用于开发Flink应用程序的工具。版本要求:2019.1或其他兼容版本。 安装Scala Scala开发环境的基本配置。版本要求:2.11.7。 安装Scala插件 Scala开发环境的基本配置。版本要求:1.5.4。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“Scala Module”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本和Scala SDK,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • Flink应用程序开发流程 Flink开发流程参考如下步骤: 图1 Flink应用程序开发流程 表1 Flink应用开发的流程说明 阶段 说明 参考章节 了解基本概念 在开始开发应用前,需要了解Flink的基本概念。 常用概念 准备开发和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备开发环境 准备连接集群配置文件 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 配置并导入样例工程 新建工程(可选) 准备安全认证 如果您使用的是安全集群,需要进行安全认证。 准备安全认证 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 开发程序 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编包并运行程序 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看调测结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 组件操作指南中的“Flink性能调优”
  • 基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强,具体请参考:https://ci.apache.org/projects/flink/flink-docs-release-1.15。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • 结构 Flink结构如图2所示。 图2 Flink结构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • 常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Hive的相关操作。 HQL语言 Hive Query Language,类SQL语句。 HCatalog HCatalog是建立在Hive元数据之上的一个表信息管理层,吸收了Hive的DDL命令。为MapReduce提供读写接口,提供Hive命令行接口来进行数据定义和元数据查询。基于MRS的HCatalog功能,Hive、MapReduce开发人员能够共享元数据信息,避免中间转换和调整,能够提升数据处理的效率。 WebHCat WebHCat运行用户通过Rest API来执行Hive DDL,提交MapReduce任务,查询MapReduce任务执行结果等操作。 父主题: 概述
  • 创建库 以下代码片段在com.huawei.clickhouse.examples包的“Demo”类的createDatabase方法中。通过on cluster语句在集群中创建表1中以databaseName参数值为数据库名的数据库。 private void createDatabase(String databaseName, String clusterName) throws Exception { String createDbSql = "create database if not exists " + databaseName + " on cluster " + clusterName; util.exeSql(createDbSql);} 父主题: 样例代码说明
  • ClickHouse样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下ClickHouse相关样例工程: 表1 ClickHouse相关样例工程 样例工程位置 描述 clickhouse-examples 指导用户基于Java语言,实现MRS集群中的ClickHouse的数据表创建、删除以及数据的插入、查询等操作。 本工程中包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据及删除数据表等操作示例。 父主题: 概述
  • 建立连接 创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 ClickHouseDataSource clickHouseDataSource =new ClickHouseDataSource(JDBC_PREFIX + serverList.get(tries - 1), clickHouseProperties);connection = clickHouseDataSource.getConnection(user, password); 父主题: 样例代码说明
  • 开发思路 ClickHouse作为一款独立的DBMS系统,使用SQL语言就可以进行常见的操作。开发程序示例中,全部通过clickhouse-jdbc API接口来进行描述,开发流程主要分为以下几部分: 设置属性:设置连接ClickHouse服务实例的参数属性。 建立连接:建立和ClickHouse服务实例的连接。 创建库:创建ClickHouse数据库。 创建表:创建ClickHouse数据库下的表。 插入数据:插入数据到ClickHouse表中。 查询数据:查询ClickHouse表数据。 删除表:删除已创建的ClickHouse表。
  • 开发流程 开发流程中各阶段的说明如ClickHouse应用程序开发流程和表1所示。 图1 ClickHouse应用程序开发流程 表1 ClickHouse应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,ClickHouse的应用程序支持多种语言开发,推荐使用Java语言,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群信息文件以及用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接集群配置文件 配置并导入样例工程 ClickHouse提供了不同场景下的样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置并导入样例工程 根据业务场景开发程序 提供样例工程,帮助用户快速了解ClickHouse各部件的编程接口。 开发程序 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 在Windows下调测程序 在Linux下调测程序 父主题: 概述
  • 设置属性 设置连接属性,如下样例代码设置socket超时时间为60s。 ClickHouseProperties clickHouseProperties = new ClickHouseProperties();clickHouseProperties.setSocketTimeout(60000); 如果配置并导入样例工程中的“clickhouse-example.properties”配置文件中“sslUsed”参数配置为“true”时,则需要设置如下连接属性: clickHouseProperties.setSsl(true);clickHouseProperties.setSslMode("none"); 父主题: 样例代码说明
  • ClickHouse简介 ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。 ClickHouse的设计优点: 数据压缩比高 多核并行计算 向量化计算引擎 支持嵌套数据结构 支持稀疏索引 支持数据Insert和Update ClickHouse的应用场景: 实时数仓场景 使用流式计算引擎(如Flink)把实时数据写入ClickHouse,借助ClickHouse的优异查询性能,在亚秒级内响应多维度、多模式的实时查询分析请求。 离线查询场景 把规模庞大的业务数据导入到ClickHouse,构造数亿至数百亿记录规模、数百以上的维度的大宽表,随时进行个性化统计和持续探索式查询分析,辅助商业决策,具有非常好的查询体验。
  • ClickHouse样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下ClickHouse相关样例工程: 表1 ClickHouse相关样例工程 样例工程位置 描述 clickhouse-examples 指导用户基于Java语言,实现MRS集群中的ClickHouse的数据表创建、删除以及数据的插入、查询等操作。 本工程中包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据及删除数据表等操作示例。 父主题: 概述
  • 创建库 以下代码片段在com.huawei.clickhouse.examples包的“Demo”类的createDatabase方法中。通过on cluster语句在集群中创建表1中以databaseName参数值为数据库名的数据库。 private void createDatabase(String databaseName, String clusterName) throws Exception { String createDbSql = "create database if not exists " + databaseName + " on cluster " + clusterName; util.exeSql(createDbSql);} 父主题: 样例代码说明
  • 批量删除分区策略 当批量删除分区时,例如删除一个月所有的分区数据,可以使用的方式有两种: 把所有的分区列出来,然后批量删除。假设city的数量有100个,那么删除一个月的所有分区需要列出3100个分区,然后批量删除。 alter table target drop partition(city=c1,date=p1), partition(city=c2,date=p2),… 只列出一个月的时间分区,然后批量删除。这种方式只需要列出31个时间分区。 alter table target drop partition(date=p1), partition(date=p2)
  • 建立连接 创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 ClickHouseDataSource clickHouseDataSource =new ClickHouseDataSource(JDBC_PREFIX + serverList.get(tries - 1), clickHouseProperties);connection = clickHouseDataSource.getConnection(user, password); 父主题: 样例代码说明
  • Spark SQL动态分区插入优化之sortby 如下SQL中,p1和p2是target表的分区字段,使用sort by关键字来减少小文件的产生。 insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2sort by p1,p2 经过动态分区Shuffle优化之后,每一个Hive分区的数据都会集中在一个Spark Task中,但是由于Hive分区的数量远远大于Task数量,所以一个Task中会包含多个Hive分区的数据,即Task与Hive分区的关系是一对多。 每一个Task会将其包含的数据按照行顺序写入文件,文件所在的分区由该行数据中的分区字段值决定。Task在写入第一行数据时会创建一个新文件,随后写的每行数据都会判断该行数据的分区字段值与上一行数据的分区字段值是否相同,如果不相同就会新建一个文件并将该行数据写入,否则将该行数据写入上一条数据所在的文件。因此在Task写动态分区数据时,相邻两行数据如果分区字段值相同,就会写入同一个文件,否则就会写入不同的文件。假设Task有N行数据,在最坏情况下,所有相邻数据的分区字段值都不相同,那么Task将会写N个文件,每个文件只有一行数据。 为了将一个Task中相同分区的数据集中在一起,减少Task写的文件数量,需要将数据按照分区字段进行排序。假设一个Task中包含M个分区数据,排序之后,一个Task中相同分区的数据就会相邻,最终一个Task只会写M个文件。在Spark SQL中增加sort by关键词可完成排序功能。 在Spark SQL中使用动态分区写入数据,需要同时使用distribute by和sort by才能有效减少小文件数量。
  • 开发流程 开发流程中各阶段的说明如图1和表1所示。 图1 ClickHouse应用程序开发流程 表1 ClickHouse应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,ClickHouse的应用程序支持多种语言开发,推荐使用Java语言,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群信息文件以及用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接集群配置文件 配置并导入样例工程 ClickHouse提供了不同场景下的样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置并导入样例工程 根据业务场景开发程序 提供样例工程,帮助用户快速了解ClickHouse各部件的编程接口。 开发程序 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 在Windows下调测程序 在Linux下调测程序 父主题: 概述
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel: StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel: StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking: Boolean = true),将该RDD从持久化列表中移除,并将RDD的存储级别重新设置为StorageLevel.NONE。
  • Spark建议使用Commit V2算法 在Spark提交作业时配置参数spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2,使用commit v2算法,例如在DataArts Studio提交作业增加—conf配置项,值为spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2。 Spark默认采用Commit V1算法,该算法原理为:Task执行完毕后将数据写入了临时目录,Driver等待所有Task执行完毕后,串行的将每个Task的临时输出文件移动到最终的目录中。如果输出的小文件过多,Driver串行移动文件耗时会过长,最终导致Commit过程比较耗时。 将Commit算法修改为V2,使得每个Task在执行成功后将临时文件移动到最终目录,相当于Driver中串行的移动操作优化为Task并行的移动文件操作。V2算法的缺点在于Spark作业执行过程中,最终目录的文件是对外可见的,如果此时有其他程序读取了最终目录的数据,那么其他程序处理的数据出现不一致问题。 然而使用Spark写Hive表,Spark作业的最终目录也是一个临时目录,通过load操作将临时目录数据导入hive表,所以Hive表的目录才是真正的最终目录,外部作业是无法读取到中间临时生成目录,因此针对Spark写Hive场景推荐使用Commit V2算法。
  • Spark SQL动态分区插入优化之Distributeby 如下SQL中,p1和p2是target表的分区字段,使用distribute by关键字来减少小文件的产生。 insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2 Spark程序以动态分区的方式写入Hive表时,会出现了大量的小文件,导致最后移动文件到hive表目录非常耗时,这是因为在Shuffle时Hive多个分区的数据随机落到Spark的多个Task中,此时Task与Hive分区数据的关系是多对多,即每个Task会包含多个分区的部分数据,每个Task中包含的每个分区的数据都很少,最终会导致Task写多个分区文件,每个分区文件都比较小。 为了减少小文件的数量,需要将数据按照分区字段进行Shuffle,将各个分区的数据尽量各自集中在一个Task,在Spark SQL中就是通过distribute by关键字来完成这个功能的。 当使用distribute by关键字在后出现了数据倾斜,即有的分区数据多,有的分区数据少,也会导致spark 作业整体耗时长。需要在distribute by后面增加随机数,例如: insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2, cast(rand() * N as int) N值可以在文件数量和倾斜度之间做权衡。 在Spark SQL中使用动态分区写入数据,需要同时使用distribute by和sort by才能有效减少小文件数量。
  • 操作步骤 查看Spark应用运行结果数据。 结果数据存储路径和格式已经由Spark应用程序指定,可通过指定文件获取。 查看Spark应用程序运行情况。 Spark主要有两个Web页面。 Spark UI页面,用于展示正在执行的应用的运行情况。 页面主要包括了Jobs、Stages、Storage、Environment和Executors五个部分。Streaming应用会多一个Streaming标签页。 页面入口:在YARN的Web UI界面,查找到对应的Spark应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面。 History Server页面,用于展示已经完成的和未完成的Spark应用的运行情况。 页面包括了应用ID、应用名称、开始时间、结束时间、执行时间、所属用户等信息。单击应用ID,页面将跳转到该应用的SparkUI页面。 查看Spark日志获取应用运行情况。 您可以查看Spark日志了解应用运行情况,并根据日志信息调整应用程序。相关日志信息可参考Spark2x日志介绍。
  • 设置属性 设置连接属性,如下样例代码设置socket超时时间为60s。 ClickHouseProperties clickHouseProperties = new ClickHouseProperties();clickHouseProperties.setSocketTimeout(60000); 如果配置并导入样例工程中的“clickhouse-example.properties”配置文件中“sslUsed”参数配置为“true”时,则需要设置如下连接属性: clickHouseProperties.setSsl(true);clickHouseProperties.setSslMode("none"); 父主题: 样例代码说明
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 确保集群安装完成,包括安装HDFS、Yarn、Spark2x和Kafka服务。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行程序章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaProducer {brokerlist} {topic} {number of events produce every 0.02s} 示例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingState-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaProducer xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005 mytopic 10
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 数据规划 将cluster2集群的所有Zookeeper节点和HBase节点的IP和主机名配置到cluster1集群的客户端节点的“/etc/hosts”文件中。 分别将cluster1和cluster2集群Spark2x客户端conf下的hbase-site.xml文件放到“/opt/example/A”,“/opt/example/B”两个目录下。 用spark-submit提交命令: 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 spark-submit --master yarn --deploy-mode client --files /opt/example/B/hbase-site.xml --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.spark.examples.SparkOnMultiHbase /opt/example/SparkOnMultiHbase-1.0.jar
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaADEventProducer {BrokerList} {timeOfProduceReqEvent} {eventTimeBeforeCurrentTime} {reqTopic} {reqEventCount} {showTopic} {showEventMaxDelay} {clickTopic} {clickEventMaxDelay} 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行程序章节中导出jar包的操作步骤。 命令举例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingADScalaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaADEventProducer 10.132.190.170:21005,10.132.190.165:21005 2h 1h req 10000000 show 5m click 5m 此命令将在kafka上创建3个topic:req、show、click,在2h内生成1千万条请求事件数据,请求事件的时间取值范围为{当前时间-1h 至 当前时间},并为每条请求事件随机生成0-5条展示事件,展示事件的时间取值范围为{请求事件时间 至请求事件时间+5m },为每条展示事件随机生成0-5条点击事件,点击事件的时间取值范围为{展示事件时间 至展示事件时间+5m }
  • 场景说明 假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。 已知: 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。 广告有效展示的定义如下: 请求到展示的时长超过A分钟算无效展示。 A分钟内多次展示,每次展示事件为有效展示。 广告有效点击的定义如下: 展示到点击时长超过B分钟算无效点击。 B分钟内多次点击,仅首次点击事件为有效点击。 基于此业务场景,模拟简单的数据结构如下: 广告请求事件 数据结构:adID^reqTime 广告展示事件 数据结构:adID^showID^showTime 广告点击事件 数据结构:adID^showID^clickTime 数据关联关系如下: 广告请求事件与广告展示事件通过adID关联。 广告展示事件与广告点击事件通过adID+showID关联。 数据要求: 数据从产生到到达流处理引擎的延迟时间不超过2小时 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐
共100000条