华为云用户手册

  • 示例 回收用户user_name1对于数据库db1的删除数据库权限。 1 REVOKE DROP_DATABASE ON databases.db1 FROM USER user_name1; 回收用户user_name1对于数据库db1的表tb1的SELECT权限。 1 REVOKE SELECT ON databases.db1.tables.tb1 FROM USER user_name1; 回收角色role_name对于数据库db1的表tb1的SELECT权限。 1 REVOKE SELECT ON databases.db1.tables.tb1 FROM ROLE role_name;
  • 关键字 表1 SELECT参数描述 参数 描述 ALL 返回重复的行。为默认选项。其后只能跟*,否则会出错。 DISTINCT 从结果集移除重复的行。 WHERE 指定查询的过滤条件,支持算术运算符、关系运算符和逻辑运算符。 where_condition 过滤条件。 GROUP BY 指定分组的字段,支持单字段及多字段分组。 col_name_list 字段列表。 ORDER BY 对查询结果进行排序。 ASC/DESC ASC为升序,DESC为降序,默认为ASC。 CLUSTER BY 为分桶且排序,按照分桶字段先进行分桶,再在每个桶中依据该字段进行排序,即当DISTRIBUTE BY的字段与SORT BY的字段相同且排序为降序时,两者的作用与CLUSTER BY等效。 DISTRIBUTE BY 指定分桶字段,不进行排序。 SORT BY 将会在桶内进行排序。 LIMIT 对查询结果进行限制,number参数仅支持INT类型。
  • 语法格式 1 2 3 4 5 6 7 SELECT [ALL | DISTINCT] attr_expr_list FROM table_reference [WHERE where_condition] [GROUP BY col_name_list] [ORDER BY col_name_list][ASC | DESC] [CLUSTER BY col_name_list | DISTRIBUTE BY col_name_list] [SORT BY col_name_list]] [LIMIT number];
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 MATCH_RECOGNIZE ( [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH | ALL ROWS PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN ( pattern ) [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ) MR SQL中的模式匹配是用MATCH_RECOGNIZE子句执行。MATCH_RECOGNIZE子句执行如下任务: 使用PARTITION BY 和ORDER BY子句对MATCH_RECOGNIZE子句中的数据进行逻辑分区和排序。 使用PATTERN子句来定义要查找的数据行的模式。这些模式使用规则表达式语法。 使用DEFINE子句指定PATTERN模式变量所需的逻辑条件。 使用MEASURES子句定义度量,这是一些可在SQL查询的其他部分所使用的表达式。
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...];
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 无。 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 用表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无。 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • UNION 语法格式 1 query UNION [ ALL ] query 语法说明 UNION返回多个查询结果的并集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • 示例 分别使用四种函数结合窗口来实时计算聚类的相关信息。 1 2 3 4 5 6 7 8 9 10 SELECT CENTROID(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE UNBOUNDED PRECEDING) AS centroid, CLUSTER_CENTROIDS(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE UNBOUNDED PRECEDING) AS centroids FROM MyTable SELECT CENTROID(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS centroidCE, ALL_POINTS_OF_CLUSTER(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS itemList, ALL_CLUSTERS_POINTS(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS listoflistofpoints FROM MyTable
  • 语法格式 1 2 3 4 CENTROID(ARRAY[field_names], distance_threshold):加入当前数据点后,该数据点所属分类中心。 CLUSTER_CENTROIDS(ARRAY[field_names], distance_threshold):加入当前数据点后,所有分类中心。 ALL_POINTS_OF_CLUSTER(ARRAY[field_names], distance_threshold):加入当前数据点后,该分类所有数据点。 ALL_CLUSTERS_POINTS(ARRAY[field_names], distance_threshold):加入当前数据点后,所有分类对应的所有数据点。 聚类算法可以应用在无界流中。
  • 关键字 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列,col_name_list中包含的字段必须出现在attr_expr_list的字段内,attr_expr_list中可以使用多个聚合函数,比如count(),sum(),聚合函数中可以包含其他字段。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中,同样,GROUP BY中出现的字段必须在attr_expr_list的字段内,attr_expr_list也可以使用聚合函数。
  • 功能描述 DLI将Flink作业的输出数据输出到关系型数据库(RDS)中。目前支持PostgreSQL和MySQL两种数据库。PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。MySQL数据库适用于各种WEB应用、电子商务应用、企业应用、移动应用等场景,减少IT部署和维护成本。 关系型数据库(Relational Database Service,简称RDS)是一种基于云计算平台的在线关系型数据库服务。 RDS的更多信息,请参见《关系型数据库用户指南》。
  • 前提条件 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。 如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,rds表示输出到关系型数据库中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL MySQL: 'mysql://ip:port/database' PostgreSQL: 'postgresql://ip:port/database' table_name 是 要插入数据的数据库表名。 db_columns 否 支持配置输出流属性和数据库表属性的对应关系,需严格按照输出流的属性顺序配置。 示例: create sink stream a3(student_name string, student_age int) with ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "mysql://192.168.0.102:8635/test1", db_columns = "name,age", table_name = "t1" ); student_name对应数据库里的name属性,student_age对应数据库里的age属性。 说明: 当不配置db_columns时,若输出流属性个数小于数据库表属性个数,并且数据库多出的属性都是nullable或者有默认值时,这种情况也允许。 primary_key 否 如果想通过主键实时更新表中的数据,需要在创建数据表的时候增加primary_key配置项,如下面例子中的c_timeminute。配置primary_key后,在进行数据写入操作时,如果primary_key存在,则进行更新操作,否则进行插入操作。 示例: CREATE SINK STREAM test(c_timeminute LONG, c_cnt LONG) WITH ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "mysql://192.168.0.12:8635/test", table_name = "test", primary_key = "c_timeminute"); operation_field 否 该配置项用于指定数据的处理方式,需要配置为${field_name}的形式,field_name的类型必读为string,field_name所代表的真正内容表示为D或者DELETE时,表示删除数据库中该条记录,其余默认插入数据。
  • 示例 将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "xxxxxx", db_url = "mysql://192.168.1.1:8635/test", table_name = "audi_cheaper_than_30w" );
  • 示例 create table upsertKafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT, primary key (car_id) not enforced ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'format.type' = 'csv' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于upsert kafka,需配置为'upsert-kafka' connector.version 否 Kafka版本,仅支持:'0.11' format.type 是 数据序列化格式,支持:'csv', 'json'及'avro'等 connector.topic 是 kafka topic名 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔 connector.sink-partitioner 否 记录分区方式,支持:'fixed', 'round-robin'及'custom' connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' connector.sink.ignore-retraction 否 是否忽略回撤消息,默认为false。回撤消息将以null值写入kafka update-mode 否 支持:'append', 'retract'及'upsert'三种写入模式 connector.properties.* 否 配置kafka任意原生属性
  • 语法格式 1 2 3 4 5 6 7 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.write.mode 否 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 connector.write.flush.max-rows 否 数据flush大小,超过该值将触发写入flush。默认为5000。 connector.write.flush.interval 否 数据flush周期,周期性触发写入flush。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。不填写则默认不根据时间刷新。 connector.write.max-retries 否 写入最大重试次数,默认为3。 connector.write.merge.filter-key 否 配置PRIMARY KEY,并且“connector.write.mode”配置为copy时,可以配置merge时的过滤列名。 connector.write.escape-string-value 否 是否对string类型值进行转义,默认为false。
  • 语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 示例 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 使用upsert模式,写入数据到DWS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm ( ddate DATE, dmin TIMESTAMP(3), game_appkey VARCHAR, channel_id VARCHAR, pay_user_num_1m bigint, pay_amt_1m bigint, PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED ) WITH ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/dws_bigdata_db', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.username' = 'xxxx', 'connector.password' = 'xxxxx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' );
  • 功能描述 DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 前提条件 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 详细样例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 /** source **/ CREATE SOURCE STREAM car_infos (cast_int_to_varchar int, cast_String_to_int string, case_string_to_timestamp string, case_timestamp_to_date timestamp) WITH ( type = "dis", region = "xxxxx", channel = "dis-input", partition_count = "1", encode = "json", offset = "13", json_config = "cast_int_to_varchar=cast_int_to_varchar;cast_String_to_int=cast_String_to_int;case_string_to_timestamp=case_string_to_timestamp;case_timestamp_to_date=case_timestamp_to_date" ); /** sink **/ CREATE SINK STREAM cars_infos_out (cast_int_to_varchar varchar, cast_String_to_int int, case_string_to_timestamp timestamp, case_timestamp_to_date date) WITH ( type = "dis", region = "xxxxx", channel = "dis-output", partition_count = "1", encode = "json", offset = "4", json_config = "cast_int_to_varchar=cast_int_to_varchar;cast_String_to_int=cast_String_to_int;case_string_to_timestamp=case_string_to_timestamp;case_timestamp_to_date=case_timestamp_to_date", enable_output_null="true" ); /** 统计car的静态信息 **/ INSERT INTO cars_infos_out SELECT cast(cast_int_to_varchar as varchar), cast(cast_String_to_int as int), cast(case_string_to_timestamp as timestamp), cast(case_timestamp_to_date as date) FROM car_infos; 返回数据 {"case_string_to_timestamp":1514736001000,"cast_int_to_varchar":"5","case_timestamp_to_date":"2018-01-01","cast_String_to_int":100}
  • 常用类型转换函数 表1 常用类型转换函数 函数 说明 cast(v1 as varchar) 将v1转换为字符串类型,v1可以是数值类型,TIMESTAMP/DATE/TIME。 cast (v1 as int) 将v1转换为int, v1可以是数值类型或字符类。 cast(v1 as timestamp) 将v1转换为timestamp类型,v1可以是字符串或DATE/TIME。 cast(v1 as date) 将v1转换为date类型, v1可以是字符串或者TIMESTAMP。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“cloudtable”表示输出到CloudTable(HBase)。 region 是 表格存储服务所在区域。 cluster_id 是 待插入数据所属集群的id。 table_name 是 待插入数据的表名,支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。 table_columns 是 待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。 illegal_data_table 否 如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。 create_if_not_exist 否 当待写入的表或者列族不存在时,是否创建,值为true或者false,默认值为false。 batch_insert_data_num 否 表示一次性批量写入的数据条数,值必须为正整数,上限为100,默认值为10。
  • 功能描述 DLI将作业的输出数据输出到CloudTable的HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式云存储系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
共100000条