华为云用户手册

  • 关键字 表1 CREATE TABLE参数描述 参数 描述 url Oracle的连接地址。 Oracle url支持以下格式: 格式一:jdbc:oracle:thin:@host:port:SID,其中SID是oracle数据库的唯一标识符。 格式二:jdbc:oracle:thin:@//host:port/service_name;这种方式是Oracle推荐的,对于集群来说,每个节点的SID可能不一致,但ServiceName是一致的,包含所有节点。 driver Oracle驱动类名: oracle.jdbc.driver.OracleDriver dbtable 指定在Oracle关联的表名,或者"用户名.表名",例如:public.table_name。 user Oracle用户名。 password Oracle用户名密码。 resource Oracle驱动包的OBS路径。 例如:obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar resource中定义的driver jar包如果被更新,需要重启队列,才会生效。
  • 示例 创建Oracle跨源表 1 2 3 4 5 6 7 8 9 CREATE TABLE IF NOT EXISTS oracleTest USING ORACLE OPTIONS ( 'url'='jdbc:oracle:thin:@//192.168.168.40:1521/helowin', 'driver'='oracle.jdbc.driver.OracleDriver', 'dbtable'='test.Student', 'user' = 'test', 'password' = 'test', 'resource' = 'obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar' );
  • 语法定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 INSERT INTO stream_name query; query: values | { select | selectWithoutFrom | query UNION [ ALL ] query } orderItem: expression [ ASC | DESC ] select: SELECT { * | projectItem [, projectItem ]* } FROM tableExpression [ JOIN tableExpression ] [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference tableReference: tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'
  • 语法支持范围 基础类型: VARCHAR,STRING,BOOLEAN,TINYINT,SMALLINT,INTEGER/INT,BIGINT,REAL/FLOAT,DOUBLE,DECIMAL,DATE,TIME,TIMESTAMP Array:使用[]进行引用。例如: 1 insert into temp select CARDINALITY(ARRAY[1,2,3]) FROM OrderA;
  • 示例 从OBS的桶读取对象为input.csv的文件,文件以'\n'划行, 以','划列。 测试输入数据input.csv可以先通过新建input.txt复制如下文本数据,再另存为input.csv格式文件。将input.csv上传到对应OBS桶目录下。例如,当前上传到:“dli-test-obs01”桶目录下。 1,2,3,4,1403149534 5,6,7,8,1403149535 创建表参考如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", bucket = "dli-test-obs01", region = "xxx", object_name = "input.csv", row_delimiter = "\n", field_delimiter = "," ); 从OBS的桶读取对象为input.json的文件,文件以'\n'划行。 CREATE SOURCE STREAM obs_source ( str STRING ) WITH ( type = "obs", bucket = "obssource", region = "xxx", encode = "json", row_delimiter = "\n", object_name = "input.json" );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "obs", region = "", bucket = "", object_name = "", row_delimiter = "\n", field_delimiter = '', version_id = "" );
  • 功能描述 创建source流从对象存储服务(OBS)获取数据。DLI从OBS上读取用户存储的数据,作为作业的输入数据。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“obs”表示数据源为对象存储服务。 region 是 对象存储服务所在区域。 encode 否 数据的编码格式,可以为“csv”或者“json”。默认值为“csv”。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 bucket 是 数据所在的OBS桶名。 object_name 是 数据所在OBS桶中的对象名。如果对象不在OBS根目录下,则需添加文件夹名,例如:test/test.csv。对象文件格式参考“encode”参数。 row_delimiter 是 行间的分隔符。 field_delimiter 否 属性分隔符。 当“encode”参数为csv时,该参数必选。用户可以自定义属性分隔符。 当“encode”参数为json时,该参数不需要填写。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前只适用于CSV格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 version_id 否 版本号,当obs里的桶或对象有设置版本的时候需填写,否则不用配置该项。
  • 示例 1 2 3 4 5 6 7 8 9 10 11 12 create table disCsvSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 connector.partition-count 否 读取从0分区开始计算的partition-count个通道范围内的数据。 该参数和partition-range参数不能同时配置。 当两个参数都没有配置的时候默认读取所有partition。 connector.partition-range 否 指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。 partition-range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。 connector.offset 否 用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。 connector.start-time 否 DIS数据读取从该起始时间的数据。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start-time也没配置offset的时候,读取最新数据。 connector. enable-checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。 connector. checkpoint-app-name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。 connector. checkpoint-interval 否 DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second 勿与offset或start-time同时设置。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。
  • 语法格式 create table disSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format-type' = '' );
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 field_name 是 数据在数据流中的字段名。 图像分类中field_name类型需声明为ARRAY[TINYINT]。 文本分类中field_name类型需声明为String。 model_path 是 模型存放在OBS上的完整路径,包括模型结构和模型权值。 is_dl4j_model 是 是否是deeplearning4j的模型。 true代表是deeplearning4j,false代表是keras模型。 keras_model_config_path 是 模型结构存放在OBS上的完整路径。在keras中通过model.to_json()可得到模型结构。 keras_weights_path 是 模型权值存放在OBS上的完整路径。在keras中通过model.save_weights(filepath)可得到模型权值。 word2vec_path 是 word2vec模型存放在OBS上的完整路径。
  • 示例 图片分类预测我们采用Mnist数据集作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每张图片代表的数字。 1 2 3 4 5 6 CREATE SOURCE STREAM Mnist( image Array[TINYINT] ) SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_dl4j_model_path', false) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_path', true) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_config_path', 'keras_weights_path') FROM Mnist 文本分类预测我们采用一组新闻标题数据作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每个新闻标题所属的类别,比如经济,体育,娱乐等。 1 2 3 4 5 6 7 CREATE SOURCE STREAM News( title String ) SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_word2vec_model_path','your_dl4j_model_path', false) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_word2vec_model_path','your_keras_model_path', true) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_model_path', false) FROM New SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_model_path', true) FROM New
  • 语法格式 1 2 3 4 5 6 7 -- 图像分类, 返回预测图像分类的类别id DL_IMAGE_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) DL_IMAGE_MAX_PREDICTION_INDEX(field_name, keras_model_config_path, keras_weights_path) -- 适用于Keras模型 -- 文本分类,返回预测文本分类的类别id DL_TEXT_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) -- 采用默认word2vec模型 DL_TEXT_MAX_PREDICTION_INDEX(field_name, word2vec_path, model_path, is_dl4j_model) 模型及配置文件等需存储在用户的OBS中,路径格式为"obs://your_ak:your_sk@obs.your_obs_region.xxx.com:443/your_model_path"。例如你的模型存放在OBS上,桶名为dl_model,文件名为model.h5,则路径填写为"obs://your_ak:your_sk@obs.xxx.com:443/dl_model/model.h5"。
  • 功能描述 Spark为了提高性能会缓存Parquet的元数据信息。当更新了Parquet表时,缓存的元数据信息未更新,导致Spark SQL查询不到新插入的数据作业执行报错,报错信息参考如下: DLI.0002: FileNotFoundException: getFileStatus on error message 该场景下就需要使用REFRESH TABLE来解决该问题。REFRESH TABLE是用于重新整理某个分区的文件,重用之前的表元数据信息,能够检测到表的字段的增加或者减少,主要用于表中元数据未修改,表的数据修改的场景。
  • 注意事项 resource可以是queue、database、table、column、view,格式分别为: queue的格式为:queues.queue_name database的格式为:databases.db_name table的格式为:databases.db_name.tables.table_name column的格式为:databases.db_name.tables.table_name.columns.column_name view的格式为:databases.db_name.tables.view_name
  • 原生数据类型 Flink SQL支持原生数据类型,请参见表1。 表1 原生数据类型 数据类型 描述 存储空间 范围 VARCHAR 可变长度的字符 - - BOOLEAN 布尔类型 - TRUE/FALSE TINYINT 有符号整数 1字节 -128-127 SMALLINT 有符号整数 2字节 -32768-32767 INT 有符号整数 4字节 -2147483648~2147483647 INTEGER 有符号整数 4字节 -2147483648~2147483647 BIGINT 有符号整数 8字节 -9223372036854775808~9223372036854775807 REAL 单精度浮点型 4字节 - FLOAT 单精度浮点型 4字节 - DOUBLE 双精度浮点型 8字节 - DECIMAL 固定有效位数和小数位数的数据类型 - - DATE 日期类型,描述了特定的年月日,以yyyy-MM-dd格式表示,例如2014-05-29 - DATE类型不包含时间,所表示日期的范围为0000-01-01 to 9999-12-31 TIME 时间类型,以HH:mm:ss表示。 例如20:17:40 - - TIMESTAMP(3) 完整日期,包括日期和时间。 例如:1969-07-20 20:17:40 - - INTERVAL timeUnit [TO timeUnit] 时间间隔 例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY - -
  • 功能描述 在DLI数据多版本功能开启后,备份数据默认保留7天,您可以通过配置系统参数“dli.multi.version.retention.days”调整保留周期。保留周期外的多版本数据后续在执行insert overwrite或者truncate语句时会自动进行清理。在添加列或者修改分区表时,也可以设置表属性“dli.multi.version.retention.days”调整保留周期。 开启和关闭多版本功能SQL语法请参考开启或关闭数据多版本。 DLI数据多版本功能当前仅支持通过Hive语法创建的OBS表,具体建表SQL语法可以参考使用Hive语法创建OBS表。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。 topic 是 主题,需要消费数据的edgehub中的主题名称。 encode 是 数据编码格式,可选为“csv”和“json”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 field_delimiter 否 属性分隔符。当“encode”为“csv”时,用于指定csv字段分隔符,默认为“,"。 当“encode”为“json”时,不需要设置属性之间的分隔符。 json_config 否 当“encode”为“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为: "field1=data_json.field1;field2=data_json.field2;field3=$" 其中"field3=$"表示field3的内容为整个json串。
  • 示例 从edgehub主题abc中读取数据,数据编码格式为json。数据示例为:{"student":{"score":90,"name":"1bc2"}}。 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM student_scores( name string, score int) WITH ( type = "edgehub", topic = "abc", encode = "json", json_config = "score = student.score; name=student.name" );
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "edgehub", topic = "", encode = "", json_config = "", field_delimiter = '' );
  • 功能描述 创建边缘作业source流,从EdgeHub中获取数据。用户数据写入EdgeHub中,Flink边缘作业从中读取数据,作为流计算的数据输入。 适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。 仅Flink 1.7版本适配边缘作业场景,且Flink 1.7 EOS。DLI后续版本不再提供边缘作业场景的语法参考。
  • 关键字 GROUPING SETS:为对GROUP BY的扩展,例如 SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b)); 将转换为以下一条查询: 1 2 SELECT a, b, sum(expression) FROM table GROUP BY a, b; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS(a,b); 将转换为以下两条查询: 1 2 3 SELECT a, NULL, sum(expression) FROM table GROUP BY a; UNION SELECT NULL, b, sum(expression) FROM table GROUP BY b; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b), a); 将转换为以下两条查询: 1 2 3 SELECT a, b, sum(expression) FROM table GROUP BY a, b; UNION SELECT a, NULL, sum(expression) FROM table GROUP BY a; SELECT a, b, sum(expression) FROM table GROUP BY a, b GROUPING SETS((a,b), a, b, ()); 将转换为以下四条查询: 1 2 3 4 5 6 7 SELECT a, b, sum(expression) FROM table GROUP BY a, b; UNION SELECT a, NULL, sum(expression) FROM table GROUP BY a, NULL; UNION SELECT NULL, b, sum(expression) FROM table GROUP BY NULL, b; UNION SELECT NULL, NULL, sum(expression) FROM table;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 否 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据序列化格式,支持:'csv'、 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.sink-partitioner 否 记录分区的方式,支持:'fixed', 'round-robin'及'custom'。 connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' 。 update-mode 否 支持:'append'、'retract'及'upsert'三种写入模式。 connector.properties.* 否 配置kafka任意原生属性
  • 示例 将kafkaSink的数据输出到Kafka中 1 2 3 4 5 6 7 8 9 10 11 12 13 create table kafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.sink-partitioner' = 'round-robin', 'format.type' = 'csv' );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“opentsdb”表示输出到MRS的OpenTSDB。 region 是 MRS服务所在区域。 tsdb_link_address 是 MRS中OpenTSDB实例的服务地址,格式为http://ip:port或者https://ip:port。 说明: 配置项tsd.https.enabled为true时,需要使用https,注意https暂时不支持证书认证。 tsdb_metrics 是 数据点的metric,支持参数化。 tsdb_timestamps 是 数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 tsdb_values 是 数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 tsdb_tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 batch_insert_data_num 否 表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。
  • 示例 将流weather_out的数据输出到MRS服务的OpenTSDB中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", tsdb_link_address = "https://x.x.x.x:4242", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" )
共100000条