华为云用户手册

  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime(ROWS number PRECEDING) |(RANGE (BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW | UNBOUNDED preceding)) ) 语法说明 表3 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 同一select里所有聚合函数定义的窗口都必须保持一致。 当前Over窗口只支持前向计算(preceding),不支持following计算。 必须指定ORDER BY 按processing time或event time。 不支持对常量做聚合操作,如sum(2)。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《数据湖探索用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 功能描述 DLI将Flink作业的输出数据输出到分布式缓存服务(DCS)的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。 分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。 DCS的更多信息,请参见《分布式缓存服务用户指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。 region 是 数据所在的DCS所在区域。 cluster_address 是 Redis实例连接地址。 password 否 Redis实例连接密码,当设置为免密访问时,省略该配置项。 value_type 是 该参数可配置为如下选项或选项的组合: 支持指定插入数据类型,包括:string, list, hash, set, zset; 支持设置key的过期时间,包括expire, pexpire, expireAt, pexpireAt; 支持删除key命令,包括del, hdel; 当需要使用多个命令时,用“;”分隔。 key_value 是 设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。
  • 注意事项 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。 字符":", ",", ";", "$", "{", "}"已被征用为特殊分隔符,暂时没有提供转义功能,禁止在key和value中作为普通字符使用,否则会影响解析,导致程序异常。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dcs_redis", region = "", cluster_address = "", password = "", value_type= "",key_value= "" );
  • 示例 将流qualified_cars的数据输出到DCS服务的Redis类型的缓存实例中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) WITH ( type = "dcs_redis", cluster_address = "192.168.0.34:6379", password = "xxxxxxxx", value_type = "string; list; hash; set; zset", key_value = "${car_id}_str: ${car_owner}; name_list: ${car_owner}; ${car_id}_hash: {name:${car_owner}, age: ${car_age}}; name_set: ${car_owner}; math_zset: {${car_owner}:${average_speed}}" );
  • 前提条件 请务必确保您的账户下已在分布式缓存服务(DCS)里创建了Redis类型的缓存实例。 如何创建Redis类型的缓存实例,请参考《分布式缓存服务用户指南》中“申请Redis缓存实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与DCS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 用户通过VPC对等访问DCS实例时,除了满足VPC对等网跨VPC访问的约束之外,还存在如下约束: 当创建DCS实例时使用了172.16.0.0/12~24网段时,DLI队列不能在192.168.1.0/24、192.168.2.0/24、192.168.3.0/24网段。 当创建DCS实例时使用了192.168.0.0/16~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。 当创建DCS实例时使用了10.0.0.0/8~24网段时,DLI队列不能在172.31.1.0/24、172.31.2.0/24、172.31.3.0/24网段。
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dliinput", encode = "csv", field_delimiter = "," ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "******", db_url = "postgresql://192.168.0.0:2000/test1", table_name = "car" ); CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dlioutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info as b on a.car_id = b.car_id; 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE table_id ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,rds表示输出到关系型数据库中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL MySQL: 'mysql://ip:port/database' PostgreSQL: 'postgresql://ip:port/database' 说明: 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。 table_name 是 用于查询数据的数据库表名。 db_columns 否 流属性和数据库表的字段对应关系。当sink流中流属性字段和数据库表中的流属性字段不完全匹配时,该参数必配。格式为“dbtable_attr1,dbtable_attr2,dbtable_attr3“。 cache_max_num 否 表示最大缓存的查询结果数,默认值为32768。 cache_time 否 表示数据库查询结果在内存中缓存的最大时间。单位为毫秒,默认值为10000,当值为0时表示不缓存。
  • 前提条件 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。 如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 前提条件 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。DWS数据库版本为8.1.0以后的版本时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 connector.table 是 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 DWS数据库版本为8.1.0以后的版本时,连接驱动为:com.huawei.gauss200.jdbc.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示
  • 示例 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 表car_info没有在schema下时。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 当DWS表test在名为test_schema的schema下时,可以参考如下样例。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'test_schema\".\"test', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx' );
  • 功能描述 DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "cloudtable", region = "", cluster_id = "", table_name = "", table_columns = "", create_if_not_exist = "" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“cloudtable”表示输出到CloudTable(HBase)。 region 是 表格存储服务所在区域。 cluster_id 是 待插入数据所属集群的id。 table_name 是 待插入数据的表名,支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。 table_columns 是 待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。 illegal_data_table 否 如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。 create_if_not_exist 否 当待写入的表或者列族不存在时,是否创建,值为true或者false,默认值为false。 batch_insert_data_num 否 表示一次性批量写入的数据条数,值必须为正整数,上限为100,默认值为10。
  • 示例 将流qualified_cars的数据输出到表格存储服务CloudTable的HBase中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "cloudtable", region = "xxx", cluster_id = "209ab1b6-de25-4c48-8e1e-29e09d02de28", table_name = "car_pass_inspect_with_age_${car_age}", table_columns = "rowKey,info:owner,,car:speed,car:miles", illegal_data_table = "illegal_data", create_if_not_exist = "true", batch_insert_data_num = "20" );
  • 功能描述 DLI将作业的输出数据输出到CloudTable的HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: 分组函数 表1 分组函数表 分组窗口函数 说明 TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 注意: 在流处理表中的 SQL 查询中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。 time_attr设置为event-time时参数类型为timestamp(3)类型。 time_attr设置为processing-time时无需指定类型。 对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个timestamp类型的属性。 窗口辅助函数 可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性 表2 窗口辅助函数表 辅助函数 说明 TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。 注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为行时间属性使用,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。 TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) 返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如基于时间窗口的join以及 分组窗口或分组窗口上的聚合。 TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) 返回一个可用于后续需要基于时间的操作的 处理时间参数,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合. 注意:辅助函数必须使用与GROUP BY 子句中的分组窗口函数完全相同的参数来调用. 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 5 6 7 8 9 10 11 SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime ROWS BETWEEN (UNBOUNDED|rowCOUNT) PRECEDING AND CURRENT ROW FROM TABLENAME SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime RANGE BETWEEN (UNBOUNDED|timeInterval) PRECEDING AND CURRENT ROW FROM TABLENAME 语法说明 表5 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 所有的聚合必须定义到同一个窗口中,即相同的分区、排序和区间。 当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持。 ORDER BY 必须指定于单个的时间属性。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • 语法格式 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_count = "", encode = "", field_delimiter = "", offset= "");
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
共100000条