华为云用户手册

  • 注意事项 privilege必须是可授权限中的一种。且如果赋权对象在resource或上一级resource上已经有对应权限时,则会赋权失败。Privilege支持的权限类型可参见数据权限列表。 resource可以是queue、database、table、view、column,格式分别为: queue的格式为:queues.queue_name queue支持的Privilege权限类型可以参考下表: 操作 说明 DROP_QUEUE 删除队列 SUBMIT_JOB 提交作业 CANCEL_JOB 终止作业 RESTART 重启队列 SCALE_QUEUE 扩缩容队列 GRANT_PRIVILEGE 队列的赋权 REVOKE_PRIVILEGE 队列权限的回收 SHOW_PRIVILEGES 查看其他用户具备的队列权限 database的格式为:databases.db_name database支持的Privilege权限类型可参见数据权限列表。 table的格式为:databases.db_name.tables.table_name table支持的Privilege权限类型可参见数据权限列表。 view的格式为:databases.db_name.tables.view_name view支持的Privilege权限类型和table一样,具体可以参考数据权限列表中table的权限列表描述。 column的格式为:databases.db_name.tables.table_name.columns.column_name column支持的Privilege权限类型仅为:SELECT
  • 语法格式 1 2 3 4 5 6 7 8 9 create table clickhouseSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'clickhouse', 'connector.url' = '', 'connector.table' = '' );
  • 注意事项 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0,且勿开启kerberos认证。 Flink SQL语句中不能定义主键。同时不能使用任何产生主键的语法,例如insert into clickhouseSink select id, cout(*) from sourceName group by id。 Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • 关键字 ROLLUP:为GROUP BY的扩展,例如:SELECT a, b, c, SUM(expression) FROM table GROUP BY a, b, c WITH ROLLUP;将转换成以下四条查询: (a, b, c)组合小计 1 2 SELECT a, b, c, sum(expression) FROM table GROUP BY a, b, c; (a, b)组合小计 1 2 SELECT a, b, NULL, sum(expression) FROM table GROUP BY a, b; (a)组合小计 1 2 SELECT a, NULL, NULL, sum(expression) FROM table GROUP BY a; 总计 1 SELECT NULL, NULL, NULL, sum(expression) FROM table;
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例代码 计算所有商品库存(items)的样本方差。命令示例如下: select var_samp(items) from warehouse; 返回结果如下: _c0 294.342355 与group by配合使用,对所有商品按照仓库(warehourseId)进行分组,并计算同组商品库存(items)的样本方差。命令示例如下: select warehourseId, var_samp(items) from warehourse group by warehourseId; 返回结果如下: warehouseId _c1 city1 18.23124 city2 16.23344 city3 11.43425
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table jdbcSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 示例 将流jdbcSink的数据输出到MySQL数据库中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table jdbcSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxxx' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 访问数据库所需要的账号 connector.password 否 访问数据库所需要的密码 connector.write.flush.max-rows 否 写数据时,刷新数据的最大行数。默认值为5000 connector.write.flush.interval 否 刷新数据的时间间隔,单位可以为ms、milli、millisecond/s、sec、second/min、minute等。不填写则默认不根据时间刷新 connector.write.max-retries 否 写数据失败时的最大尝试次数。默认值为3 connector.write.exclude-update-columns 否 默认值为空(默认忽略primary key字段),表示更新主键值相同的数据时,忽略指定字段的更新
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本,且勿开启kerberos认证。 ClickHouse结果表不支持删除表数据操作。 Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • 示例 从Kafka中读取数据,并将数据插入到数据库为flink、表名为order的ClickHouse数据库中,其具体步骤如下(clickhouse版本为MRS的21.3.4.25): 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink作业队列。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector.type' = 'clickhouse', 'connector.url' = 'jdbc:clickhouse://ClickhouseAddress:ClickhousePort/flink', 'connector.table' = 'order', 'connector.write.flush.max-rows' = '1' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110
  • 注意事项 导入OBS表时,创建OBS表时指定的路径必须是文件夹,若建表路径是文件将导致导入数据失败。 仅支持导入位于OBS路径上的原始数据。 不建议对同一张表并发导入数据,因为有一定概率发生并发冲突,导致导入失败。 导入数据时只能指定一个路径,路径中不能包含逗号。 当OBS桶目录下有文件夹和文件同名时,导入数据会优先指向该路径下的文件而非文件夹。 导入PARQUET、ORC及JSON类型数据时,必须指定DATA_TYPE这一OPTIONS,否则会以默认的“CSV”格式进行解析,从而导致导入的数据格式不正确。 导入CSV及JSON类型数据时,如果包含日期及时间列,需要指定DATEFORMAT及TIMESTAMPFORMAT选项,否则将以默认的日期及时间戳格式进行解析。
  • 示例 导入数据前已参考创建OBS表或者创建DLI表中的示例描述创建对应的表。 可使用下列语句将CSV文件导入到DLI表,“t”为表名。 1 2 LOAD DATA INPATH 'obs://dli/data.csv' INTO TABLE t OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','COMMENTCHAR'='#','HEADER'='false'); 可使用下列语句将JSON文件导入到DLI表,“jsontb”为表名。 1 2 LOAD DATA INPATH 'obs://dli/alltype.json' into table jsontb OPTIONS('DATA_TYPE'='json','DATEFORMAT'='yyyy/MM/dd','TIMESTAMPFORMAT'='yyyy/MM/dd HH:mm:ss');
  • 参数说明 表1 参数描述 参数 描述 folder_path 原始数据文件夹或者文件的OBS路径。 db_name 数据库名称。若未指定,则使用当前数据库。 table_name 需要导入数据的DLI表的名称。 以下是可以在导入数据时使用的配置选项: DATA_TYPE: 指定导入的数据类型,当前支持CSV、Parquet、ORC、JSON、Avro类型,默认值为“CSV”。 配置项为OPTIONS('DATA_TYPE'='CSV') 导入CSV和JSON文件时,有三种模式可以选择: PERMISSIVE:选择PERMISSIVE模式时,如果某一列数据类型与目标表列数据类型不匹配,则该行数据将被设置为null。 DROPMALFORMED:选择DROPMALFORMED模式时,如果某一列数据类型与目标表列数据类型不匹配,则不导入该行数据。 FAILFAST:选择FAILFAST模式时,如果某一列类型不匹配,则会抛出异常,导入失败。 模式设置可通过在OPTIONS中添加 OPTIONS('MODE'='PERMISSIVE')进行设置。 DELIMITER:可以在导入命令中指定分隔符,默认值为“,”。 配置项为OPTIONS('DELIMITER'=',')。 对于CSV数据,支持如下所述分隔符: 制表符tab,例如:'DELIMITER'='\t'。 任意的二进制字符,例如:'DELIMITER'='\u0001(^A)'。 单引号('),单引号必须在双引号(" ")内。例如:'DELIMITER'= "'"。 DLI表还支持\001(^A)和\017(^Q),例如:'DELIMITER'='\001(^A)','DELIMITER'='\017(^Q)'。 QUOTECHAR:可以在导入命令中指定引号字符。默认值为"。 配置项为OPTIONS('QUOTECHAR'='"') COMMENTCHAR:可以在导入命令中指定注释字符。在导入操作期间,如果在行的开头遇到注释字符,那么该行将被视为注释,并且不会被导入。默认值为#。 配置项为OPTIONS('COMMENTCHAR'='#') HEADER:用来表示源文件是否有表头。取值范围为“true”和“false”。“true”表示有表头,“false”表示无表头。默认值为“false”。如果没有表头,可以在导入命令中指定FILEHEADER参数提供表头。 配置项为OPTIONS('HEADER'='true') FILEHEADER:如果源文件中没有表头,可在LOAD DATA命令中提供表头。 OPTIONS('FILEHEADER'='column1,column2') ESCAPECHAR:如果用户想在CSV上对Escape字符进行严格验证,可以提供Escape字符。默认值为“\\”。 配置项为OPTIONS('ESCAPECHAR'='\\') 如果在CSV数据中输入ESCAPECHAR,该ESCAPECHAR必须在双引号(" ")内。例如:"a\b"。 MAXCOLUMNS:该可选参数指定了在一行中,CSV解析器解析的最大列数。 配置项为OPTIONS('MAXCOLUMNS'='400') 表2 MAXCOLUMNS 可选参数名称 默认值 最大值 MAXCOLUMNS 2000 20000 设置MAXCOLUMNS Option的值后,导入数据会对executor的内存有要求,所以导入数据可能会由于executor内存不足而失败。 DATEFORMAT:指定列的日期格式。 OPTIONS('DATEFORMAT'='dateFormat') 默认值为:yyyy-MM-dd。 日期格式由Java的日期模式字符串指定。在Java的日期和时间模式字符串中,未加单引号(')的字符'A' 到'Z' 和'a' 到'z' 被解释为模式字符,用来表示日期或时间字符串元素。若模式字符使用单引号 (') 引起来,则在解析时只进行文本匹配,而不进行解析。Java模式字符定义请参见表3。 表3 日期及时间模式字符定义 模式字符 日期或时间元素 示例 G 纪元标识符 AD y 年份 1996; 96 M 月份 July; Jul; 07 w 年中的周数 27(该年的第27周) W 月中的周数 2(该月的第2周) D 年中的天数 189(该年的第189天) d 月中的天数 10(该月的第10天) u 星期中的天数 1 = 星期一, ..., 7 = 星期日 a am/pm 标记 pm(下午时) H 24小时数(0-23) 2 h 12小时数(1-12) 12 m 分钟数 30 s 秒数 55 S 毫秒数 978 z 时区 Pacific Standard Time; PST; GMT-08:00 TIMESTAMPFORMAT:指定列的时间戳格式。 OPTIONS('TIMESTAMPFORMAT'='timestampFormat') 默认值为:yyyy-MM-dd HH:mm:ss。 时间戳格式由Java的时间模式字符串指定。Java时间模式字符串定义详见表3 日期及时间模式字符定义。
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...]; 覆盖插入数据 1 2 3 4 5 6 7 INSERT OVERWRITE TABLE DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...;
  • 语法格式 1 2 3 4 5 SELECT window_func(args) OVER ([PARTITION BY col_name, col_name, ...] [ORDER BY col_name, col_name, ...] [ROWS | RANGE BETWEEN (CURRENT ROW | (UNBOUNDED |[num]) PRECEDING) AND (CURRENT ROW | ( UNBOUNDED | [num]) FOLLOWING)]);
  • 关键字 PARTITION BY:可以用一个或多个键分区。和GROUP BY子句类似,PARTITION BY将表按分区键分区,每个分区是一个窗口,窗口函数作用于各个分区。单表分区数最多允许7000个。 ORDER BY:决定窗口函数求值的顺序。可以用一个或多个键排序。通过ASC或DESC决定升序或降序。窗口由WINDOW子句指定。如果不指定,默认窗口等同于ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,即窗口从表或分区(如果OVER子句中用PARTITION BY分区)的初始处到当前行。 WINDOW:通过指定一个行区间来定义窗口。 CURRENT ROW:表示当前行。 num PRECEDING:定义窗口的下限,即窗口从当前行向前数num行处开始。 UNBOUNDED PRECEDING:表示窗口没有下限。 num FOLLOWING:定义窗口的上限,即窗口从当前行向后数num行处结束。 UNBOUNDED FOLLOWING:表示窗口没有上限。 ROWS BETWEEN…和RANGE BETWEEN…的区别: ROW为物理窗口,即根据ORDER BY子句排序后,取前N行及后N行的数据计算(与当前行的值无关,只与排序后的行号相关)。 RANGE为逻辑窗口,即指定当前行对应值的范围取值,列数不固定,只要行值在范围内,对应列都包含在内。 窗口有以下多种场景,如 窗口只包含当前行。 1 ROWS BETWEEN CURRENT ROW AND CURRENT ROW 窗口从当前行向前数3行开始,到当前行向后数5行结束。 1 ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING 窗口从表或分区的开头开始,到当前行结束。 1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 窗口从当前行开始,到表或分区的结尾结束。 1 ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING 窗口从表或分区的开头开始,到表或分区的结尾结束。 1 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  • 语法格式 将SELECT查询结果插入到表中: 1 2 3 4 5 6 7 INSERT INTO DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...; 将某条数据插入到表中: 1 2 INSERT INTO DLI_TABLE VALUES values_row [, values_row ...]; 覆盖插入数据 1 2 3 4 5 6 7 INSERT OVERWRITE TABLE DLI_TABLE SELECT field1,field2... [FROM DLI_TEST] [WHERE where_condition] [LIMIT num] [GROUP BY field] [ORDER BY field] ...;
  • 参数说明 表1 参数描述 参数 描述 DLI_TABLE 已创建跨源连接的DLI表名称。 DLI_TEST 为包含待查询数据的表。 field1,field2...,field 表“DLI_TEST”中的列值,需要匹配表“DLI_TABLE”的列值和类型。 where_condition 查询过滤条件。 num 对查询结果进行限制,num参数仅支持INT类型。 values_row 想要插入到表中的值,列与列之间用逗号分隔。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic encode 是 数据编码格式,可选为“csv”、“json”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 filed_delimiter 否 当encode为csv时,用于指定各字段分隔符,默认为逗号。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 示例 将流kafka_sink的数据输出到Kafka中。 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,rds表示输出到关系型数据库中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址,格式为:"{database_type}://ip:port/database" 目前支持两种数据库连接:MySQL和PostgreSQL MySQL: 'mysql://ip:port/database' PostgreSQL: 'postgresql://ip:port/database' 说明: 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。 table_name 是 用于查询数据的数据库表名。 db_columns 否 流属性和数据库表的字段对应关系。当sink流中流属性字段和数据库表中的流属性字段不完全匹配时,该参数必配。格式为“dbtable_attr1,dbtable_attr2,dbtable_attr3“。 cache_max_num 否 表示最大缓存的查询结果数,默认值为32768。 cache_time 否 表示数据库查询结果在内存中缓存的最大时间。单位为毫秒,默认值为10000,当值为0时表示不缓存。
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dliinput", encode = "csv", field_delimiter = "," ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "******", db_url = "postgresql://192.168.0.0:2000/test1", table_name = "car" ); CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "", channel = "dlioutput", partition_key = "car_owner", encode = "csv", field_delimiter = "," ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info as b on a.car_id = b.car_id; 将数据库连接地址设置为DWS数据库地址,即可创建DWS维表。DWS数据库版本大于8.1.0后,无法用开源的postgresql驱动连接,需要用gaussdb驱动进行连接。
  • 前提条件 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。 如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE table_id ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" );
  • 关键字 表1 CREATE TABLE参数描述 参数 描述 url Oracle的连接地址。 Oracle url支持以下格式: 格式一:jdbc:oracle:thin:@host:port:SID,其中SID是oracle数据库的唯一标识符。 格式二:jdbc:oracle:thin:@//host:port/service_name;这种方式是Oracle推荐的,对于集群来说,每个节点的SID可能不一致,但ServiceName是一致的,包含所有节点。 driver Oracle驱动类名: oracle.jdbc.driver.OracleDriver dbtable 指定在Oracle关联的表名,或者"用户名.表名",例如:public.table_name。 user Oracle用户名。 password Oracle用户名密码。 resource Oracle驱动包的OBS路径。 例如:obs://rest-authinfo/tools/oracle/driver/ojdbc6.jar resource中定义的driver jar包如果被更新,需要重启队列,才会生效。
共100000条