华为云用户手册

  • 为MRS集群配置委托 配置存算分离支持在新建集群中配置委托实现,也可以通过为已有集群绑定委托实现。本示例以为已有集群配置委托为例介绍。 登录MRS控制台,在导航栏选择“现有集群”。 单击集群名称,进入集群详情页面。 在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“单击同步”进行IAM用户同步。 在集群详情页的“概览”页签,单击“委托”右侧的“管理委托”选择创建ECS委托的委托并单击“确定”进行绑定,或单击“新建委托”进入IAM控制台进行创建后再在此处进行绑定。 图3 绑定委托
  • Hive访问OBS文件系统 用root用户登录集群Master节点,具体请参见登录集群节点。 验证Hive访问OBS。 用root用户登录集群Master节点,执行如下命令: cd /opt/Bigdata/client source bigdata_env source Hive/component_env 查看文件系统mrs-demo01下面的文件列表。 hadoop fs -ls obs://mrs-demo01/ 返回文件列表即表示访问OBS成功。 图5 查看mrs-demo01下的文件列表 执行以下命令进行用户认证(普通模式即未开启Kerberos认证无需执行此步骤)。 kinit hive 输入用户hive密码,默认密码为Hive@123,第一次使用需要修改密码。 执行Hive组件的客户端命令。 beeline 在beeline中直接使用OBS的目录进行访问。例如,执行如下命令创建Hive表并指定数据存储在mrs-demo01文件系统的test_demo01目录中。 create table test_demo01(name string) location "obs://mrs-demo01/test_demo01"; 执行如下命令查询所有表,返回结果中存在表test_demo01,即表示访问OBS成功。 show tables; 图6 查看是否存在表test_demo01 查看表的Location。 show create table test_demo01; 查看表的Location是否为“obs://OBS桶名/”开头。 图7 查看表test_demo01的Location 写入数据。 insert into test_demo01 values('mm'),('ww'),('ww'); 执行select * from test_demo01;查询是否写入成功。 图8 查看表test_demo01中的数据 执行命令!q退出beeline客户端。 重新登录OBS控制台。 单击“并行文件系统”, 选择创建的文件系统名称。 单击“文件”,查看是否存在创建的数据。 图9 查看数据
  • 应用场景 单实例多并发适合函数处理逻辑中有较长时间等待下游服务响应的场景,也适合函数逻辑中初始化时间较长的场景,具备以下优势: 降低冷启动概率,优化函数处理时延:例如并发三个请求,不配置单实例多并发,FunctionGraph默认启动三个实例处理请求,会有三次冷启动。若配置了单实例支持三并发,三个并发请求,FunctionGraph只启动一个实例处理请求,减少了两次冷启动。 减少总请求处理时长,节省费用:单实例单并发下,多个请求的总处理时长为每个请求的处理时长相加。单实例多并发下,同一个实例对并发的多个请求的计费时间为,从第一个请求开始处理计时,到最后一个并发的请求处理结束记一次时长费用。
  • 单实例单并发与单实例多并发的对比 当一个函数执行需要花费5秒,若配置为单实例单并发,三次函数调用请求分别在三个函数实例执行,总执行时长为15秒。 若配置为单实例多并发,设置单实例并发数为5,即单个实例最多支持5个并发请求,如果有三次函数调用请求,将在一个实例内并发处理,总执行时间为5秒。 单实例并发数大于1,在您设置的“单函数最大实例数”范围内,超过单实例并发处理能力时会自动扩容新实例。 表1 单并发与多并发对比 对比项 单实例单并发 单实例多并发 日志打印 - Node.js Runtime使用console.info()函数,Python Runtime使用print()函数,Java Runtime使用System.out.println()函数打印日志,该方式会把当前请求的Request ID包含在日志内容中。当多请求在同一个实例并发处理时,当前请求可能有很多个,继续使用这些函数打印日志会导致Request ID错乱。此时应该使用context.getLogger(),获取一个日志输出对象,通过这个日志输出对象打印日志,例如Python Runtime: log = context.getLogger() log.info("test") 共享变量 不涉及 单实例多并发处理时,修改共享变量会导致错误。这要求您在编写函数时,对于非线程安全的变量修改要进行互斥保护。 监控指标 按实际情况进行监控。 相同负载下,函数的实例数明显减少。 流控错误 不涉及 太多请求时,body中的errorcode为“FSS.0429” ,响应头中的status为429 ,错误信息提示:Your request has been controlled by overload sdk, please retry later。
  • 网络限制 根据对网络的不同设置,函数有以下网络访问能力,您可按需设置。 网络配置 说明 允许函数访问公网 当前函数默认的公网NAT访问带宽在多个租户间共享,带宽小,仅适合小量调用的测试业务场景使用;如果对带宽、性能、可靠性有高要求的生产业务场景,需开启函数访问VPC,在VPC内添加公网NAT网关并绑定EIP,分配独占的外网访问带宽。 允许函数访问VPC内资源 开启“允许函数访问VPC内资源”时,函数将禁用默认网卡并使用VPC绑定的网卡,是否允许公网访问由配置的VPC决定,开关“允许函数访问公网”将不生效。 仅允许指定的VPC调用函数 开启“仅允许指定的VPC调用函数”时,将仅允许通过指定的VPC调用函数,并禁止通过公网调用函数。
  • 参数说明 BUILD DEFERRED | IMMEDIATE IMMEDIATE表示创建物化视图时即包含最新数据。 DEFERRED表示创建物化视图时需要等到第一次refresh时才会包含数据。 REFRESH 指定物化视图的刷新方式。 创建物化视图后,物化视图中的数据只反映创建时刻基表的状态。当基表中的数据发生变化时,需要通过刷新物化视图(REFRESH MATERIALIZED VIEW)更新物化视图中的数据。 目前只支持COMPLETE全量刷新这一种刷新方式。执行物化视图定义的查询语句并更新物化视图。 刷新触发方式。 ON DEMAND:手动按需刷新。 START WITH (timestamptz) | EVERY (interval):定时刷新。START WITH指定首次刷新时间,EVERY 指定刷新间隔,根据指定的时间定时刷新。 ENABLE | DISABLE QUERY REWRITE 是否支持查询重写。默认不支持。 在指定ENABLE QUERY REWRITE时,需要设置GUC参数 mv_rewrite_rule才能启用物化视图查询重写功能。 查询重写是指在对基表进行查询时, 如果基表上创建有物化视图,数据库系统自动判断是否可以使用物化视图中的预计算结果处理查询。 如果可以使用某个物化视图,会直接从该物化视图读取预计算结果,起到加速查询的作用。 WITH ( { storage_parameter = value } [, ... ] ) ORIENTATION 指定表数据的存储方式,即行存方式、列存方式,该参数设置成功后就不再支持修改。 取值范围: ROW,表示表的数据将以行式存储。 行存储适合于OLTP业务,此类型的表上交互事务比较多,一次交互会涉及表中的多个列,用行存查询效率较高。 COLUMN,表示表的数据将以列式存储。 列存储适合于数据仓库业务,此类型的表上会做大量的汇聚计算,且涉及的列操作较少。 默认值:由GUC参数default_orientation决定。 row表示创建行存表。 column表示创建列存表。 column enabledelta表示创建开启delta表的列存表。 物化视图不支持的存储类型:分区表、h-store表、外表、时序表。 enable_foreign_table_query_rewrite 指定是否允许包含外表的物化视图进行查询重写,需要与ENABLE QUERY REWRITE一起使用。 外表数据有变化,物化视图无法感知。如果需要对包含外表的物化视图使用查询重写功能,需要指定此选项。 取值范围: on,允许包含外表的物化视图进行查询重写。 off,不允许包含外表的物化视图进行查询重写。 默认值:off DISTRIBUTE BY 指定表如何在节点之间分布或者复制。 取值范围: REPLICATION:表的每一行存在所有数据节点(DN)中,即每个数据节点都有完整的表数据。 ROUNDROBIN:表的每一行被轮番地发送给各个DN,因此数据会被均匀地分布在各个DN中。(ROUNDROBIN仅8.1.2及以上版本支持) HASH:对指定的列进行Hash,通过映射,把数据分布到指定DN。 默认值:由参数default_distribution_mode决定。 AS query 基于query的结果创建物化视图。
  • 示例 创建基表,并向基表插入数据: 1 2 CREATE TABLE t1 (a int, b int) DISTRIBUTE BY HASH(a); INSERT INTO t1 SELECT x,x FROM generate_series(1,10) x; 创建默认BUILD IMMEDIATE方式的物化视图: 1 CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t1; 创建指定列存方式的物化视图: 1 CREATE MATERIALIZED VIEW mv2 WITH(orientation = column) AS SELECT * FROM t1; 创建手动按需刷新的物化视图: 1 CREATE MATERIALIZED VIEW mv3 BUILD DEFERRED REFRESH ON DEMAND AS SELECT * FROM t1; 创建指定刷新时间的物化视图: 1 CREATE MATERIALIZED VIEW mv4 BUILD DEFERRED REFRESH START WITH(trunc(sysdate)) EVERY (interval ‘1 day’) AS SELECT * FROM t1;
  • 注意事项 物化视图的基表可以是行存表、列存表、hstore表、分区表或者指定某个分区、外表,不支持包含临时表(包括全局临时表、volatile临时表和普通临时表),不支持冷热表、不支持对自动分区表指定分区。 物化视图禁止INSERT/UPDATE/MERGE INTO/DELETE对数据进行修改。 物化视图执行一次将结果并保存,每次查询结果是一致的。BUILD IMMEDIATE或REFRESH后,物化视图可以查询出正确结果。 物化视图不能通过语法指定Node Group。支持物化视图的基表指定Node Group创建,物化视图会继承基表NodeGroup信息创建,需要多个基表的NodeGroup相同。 创建物化视图时需要schema的CREATE权限和基表或列的SELECT权限。 查询物化视图需要物化视图的SELECT权限。 刷新需要物化视图的INSERT和基表或列的SELECT权限。 物化视图支持ANALYZE/VACUUM/ALTER/DROP等细粒度权限。 物化视图支持with grant option的权限传递操作。 物化视图不支持更高级别的安全控制,如果基表存在行级访问控制、脱敏策略或owner为私有用户等限制SELECT权限的场景,则禁止创建物化视图;如果已存在物化视图,基表增加RLS、脱敏策略或修改owner为私有用户,则物化视图可以执行查询,但无法刷新。
  • 语法格式 1 2 3 4 5 6 7 CREATE MATERIALIZED VIEW [view_name] [ ( column_name [, ...] ) ] {{ BUILD { DEFERRED | IMMEDIATE }| { REFRESH { COMPLETE }{ ON DEMAND }|{ START WITH (timestmaptz) | EVERY (interval) }…}…| { ENABLE | DISABLE } QUERY REWRITE}| { WITH ( { storage_parameter = value } [, ... ] ) }| { DISTRIBUTE BY { HASH (column [ , ... ]) | ROUNDROBIN | REPLICATION }] AS query
  • 示例 清理当前数据库中的所有表: 1 VACUUM; 仅回收表tpcds.web_returns_p1分区P2的空间,不更新统计信息: 1 VACUUM FULL tpcds.web_returns_p1 PARTITION(P2); 回收表tpcds.web_returns_p1空间,并更新统计信息: 1 VACUUM FULL ANALYZE tpcds.web_returns_p1; 清理当前数据库中的所有表并收集查询优化器的统计信息: 1 VACUUM ANALYZE; 仅清理特定表reason: 1 VACUUM (VERBOSE, ANALYZE) tpcds.reason; 对列存表table_delta进行DELTAMERGE操作: 1 VACUUM DELTAMERGE tpcds.table_delta; 仅对列存表table_delta的分区p1进行DELTAMERGE操作: 1 VACUUM DELTAMERGE tpcds.table_delta partition(p1);
  • 参数说明 FULL 选择“FULL”清理,这样可以恢复更多的空间,但是需要耗时更多,并且在表上施加了排他锁。 FULL选项还可以带有COMPACT参数,该参数只针对HDFS表,指定该参数的VACUUM FULL操作性能要好于未指定该参数的VACUUM FULL操作。 COMPACT和PARTITION参数不能同时使用。 使用FULL参数会导致统计信息丢失,如果需要收集统计信息,请在VACUUM FULL语句中加上analyze关键字。 FREEZE 指定FREEZE相当于执行VACUUM时将vacuum_freeze_min_age参数设为0。 VERBOSE 为每个表打印一份详细的清理工作报告。 ANALYZE | ANALYSE 更新用于优化器的统计信息,以决定执行查询的最有效方法。 table_name 要清理的表的名称(可以有模式修饰)。 取值范围:要清理的表的名称。缺省时为当前数据库中的所有表。 column_name 要分析的具体的字段名称。 取值范围:要分析的具体的字段名称。缺省时为所有字段。 PARTITION HDFS表不支持PARTITION参数,PARTITION参数不能和COMPACT同时使用。 PARTITION参数和COMPACT同时使用会报错:COMPACT can not be used with PARTITION. partition_name 要清理的表的分区名称。缺省时为所有分区。 DELTAMERGE 只针对HDFS表,将HDFS表的delta table中的数据转移到主表存储上。对HDFS表而言,当delta表中数据量小于六万行,则不作迁移,只有在大于或者等于六万行数据时,将delta表中所有数据迁移到HDFS上,并通过truncate清理delta表的存储空间。 HDFSDIRECTORY 只针对HDFS表,删除HDFS表在HDFS存储上表目录下的空值分区目录。
  • 注意事项 如果没有参数,VACUUM处理当前数据库里用户拥有相应权限的每个表。如果参数指定了一个表,VACUUM只处理指定的那个表。 要对一个表进行VACUUM操作,通常用户必须是表的所有者,被授予了指定表VACUUM权限的用户或者被授予了gs_role_vacuum_any角色的用户,系统管理员默认拥有此权限。数据库的所有者允许对数据库中除了共享目录以外的所有表进行VACUUM操作(该限制意味着只有系统管理员才能真正对一个数据库进行VACUUM操作)。VACUUM命令会跳过那些用户没有权限的表进行垃圾回收操作。 VACUUM不能在事务块内执行。 建议生产数据库经常清理(至少每晚一次),以保证不断地删除失效的行。尤其是在增删了大量记录之后,对受影响的表执行VACUUM ANALYZE命令是一个很好的习惯。这样将更新系统目录为最近的更改,并且允许查询优化器在规划用户查询时有更好的选择。 不建议日常使用FULL选项,但是可以在特殊情况下使用。例如在用户删除了一个表的大部分行之后,希望从物理上缩小该表以减少磁盘空间占用。VACUUM FULL通常要比单纯的VACUUM收缩更多的表尺寸。如果执行此命令后所占用物理空间无变化(未减少),请确认是否有其他活跃事务(删除数据事务开始之前开始的事务,并在VACUUM FULL执行前未结束)存在,如果有等其他活跃事务退出进行重试。 VACUUM会导致I/O流量的大幅增加,这可能会影响其他活动会话的性能。因此,有时候会建议使用基于开销的VACUUM延迟特性。 如果指定了VERBOSE选项,VACUUM将打印处理过程中的信息,以表明当前正在处理的表。各种有关当前表的统计信息也会打印出来。 语法格式中含有带括号的选项列表时,选项可以以任何顺序写入。如果没有括号,则选项必须按语法显示的顺序给出。 VACUUM和VACUUM FULL时,会根据参数vacuum_defer_cleanup_age延迟清理行存表记录,即不会立即清理刚刚删除的元组。 VACUUM ANALYZE先执行一个VACUUM操作,然后给每个选定的表执行一个ANALYZE。对于日常维护脚本而言,这是一个很方便的组合。 简单的VACUUM(不带FULL选项)只是简单地回收空间并且令其可以再次使用。这种形式的命令可以和对表的普通读写并发操作,因为没有请求排他锁。VACUUM FULL执行更广泛的处理,包括跨块移动行,以便把表压缩到最少的磁盘块数目里。这种形式要慢许多并且在处理的时候需要在表上施加一个排他锁。 VACUUM列存表内部执行的操作包括三个:迁移delta表中的数据到主表、VACUUM主表的delta表、VACUUM主表的desc表。该操作不会回收delta表的存储空间,如果要回收delta表的冗余存储空间,需要对该列存表执行VACUUM DELTAMERGE。 VACUUM FULL系统表只能离线操作,在线VACUUM FULL系统表除了会锁表,还可能导致一些异常情况并产生报错。 如果有长查询访问系统表,此时执行VACUUM FULL,长查询可能会阻塞VACUUM FULL连接访问系统表,导致连接超时报错。 对列存分区表执行VACUUM FULL,会同时锁表和锁分区。 对不同的系统表执行VACUUM FULL并发操作可能会导致本地死锁。 VACUUM FULL操作分区表时与用户DML语句在如下特定场景有并发时可能发生分布式死锁,请谨慎操作: VACUUM FULL子分区与insert/update/delete主表。 VACUUM FULL全表与select全表/select子分区。 对表执行VACUUM FULL操作时会触发表重建(表重建过程中会先把数据转储到一个新的数据文件中,重建完成之后会删除原始文件),当表比较大时,重建会消耗较多的磁盘空间。当磁盘空间不足时,要谨慎对待大表VACUUM FULL操作,防止触发集群只读。
  • 语法格式 回收空间并更新统计信息,关键字顺序必须按语法显示的顺序给出。 1 2 VACUUM [ ( { FULL | FREEZE | VERBOSE | {ANALYZE | ANALYSE }} [,...] ) ] [ table_name [ (column_name [, ...] ) ] ] [ PARTITION ( partition_name ) ]; 仅回收空间,不更新统计信息。 1 VACUUM [ FULL [COMPACT] ] [ FREEZE ] [ VERBOSE ] [ table_name ] [ PARTITION ( partition_name ) ]; 回收空间并更新统计信息,且对关键字顺序有要求。 1 2 VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] { ANALYZE | ANALYSE } [ VERBOSE ] [ table_name [ (column_name [, ...] ) ] ] [ PARTITION ( partition_name ) ]; 针对HDFS表,将delta table中的数据转移到主表存储。(partition_name参数仅8.2.1.300及以上集群版本支持) 1 VACUUM DELTAMERGE [ table_name ][partition_name]; 针对HDFS表,删除HDFS表在HDFS存储上的空值分区目录。 1 VACUUM HDFSDIRECTORY [ table_name ];
  • 语法格式 ALTER MATERIALIZED VIEW [ IF EXISTS ] { materialized_view_name } [ ENABLE | DISABLE ] QUERY REWRITE; ALTER MATERIALIZED VIEW [ IF EXISTS ] { materialized_view_name } REFRESH [ COMPLETE ] [ ON DEMAND ] [ [ START WITH (timestamptz) ] | [ EVERY (interval) ] ]; ALTER MATERIALIZED VIEW { materialized_view_name } OWNER TO new_owner;
  • 参数说明 ENABLE | DISABLE QUERY REWRITE 是否对本物化视图启动查询重写。 在启用启用物化视图的查询重写后需刷新物化视图,保证物化视图数据是最新的。 REFRESH [ COMPLETE ] [ ON DEMAND ] [ [ START WITH (timestamptz) ] | [EVERY (interval)] ] 修改物化视图的刷新方式。 OWNER TO new_owner 修改物化视图的owner。
  • 示例 该示例是从GaussDB(DWS)数据源中读取数据,并写入到Print结果表中,其具体步骤参考如下: 在GaussDB(DWS)中创建相应的表,表名为dws_order,SQL语句参考如下: 1 2 3 4 5 6 7 8 9 10 create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR); 在GaussDB(DWS)中执行以下SQL语句,向dws_order表中插入数据: 1 2 3 4 5 6 7 8 9 10 11 12 insert into public.dws_order (order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id) values ('202103241000000001', 'webShop', '2021-03-24 10:00:00', '100.00', '100.00', '2021-03-24 10:02:03', '0001', 'Alice', '330106'), ('202103251202020001', 'miniAppShop', '2021-03-25 12:02:02', '60.00', '60.00', '2021-03-25 12:03:00', '0002', 'Bob', '330110'); 执行Flink SQL: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE TABLE dwsSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSIP:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from dwsSource; 执行结果如下:
  • 语法格式 用DWS-Connector做源表时,DWS-Connector实现了SupportsLimitPushDown和SupportsFilterPushDown接口,支持将limit和where条件下推到数据库执行。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
  • 参数说明 表1 数据库配置 参数 说明 默认值 connector flink框架区分connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 配置密码。 - tableName 对应dws表。 - 表2 查询参数 参数 说明 默认值 fetchSize jdbc statement中fetchSize参数,用于控制查询数据库返回条数。 1000 enablePushDown 开启条件下推:开启后limit 和where条件会下推到数据库执行。 true
  • 概述 dws-connector-flink是在dws-client的基础上对接Flink的一个工具,工具为对dws-client的包装,整体入库能力跟dws-client一致。dws-connector-flink为GaussDB(DWS)团队自研工具,后续将根据GaussDB(DWS)数据库持续优化。 dws-flink-connector的DWS-Connector只支持单并发查询存量数据,暂不支持并行读取。
  • 示例 该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下: 在GaussDB(DWS)数据库中创建表public.dws_order: 1 2 3 4 5 6 7 8 9 10 11 create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR ); 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'autoFlushMaxInterval' = '10s', 'autoFlushBatchSize' = '30000' ); insert into dwsSink select * from kafkaSource; 给Kafka写入测试数据: 1 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 等10秒后在GaussDB(DWS)表中查询结果: 1 select * from dws_order 结果如下:
  • 格式语法 SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
  • Flink SQL配置参数 Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。 表1 数据库配置 参数 说明 默认值 connector flink框架区分connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 配置密码。 - tableName 对应dws表。 - 表2 连接配置 参数 说明 默认值 connectionSize 初始dws-client时的并发数量。 1 connectionMaxUseTimeSeconds 连接创建多少秒后强制释放(单位:秒)。 3600(一小时) connectionMaxIdleMs 连接最大空闲时间,超过后将释放(单位:毫秒)。 60000(一分钟) 表3 写入参数 参数 说明 默认值 conflictStrategy 有主键表数据写入时主键冲突策略: ignore:保持原数据,忽略更新数据。 update:用新数据中非主键列更新原数据中对应列。 replace:用新数据替换原数据。 说明: update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为null。 update writeMode 入库方式: auto:系统自动选择。 copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。 copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。 upsert: 有主键用upsert sql入库;无主键用insert into 入库。 UPDATE:使用update where 语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须时唯一索引,但非唯一索引可能会影响性能。 COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。 UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。 auto maxFlushRetryTimes 在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 3 autoFlushBatchSize 自动刷库的批大小(攒批大小)。 5000 autoFlushMaxInterval 自动刷库的最大间隔时间(攒批时长)。 5s copyWriteBatchSize 在“writeMode == auto”下,使用copy的批大小。 5000 ignoreDelete 忽略flink任务中的delete。 false (1.0.10前默认true) ignoreNullWhenUpdate 是否忽略flink中字段值为null的更新, 只有在“conflictStrategy == update”时有效。 false metadataCacheSeconds 系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 180 copyMode copy入库格式: CSV:将数据拼接为CSV格式入库,该方式稳定,但性能略低。 DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。 CSV createTempTableMode 创建临时表方式: AS LIKE AS numberAsEpochMsForDatetime 如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。 false stringToDatetimeFormat 如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 null sink.parallelism flink系统参数用于设置sink并发数量。 跟随上游算子 printDataPk 是否在connector接收到数据时打印数据主键,用于排查问题。 false ignoreUpdateBefore 忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 true
  • 示例 从Kafka源表中读取数据,将GaussDB(DWS)表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下: 连接GaussDB(DWS)数据库实例,在GaussDB(DWS)中创建相应的表,作为维表,表名为area_info,SQL语句如下: 1 2 3 4 5 6 7 8 9 create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR, PRIMARY KEY(area_id) ); 连接GaussDB(DWS)数据库实例,向GaussDB(DWS)维表area_info中插入测试数据,其语句如下: 1 2 3 4 5 6 7 insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); flink sql创建源表、结果表、维表并执行SQL: CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'tableName' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookupCacheMaxRows' = '10000', 'lookupCacheExpireAfterAccess' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'print' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 在Kafka中写入数据: 1 2 3 {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 结果参考如下:
  • 参数说明 表1 数据库配置 参数 说明 默认值 connector flink框架区分Connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 数据库用户密码。 - 表2 连接配置参数 参数 名称 类型 说明 默认值 connectionSize 读取线程池大小 int 用于执行操作的线程数量 = 数据库连接数量,同写入线程大小。 1 readBatchSize 最多一次将get请求合并提交的数量 int 当查询请求积压后,最大的批量查询数量。 128 readBatchQueueSize get请求缓冲池大小 int 查询请求最大积压容量。 256 readTimeoutMs get操作的超时时间(毫秒/ms) int 默认值0表示不超时,会在两处位置生效: get操作从用户开始执行到client准备提交到dws的等待时间。 get sql的执行超时,即statement query timeout。 0 readSyncThreadEnable 非异步查询时,是否开启线程池 boolean 开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。 true lookupScanEnable 是否开启scan查询 boolean 关联条件在非全主键匹配下,是否开启scan查询。 若为false,则join关联条件必须全为主键,否则将抛异常。 false fetchSize / lookupScanFetchSize scan一次查询大小 int 非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。 1000 lookupScanTimeoutMs scan操作的超时时间(毫秒/ms) int 非全主键匹配下,一次条件查询的超时限制(ms)。 60000 lookupAsync 是否采用异步方式获取数据 boolean 查询方式设置为同步or异步。 true lookupCacheType 缓存策略 LookupCacheType 设置以下缓存策略(不区分大小写): None:无缓存LRU(默认值):缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。 ALL:全量数据缓存,适合不常更新小表。 LookupCacheType.LRU lookupCacheMaxRows 缓存大小 long 当选择LRU缓存策略后,可以设置缓存大小。 1000 lookupCacheExpireAfterAccess 读取后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。 null lookupCacheExpireAfterWrite 写入后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。 10s lookupCacheMissingKey 数据不存在后写入缓存 boolean 当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。 false lookupCacheReloadStrategy 全量缓存重载策略 ReloadStrategy 当选择ALL缓存策略后,可以设置以下数据重载策略: PERIODIC:周期性数据重载。 TIMED:定时数据重载,以天为单位。 ReloadStrategy.PERIODIC lookupCachePeriodicReloadInterval 数据重载时间间隔 Duration 当选择PERIOD重载策略时,可以设置全量缓存重载间隔。 1h lookupCachePeriodicReloadMode 数据重载模式 ScheduleMode 当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写): FIXED_DELAY:从上一个加载结束计算重新加载间隔。 FIXED_RATE:从上一个加载开始计算重新加载间隔。 ScheduleMode.FIXED_DELAY lookupCacheTimedReloadTime 数据重载定时调度时间 string 当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。 00:00 lookupCacheTimedReloadIntervalDays 数据重载定时周期调度间隔天数 int 当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。 1
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
  • Notebook自定义镜像故障基础排查 当制作的自定义镜像使用出现故障时,请用户按照如下方法排查: 用户自定义镜像没有ma-user用户及ma-group用户组; 用户自定义镜像中/home/ma-user目录,属主和用户组不是ma-user和ma-group; 用户自定义镜像必须满足用户目录/home/ma-user权限为750,不能为其他权限; 用户自定义镜像使用远程SSH功能,OpenSSH版本要兼容或高于8.0; 用户制作的自定义镜像,在本地执行docker run启动,无法正常运行; 用户自行安装了Jupyterlab服务导致冲突的,需要用户本地使用Jupyterlab命令罗列出相关的静态文件路径,删除并且卸载镜像中的Jupyterlab服务; 用户自己业务占用了开发环境官方的8888、8889端口的,需要用户修改自己的进程端口号; 用户的镜像指定了PYTHONPATH、sys.path导致服务启动调用冲突的,需在实例启动后,再指定PYTHONPATH、sys.path; 用户使用了已开启sudo权限的专属池,使用自定义镜像时,sudo工具未安装或安装错误; 用户使用的cann、cuda环境有兼容性问题; 用户的docker镜像配置错误、网络或防火墙限制、镜像构建问题(文件权限、依赖缺失或构建命令错误)等原因导致的。 父主题: Notebook中使用自定义镜像
  • 镜像一:tensorflow1.15-mindspore1.7.0-cann5.1.0-euler2.8-aarch64 表1 tensorflow1.15-mindspore1.7.0-cann5.1.0-euler2.8-aarch64镜像介绍 AI引擎框架 URL 包含的依赖项 Mindspore-Ascend 1.7.0 swr.{region_id}.myhuaweicloud.com/atelier/notebook2.0-mul-kernel-arm-ascend-cp37:5.0.1-c81-20220726 例如: 华北-北京四 swr.cn-north-4.myhuaweicloud.com/atelier/notebook2.0-mul-kernel-arm-ascend-cp37:5.0.1-c81-20220726 PyPI 程序包 Yum 软件包 mindspore-ascend 1.7.0 ipykernel 6.7.0 ipython 7.29.0 jupyter-client 7.0.6 ma-cau 1.1.7 ma-cau-adapter 1.1.3 ma-cli 1.2.3 matplotlib 3.1.2 modelarts 1.4.22 moxing-framework 2.0.0.rc2.4b57a67b numpy 1.17.5 pandas 1.1.3 pillow 7.0.0 pip 21.2.4 psutil 5.7.0 PyYAML 5.3.1 scipy 1.5.4 scikit-learn 0.24.1 tornado 6.1 mindinsight 1.7.0 cmake cpp curl ffmpeg g++ gcc git grep python3 rpm tar unzip wget zip
  • 镜像二:tensorflow1.15-cann5.1.0-py3.7-euler2.8.3 表2 tensorflow1.15-cann5.1.0-py3.7-euler2.8.3镜像介绍 AI引擎框架 是否使用昇腾 (CANN版本) URL 包含的依赖项 Tensorflow 1.15 是 (CANN 5.1) swr.{region-id}.{局点域名}/atelier/ tensorflow_1_15_ascend:tensorflow_1.15-cann_5.1.0-py_3.7-euler_2.8.3-aarch64-d910-20220906 PyPI 程序包 Yum 软件包 tensorflow 1.15.0 tensorboard 1.15.0 ipykernel 5.3.4 ipython 7.34.0 jupyter-client 7.3.4 ma-cau 1.1.2 ma-cau-adapter 1.1.2 ma-cli 1.1.3 matplotlib 3.5.1 modelarts 1.4.7 moxing-framework 2.0.1.rc0.ffd1c0c8 numpy 1.17.5 pandas 0.24.2 pillow 9.2.0 pip 22.1.2 psutil 5.7.0 PyYAML 5.3.1 scipy 1.3.3 scikit-learn 0.20.0 tornado 6.2 ca-certificates.noarch cmake cpp curl gcc-c++ gcc gdb grep nginx python3 rpm tar unzip vim wget zip
  • Step2 在Notebook中调试模型 打开一个新的Terminal终端,进入“/home/ma-user/infer/”目录,运行启动脚本run.sh,并预测模型。基础镜像中默认提供了run.sh作为启动脚本。启动命令如下: sh run.sh 图1 运行启动脚本 上传一张预测图片(手写数字图片)到Notbook中。 图2 手写数字图片 图3 上传预测图片 重新打开一个新的Terminal终端,执行如下命令进行预测。 curl -kv -F 'images=@/home/ma-user/work/test.png' -X POST http://127.0.0.1:8080/ 图4 预测 在调试过程中,如果有修改模型文件或者推理脚本文件,需要重启run.sh脚本。执行如下命令先停止nginx服务,再运行run.sh脚本。 #查询nginx进程 ps -ef |grep nginx #关闭所有nginx相关进程 kill -9 {进程ID} #运行run.sh脚本 sh run.sh 也可以执行pkill nginx命令直接关闭所有nginx进程。 #关闭所有nginx进程 pkill nginx #运行run.sh脚本 sh run.sh 图5 重启run.sh脚本 父主题: 无需构建直接在开发环境中调试并保存镜像用于推理
  • Step1 登录SWR 登录容器镜像服务控制台,选择区域。 单击右上角“创建组织”,输入组织名称完成组织创建。您可以自定义组织名称,本示例使用“deep-learning”,实际操作时请重新命名一个组织名称。后续所有命令中使用到组织名称deep-learning时,均需要替换为此处实际创建的组织名称。 单击右上角“登录指令”,获取登录访问指令。 以root用户登录ECS环境,输入登录指令。 图1 在ECS中执行登录指令
共100000条