华为云用户手册

  • 示例 输入一条记录("student1", "student2, student3"),输出两条记录("student1", "student2") 和 ("student1", "student3") 。 create source stream s1(attr1 string, attr2 string) with (......);insert into s2 select attr1, b1 from s1 left join lateral table(split_cursor(attr2, ',')) as T(b1) on true;
  • 示例 将流weather_out的数据输出到表格存储服务CloudTable的OpenTSDB中。 1 2 3 4 5 6 7 8 910111213141516 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */) WITH ( type = "opentsdb", region = "xxx", cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10");
  • DMS输入流 分布式消息服务(Distributed Message Service,简称DMS)是一项基于高可用分布式集群技术的消息中间件服务,提供了可靠且可扩展的托管消息队列,用于收发消息和存储消息。分布式消息服务Kafka是一款基于开源社区版Kafka提供的消息队列服务,向用户提供可靠的全托管式的Kafka消息队列。 DLI支持创建输入流从DMS的Kafka获取数据,作为作业的输入数据。创建DMS Kafka输入流的语法与创建开源Apache Kafka输入流一样,具体请参见开源Kafka输入流。 父主题: 创建输入流
  • DMS输出流 分布式消息服务(Distributed Message Service,简称DMS)是一项基于高可用分布式集群技术的消息中间件服务,提供了可靠且可扩展的托管消息队列,用于收发消息和存储消息。分布式消息服务Kafka是一款基于开源社区版Kafka提供的消息队列服务,向用户提供可靠的全托管式的Kafka消息队列。 DLI支持将作业的输出数据输出到DMS的Kafka实例中。创建DMS Kafka输出流的语法与创建开源Apache Kafka输出流一样,具体请参见MRS Kafka输出流。 父主题: 创建输出流
  • 流作业SQL语法概览 本章节介绍了目前DLI所提供的Flink SQL语法列表。参数说明,示例等详细信息请参考具体的语法说明。 表1 流作业语法概览 语法分类 功能描述 创建输入流 CloudTable HBase输入流 创建输入流 DIS输入流 DMS输入流 创建输入流 MRS Kafka输入流 开源Kafka输入流 OBS输入流 创建输出流 CloudTable HBase输出流 创建输出流 CloudTable OpenTSDB输出流 创建输出流 CSS Elasticsearch输出流 DCS输出流 DDS输出流 DIS输出流 DMS输出流 DWS输出流(通过JDBC方式) DWS输出流(通过OBS转储方式) 创建输出流 MRS HBase输出流 MRS Kafka输出流 开源Kafka输出流 OBS输出流 RDS输出流 创建输出流 SMN输出流 文件系统输出流(推荐) 创建中间流 创建中间流 创建维表 创建Redis表 创建RDS表 自拓展生态 自拓展输入流 自拓展输出流 父主题: Flink SQL语法参考(不再演进,推荐使用Flink OpenSource SQL)
  • 语法说明 COMPUTED COLUMN 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。 注意: 定义在一个数据源表( source table )上的计算列会在从数据源读取数据后被计算,它们可以在 SELECT 查询语句中使用。 计算列不可以作为 INSERT 语句的目标,在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致。
  • 创建源表相关语法 表1 创建源表相关语法 语法分类 功能描述 创建源表 Kafka源表 DIS源表 JDBC源表 DWS源表 Redis源表 Hbase源表 userDefined源表 创建结果表 ClickHouse结果表 Kafka结果表 Upsert Kafka结果表 DIS结果表 JDBC结果表 DWS结果表 Redis结果表 SMN结果表 Hbase结果表 Elasticsearch结果表 userDefined结果表 创建维表 创建JDBC维表 创建DWS维表 创建Hbase维表
  • 语法格式 create table jbdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression))with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '');
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。
  • 示例 create table jdbcSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT)with ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxxx');
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为hbase connector.version 是 该值只能为1.4.3 connector. table-name 是 hbase中的表名 connector.zookeeper.quorum 是 Zookeeper的地址 connector.zookeeper.znode.parent 否 Zookeeper中的根目录,默认是/hbase connector.rowkey 否 读取复合rowkey的内容,并根据设置的大小,赋给新的字段 形如:rowkey1:3,rowkey2:3,… 其中3表示取该字段的前3个byte,该值不能大于该字段的字节大小,且该值不能小于1。表示将复合rowkey的前三个字节赋给字段rowkey1,其后三个字节赋给字段rowkey2
  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression))with ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = '');
  • 示例 将数据写入smn的相应主题中,其中smn发送的消息的主题为'test',内容为字段'attr1'的内容 create table smnSink ( attr1 STRING, attr2 STRING)with ( 'connector.type' = 'smn', 'connector.region' = 'cn-north-1', 'connector.topic-urn' = 'xxxxxx', 'connector.message-subject' = 'test', 'connector.message-column' = 'attr1');
  • 功能描述 DLI将作业的输出数据输出到HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为hbase connector.version 是 该值只能为1.4.3 connector.table-name 是 hbase中的表名 connector.zookeeper.quorum 是 Zookeeper的地址 connector.zookeeper.znode.parent 否 Zookeeper中的根目录,默认是/hbase connector.write.buffer-flush.max-size 否 每次插入的数据的最大的缓存大小,默认为2mb ,仅支持mb connector.write.buffer-flush.max-rows 否 每次刷新数据的最大条数 connector.write.buffer-flush.interval 否 刷新时间,默认值为0s,如2s connector.rowkey 否 设置复合rowkey,即根据多个字段设置。 形如:rowkey1:3,rowkey2:3,… 其中3表示取该字段的前3个byte,该值不能大于该字段的字节大小,且该值不能小于1
  • 语法格式 create table smnSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED))with ( 'connector.type' = 'smn', 'connector.region' = '', 'connector.topic-urn' = '', 'connector.message-subject' = '', 'connector.message-column' = '');
  • 语法格式 create table hbaseSink ( attr_name attr_type (',' attr_name attr_type)* )with ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = '');
  • 功能描述 DLI将Flink作业的输出数据输出到消息通知服务(SMN)中。 消息通知服务(Simple Message Notification,简称SMN)为DLI提供可靠的、可扩展的、海量的消息处理服务,它大大简化系统耦合,能够根据用户的需求,向订阅终端主动推送消息。可用于连接云服务、向多个协议推送消息以及集成在产生或使用通知的任何其他应用程序等场景。SMN的更多信息,请参见《消息通知服务用户指南》。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 sink的类型,smn表示输出到消息通知服务中 connector.region 是 SMN所在区域 connector.topic-urn 否 SMN服务的主题URN,用于静态主题URN配置。作为消息通知的目标主题,需要提前在SMN服务中创建。 与“urn_column”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 connector.urn-column 否 主题URN内容的字段名,用于动态主题URN配置。 与“topic_urn”配置两者至少存在一个,同时配置时,“topic_urn”优先级更高。 connector.message-subject 是 发送SMN服务的消息标题,用户自定义 connector.message-column 是 当前表的某个字段名,其内容作为消息的内容,用户自定义。目前只支持默认的文本消息
  • 功能描述 DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。 云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。云搜索服务的更多信息,请参见《云搜索服务用户指南》。
  • 语法格式 create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED))with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = '', 'connector.document-type' = '', 'update-mode' = '', 'format.type' = 'json');
  • 示例 create table sink1( attr1 string, attr2 int) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = 'es', 'connector.document-type' = 'one', 'update-mode' = 'append', 'format.type' = 'json');
  • 前提条件 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。 如果需要通过集群账号和密码访问Elasticsearch,则创建的云搜索服务集群必须开启安全模式并且关闭https。 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,对于elasticsearch需配置为elasticsearch connector.version 是 使用的elasticsearch的版本。 当前只能使用版本7,即该值只能为7 connector.hosts 是 Elasticsearch所在集群的主机名,多个以’;’间隔,注意请以http开头,如http://x.x.x.x:9200 connector.index 是 Elasticsearch的索引名 connector.document-type 是 Elasticsearch的type名称 当版本为7时,由于elasticsearch使用默认的_doc类型,因此该属性无效 update-mode 是 sink的写入类型,支持append和upsert connector.key-delimiter 否 连接复合主键的拼接符,默认为_ connector.key-null-literal 否 当key中含有null时,使用该字符代替 connector.failure-handler 否 elasticsearch请求失败时的策略,默认为fail fail:当请求失败且作业失败时抛出异常 ignore:忽略 retry-rejected:对于由于es节点的队列满时,会重新请求而不抛出失败。 custom:使用定制策略 connector.failure-handler-class 否 使用失败时的定制策略时所使用的自定义处理方式 connector.flush-on-checkpoint 否 checkpoint时是否会等待所有阻塞请求完成。 默认为true,表示会等待阻塞请求完成,如果配置为false,则表示不会等待阻塞请求完成。 connector.bulk-flush.max-actions 否 批量写入时的每次最大写入记录数 connector.bulk-flush.max-size 否 批量写入时的最大数据量,当前只支持MB,请带上单位 mb connector.bulk-flush.interval 否 批量写入时的刷新的时间间隔,单位为milliseconds,无需带上单位 format.type 是 当前只支持json connector.username 否 Elasticsearch所在集群的账号。该账号参数需和密码“connector.password”参数同时配置。 使用账号密码参数时,创建的云搜索服务集群必须开启安全模式并且关闭https。 connector.password 否 Elasticsearch所在集群的密码。该密码参数需和“connector.username”参数同时配置。
  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* )with ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = '', 'connector.zookeeper.quorum' = '');
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 若使用MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为hbase connector.version 是 该值只能为1.4.3 connector. table-name 是 hbase中的表名 connector.zookeeper.quorum 是 Zookeeper的地址 connector.zookeeper.znode.parent 否 Zookeeper中的根目录,默认是/hbase
  • 示例 根据order_id对数据进行去重,其中proctime为事件时间属性列 SELECT order_id, user, product, number FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num FROM Orders) WHERE row_num = 1;
共100000条