华为云用户手册

  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "mrs_hbase", region = "", cluster_address = "", table_name = "", table_columns = "", illegal_data_table = "", batch_insert_data_num = "", action = "" )
  • 示例 将流disSink的数据输出到DIS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table disSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' );
  • 功能描述 DLI将Flink作业的输出数据写入数据接入服务(DIS)中。适用于将数据过滤后导入DIS通道,进行后续处理的场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为数据接入服务,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.partition-key 否 数据输出分组主键,多个主键用逗号分隔。当该参数没有配置的时候则随机派发。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table disSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format.type' = '' );
  • 语法格式 1 2 3 4 CENTROID(ARRAY[field_names], distance_threshold):加入当前数据点后,该数据点所属分类中心。 CLUSTER_CENTROIDS(ARRAY[field_names], distance_threshold):加入当前数据点后,所有分类中心。 ALL_POINTS_OF_CLUSTER(ARRAY[field_names], distance_threshold):加入当前数据点后,该分类所有数据点。 ALL_CLUSTERS_POINTS(ARRAY[field_names], distance_threshold):加入当前数据点后,所有分类对应的所有数据点。 聚类算法可以应用在无界流中。
  • 示例 分别使用四种函数结合窗口来实时计算聚类的相关信息。 1 2 3 4 5 6 7 8 9 10 SELECT CENTROID(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE UNBOUNDED PRECEDING) AS centroid, CLUSTER_CENTROIDS(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE UNBOUNDED PRECEDING) AS centroids FROM MyTable SELECT CENTROID(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS centroidCE, ALL_POINTS_OF_CLUSTER(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS itemList, ALL_CLUSTERS_POINTS(ARRAY[c,e], 1.0) OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '60' MINUTE PRECEDING AND CURRENT ROW) AS listoflistofpoints FROM MyTable
  • 示例 该示例是从kafka数据源中读取数据,并以insert模式写入DWS结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库,在DWS中创建相应的表,表名为dws_order,SQL语句参考如下: create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作业数据源,将DWS作为结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName', 'table-name' = 'dws_order', 'driver' = 'org.postgresql.Driver', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'write.mode' = 'insert' ); insert into dwsSink select * from kafkaSource; 连接Kafka集群,向Kafka中输入以下测试数据。 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 从DWS中使用如下SQL语句查看数据结果。 select * from dws_order 数据结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100.0 100.0 2021-03-24 10:02:03 0001 Alice 330106
  • 功能描述 DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 注意事项 若需要使用upsert模式,则必须在DWS结果表和该结果表连接的DWS表都定义主键。 若DWS在不同的schema中存在相同名称的表,则在flink opensource sql中需要指定相应的schema。 提交Flink作业前,建议勾选“保存作业日志”参数,在OBS桶选项中选择日志保存的位置,方便后续作业提交失败或运行异常时,查看日志并分析问题原因。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 例如,使用gsjdbc4驱动连接、upsert模式写入数据到DWS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase', 'table-name' = 'car_info', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例创建DWS结果表。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'table-name' = 'ads_game_sdk_base\".\"test', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDatabase', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' );
  • 语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,这里是'gaussdb' url 是 无 String jdbc连接地址 。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考常见问题说明。 driver 否 org.postgresql.Driver String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'一起配置 password 否 无 String DWS数据库认证密码,需要和'username'一起配置 write.mode 否 无 String 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 sink.buffer-flush.max-rows 否 100 Integer 每次写入请求缓存的最大行数。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 sink.buffer-flush.interval 否 1s Duration 刷新缓存的间隔,在这段时间内以异步线程刷新数据。 它能提升写入数据库的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",并设置刷新缓存的间隔,则以完整的异步处理方式刷新缓存。 格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 sink.max-retries 否 3 Integer 写入最大重试次数。 write.escape-string-value 否 false Boolean 是否对string类型值进行转义。该参数仅用于write.mode为copy模式下。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置置账号和密码。
  • 前提条件 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9 create table clickhouseSink ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'clickhouse', 'connector.url' = '', 'connector.table' = '' );
  • 注意事项 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0,且勿开启kerberos认证。 Flink SQL语句中不能定义主键。同时不能使用任何产生主键的语法,例如insert into clickhouseSink select id, cout(*) from sourceName group by id。 Flink中支持字段类型范围为:string、tinyint、smallint、int、long、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例 示例一: 该示例将car_info数据,以buyday字段为分区字段,parquet为编码格式,转储数据到OBS。 1 2 3 4 5 6 7 8 9 10 11 12 13 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "obs://obs-sink/car_infos", encode = "parquet", ak = "{{myAk}}", sk = "{{mySk}}" ); 数据最终在OBS中的存储目录结构为:obs://obs-sink/car_infos/buyday=xx/part-x-x。 数据生成后,可通过如下SQL语句建立OBS分区表,用于后续批处理: 创建OBS分区表。 1 2 3 4 5 6 7 8 create table car_infos ( carId string, carOwner string, average_speed double ) partitioned by (buyday string) stored as parquet location 'obs://obs-sink/car_infos'; 从关联OBS路径中恢复分区信息。 1 alter table car_infos recover partitions; 示例二 该示例将car_info数据,以buyday字段为分区字段,csv为编码格式,转储数据到HDFS。 1 2 3 4 5 6 7 8 9 10 11 12 create sink stream car_infos ( carId string, carOwner string, average_speed double, buyday string ) partitioned by (buyday) with ( type = "filesystem", file.path = "hdfs://node-master1sYAx:9820/user/car_infos", encode = "csv", field_delimiter = "," ); 数据最终在HDFS中的存储目录结构为:/user/car_infos/buyday=xx/part-x-x。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出流类型。“type”为“filesystem”,表示输出数据到文件系统。 file.path 是 输出目录,格式为: schema://file.path。 当前schame只支持obs和hdfs。 当schema为obs时,表示输出到对象存储服务OBS。 当schema为hdfs时,表示输出到HDFS。HDFS需要配置代理用户,具体请参考HDFS代理用户配置。 示例:hdfs://node-master1sYAx:9820/user/car_infos,其中node-master1sYAx:9820为MRS集群NameNode所在节点信息。 encode 是 输出数据编码格式,当前支持“parquet”格式和“csv”格式。 当schema为obs时,输出数据编码格式仅支持“parquet”格式。 当schema为hdfs时,输出数据编码格式支持“parquet”格式和“csv”格式。 ak 否 输出到OBS时该参数必填。用于访问OBS认证的accessKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《数据湖探索用户指南》。 sk 否 输出到OBS时该参数必填。用于访问OBS认证的secretKey,可使用全局变量,屏蔽敏感信息。 关于全局变量在控制台上的使用方法,请参考《数据湖探索用户指南》。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的MRS集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 field_delimiter 否 属性分隔符。 当编码格式为“csv”时,需要设置属性分隔符,用户可以自定义,如:“,”。
  • 功能描述 创建sink流将数据输出到分布式文件系统(HDFS)或者对象存储服务(OBS)等文件系统。数据生成后,可直接对生成的目录创建非DLI表,通过DLI SQL进行下一步处理分析,并且输出数据目录支持分区表结构。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。 OBS的更多信息,请参见《对象存储服务控制台指南》。
  • 注意事项 使用文件系统输出流的Flink作业必须开启checkpoint,保证作业的一致性。 为了避免数据丢失或者数据被覆盖,开启作业异常自动重启或者手动重启,需要配置为“从checkpoint恢复”。 checkpoint间隔设置需在输出文件实时性、文件大小和恢复时长之间进行权衡,比如10分钟。 checkpoint支持如下两种模式: AtLeastOnce:事件至少被处理一次。 ExactlyOnce:事件仅被处理一次。 使用文件系统输出流写入数据到OBS时,应避免多个作业写同一个目录的情况。 OBS对象存储桶的默认行为为覆盖写,可能导致数据丢失。 OBS并行文件系统桶的默认行为追加写,可能导致数据混淆。 因为以上OBS桶类型行为的区别,为避免作业异常重启可能导致的数据异常问题,请根据您的业务需求选择OBS桶类型。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) [PARTITIONED BY (attr_name (',' attr_name)*] WITH ( type = "filesystem", file.path = "obs://bucket/xx", encode = "parquet", ak = "", sk = "" );
  • HDFS代理用户配置 登录MRS管理页面。 选择MRS的HDFS Namenode配置,在“自定义”中添加配置参数。 图1 HDFS服务配置 其中,core-site值名称“hadoop.proxyuser.myname.hosts”和“hadoop.proxyuser.myname.groups”中的“myname”为传入的krb认证用户名称。 需要保证写入HDFS数据路径权限为777。 配置完成后,单击“保存配置”进行保存。
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • CASE表达式 语法格式 1 2 3 CASE value WHEN value1 [, value11 ]* THEN result1 [ WHEN valueN [, valueN1 ]* THEN resultN ]* [ ELSE resultZ ] END 或 1 2 3 CASE WHEN condition1 THEN result1 [ WHEN conditionN THEN resultN ]* [ ELSE resultZ ] END 语法说明 当value值为value1则返回result1,都不满足则返回resultZ,若没有else语句,则返回null。 当condition1为true时返回result1,都不满足则返回resultZ,若没有else语句,则返回null。 注意事项 所有result的类型都必须一致。 所有condition类型都必须是布尔类型。 当没有满足的分支时,若指定else语句,则返回else的值,若没有else语句,则返回null。 示例 当units等于5时返回1,否则返回0。 示例1: 1 insert into temp SELECT CASE units WHEN 5 THEN 1 ELSE 0 END FROM Orders; 示例2: 1 insert into temp SELECT CASE WHEN units = 5 THEN 1 ELSE 0 END FROM Orders;
  • 示例 CSV格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "csv", field_delimiter = "\u0006\u0006\u0002", quote = "\u0007", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" ); ORC格式转储。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "dws", region = "xxx", ak = "", sk = "", encode = "orc", db_obs_server = "obs_server", obs_dir = "dli-append-2/dws", username = "", password = "", db_url = "192.168.1.12:8000/test1", table_name = "table1", max_record_num_per_file = "100", dump_interval = "10" );
  • 功能描述 创建sink流将Flink作业数据通过OBS转储方式输出到数据仓库服务(DWS),即Flink作业数据先输出到OBS,然后再从OBS导入到DWS。如何导入OBS数据到DWS具体可参考《数据仓库服务数据库开发指南》中“从OBS并行导入数据到集群”章节。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dws表示输出到数据仓库服务中。 region 是 数据仓库服务所在区域。 ak 是 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 是 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 encode 是 编码方式。当前支持csv和orc两种方式。 field_delimiter 否 属性分隔符。当编码方式为csv时需要配置,建议尽量用不可见字符作为分隔符,如\u0006\u0002。 quote 否 单字节,建议使用不可见字符,如\u0007。 db_obs_server 否 已在数据库中创建的外部服务器,如obs_server。 如何创建外部服务器,具体操作步骤可参考《数据仓库服务数据库开发指南》中创建外部服务器章节。 如果编码方式为orc格式时需指定该参数。 obs_dir 是 中间文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址。格式为/ip:port/database,如 “192.168.1.21:8000/test1”。 table_name 是 数据表名,若表不存在,则自动创建。 max_record_num_per_file 是 每个文件最多存储多少条记录。当文件记录数少于最大值时,该文件会延迟一个转储周期输出。 dump_interval 是 转储周期,单位为秒。 delete_obs_temp_file 否 是否要删除obs上的临时文件,默认为“true”,若设置为“false”,则不会删除obs上的文件,需用户自己清理。 max_dump_file_num 否 执行一次转储操作时最多转储多少文件。 当本次转储操作发现文件数小于最大值,则会延迟一个转储周期输出。
共100000条