华为云用户手册

  • HDFS文件操作API概述 Hadoop中关于文件操作类基本上全部是在“org.apache.hadoop.fs”包中,这些API能够支持的操作包含:打开文件,读写文件,删除文件等。Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个: static FileSystem get(Configuration conf); 该类封装了几乎所有的文件操作,例如mkdir,delete等。综上基本可以得出操作文件的程序库框架: operator() { 得到Configuration对象 得到FileSystem对象 进行文件操作 }
  • Doris建表规则 在创建Doris表指定分桶buckets时,每个桶的数据大小应保持在100MB~3GB之间,单分区中最大分桶数量不超过5000。 表数据超过5亿条以上必须设置分区分桶策略。 表的分桶列不要设置太多,一般情况下设置1或2个列即可,同时需要兼顾数据分布均匀和查询吞吐均衡。 数据均匀是为了避免某些桶的数据存在倾斜影响数据均衡和查询效率。 查询吞吐利用查询SQL的分桶剪裁优化避免了全桶扫描,以提升查询性能。 分桶列的选取:优先考虑数据较为均匀且常用于查询条件的列作为分桶列。 可使用以下方法分析是否会导致数据倾斜: SELECT a, b, COUNT(*) FROM tab GROUP BY a,b; 命令执行后查看各个分组的数据条数是否相差不大,如果相差超过2/3或1/2,则需要重新选择分桶字段。 2千万以内数据禁止使用动态分区。动态分区会自动创建分区,而小表用户关注不到,会创建出大量不使用的分区分桶。 创建表时,排序键key不能太多,一般建议3~5个;太多key会导致数据写入较慢,影响数据导入性能。 不使用Auto Bucket,需按照已有的数据量来进行分区分桶,能更好的提升导入及查询性能。Auto Bucket会造成Tablet数量过多,最终导致有大量的小文件。 创建表时的副本数必须至少为2,默认是3,禁止使用单副本。
  • Doris建表建议 单表物化视图不能超过6个,物化视图不建议嵌套,不建议数据写入时通过物化视图进行重型聚合和Join计算等ETL任务。 对于有大量历史分区数据,但是历史数据比较少,或者数据不均衡,或者数据查询概率较小的情况,可以创建历史分区(比如年分区,月分区),将所有历史数据放到对应分区里。 创建历史分区方式为:FROM ("2000-01-01") TO ("2022-01-01") INTERVAL 1 YEAR 1千万~2亿以内数据为了方便可以不设置分区(Doris内部有一个默认分区),直接用分桶策略即可。 如果分桶字段存在30%以上的数据倾斜,则禁止使用Hash分桶策略,改为使用Random分桶策略,相关命令为: Create table ... DISTRIBUTED BY RANDOM BUCKETS 10 ... 建表时第一个字段一定是最常查询使用的列,默认有前缀索引快速查询能力,选取最常查询且高基数的列作为前缀索引,默认将一行数据的前36个字节作为这行数据的前缀索引(varchar类型的列只能匹配20个字节,并且会匹配不足36个字节截断前缀索引) 。 超过亿级别的数据,如果有模糊匹配或者等值/in条件,可以使用倒排索引(Doris 2.x版本开始支持)或者Bloomfilter。如果是低基数列的正交查询适合使用bitmap索引(bitmap索引的基数在10000~100000之间效果较好)。 建表时需要提前规划将来要使用的字段个数,可以多预留几十个字段,类型包括整型、字符型等。避免将来字段不够使用,需要较高代价临时去添加字段。
  • 不要调用Admin的closeRegion方法关闭一个Region Admin中,提供了关闭一个Region的接口: public void closeRegion(final String regionname, final String serverName) 通过该方法关闭一个Region,HBase Client端会直接发RPC请求到Region所在的RegionServer上,整个流程对Master而言,是不感知的。也就是说,尽管RegionServer关闭了这个Region,但是,在Master侧,还以为该Region是在该RegionServer上面打开的。假如,在执行Balance的时候,Master计算出恰好要转移这个Region,那么,这个Region将无法被关闭,本次转移操作将无法完成(关于这个问题,在当前的HBase版本中的处理的确还欠缺妥当)。 因此,暂时不建议使用该方法关闭一个Region。
  • 不要关闭WAL WAL是Write-Ahead-Log的简称,是指数据在入库之前,首先会写入到日志文件中,借此来确保数据的安全性。 WAL功能默认是开启的,但是,在Put类中提供了关闭WAL功能的接口: public void setWriteToWAL(boolean write) 因此,不建议调用该方法将WAL关闭(即将writeToWAL设置为False),因为可能会造成最近1S(该值由RegionServer端的配置参数“hbase.regionserver.optionallogflushinterval”决定,默认为1S)内的数据丢失。但如果在实际应用中,对写入的速率要求很高,并且可以容忍丢失最近1S内的数据的话,可以将该功能关闭。
  • 业务表设计建议 预分Region,使Region分布均匀,提高并发 避免过多的热点Region。根据应用场景,可考虑将时间因素引入Rowkey。 同时访问的数据尽量连续存储。同时读取的数据相邻存储;同时读取的数据存放在同一行;同时读取的数据存放在同一cell。 查询频繁属性放在Rowkey前面部分。Rowkey的设计在排序上必须与主要的查询条件契合。 离散度较好的属性作为RowKey组成部分。分析数据离散度特点以及查询场景,综合各种场景进行设计。 存储冗余信息,提高检索性能。使用二级索引,适应更多查询场景。 利用过期时间、版本个数设置等操作,让表能自动清除过期数据。 在HBase中,一直在繁忙写数据的Region被称为热点Region。
  • Scan时指定StartKey和EndKey 一个有确切范围的Scan,在性能上会带来较大的好处。 代码示例: Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("familyname"),Bytes.toBytes("columnname")); scan.setStartRow( Bytes.toBytes("rowA")); // 假设起始Key为rowA scan.setStopRow( Bytes.toBytes("rowB")); // 假设EndKey为rowB for(Result result : demoTable.getScanner(scan)) { // process Result instance }
  • 创建一张表或Scan时设定blockcache为true HBase客户端建表和scan时,设置blockcache=true。需要根据具体的应用需求来设定它的值,这取决于有些数据是否会被反复的查询到,如果存在较多的重复记录,将这个值设置为true可以提升效率,否则,建议关闭。 建议按默认配置,默认就是true,只要不强制设置成false就可以,例如: HColumnDescriptor fieldADesc = new HColumnDescriptor("value".getBytes()); fieldADesc.setBlockCacheEnabled(false);
  • HDFS的读写文件注意点 HDFS不支持随机读和写。 HDFS追加文件内容只能在文件末尾添加,不能随机添加。 只有存储在HDFS文件系统中的数据才支持append,edit.log以及数据元文件不支持Append。Append追加文件时,需要将“hdfs-site.xml”中的“dfs.support.append”参数值设置为true。 “dfs.support.append”参数在开源社区版本中默认值是关闭,在FusionInsight版本默认值是开启。 该参数为服务器端参数。建议开启,开启后才能使用Append功能。 不适用HDFS场景可以考虑使用其他方式来存储数据,如HBase。
  • 调用Kafka API(AdminZkClient.createTopic)创建Topic 对于Java开发语言,正确示例: import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import kafka.admin.RackAwareMode; … KafkaZkClient kafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue(), Time.SYSTEM, "", "", null); AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); adminZkClient.createTopic(topic, partitions, replicas, new Properties(), RackAwareMode.Enforced$.MODULE$); … 对于Scala开发语言,正确示例: import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; … val kafkaZkClient: KafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue, Time.SYSTEM, "", "") val adminZkClient: AdminZkClient = new AdminZkClient(kafkaZkClient) adminZkClient.createTopic(topic, partitions, replicas)
  • Doris数据查询规则 在数据查询业务代码中建议查询失败时进行重试,再次下发查询。 in中常量枚举值超过1000后,必须修改为子查询。 禁止使用REST API(Statement Execution Action)执行大量SQL查询,该接口仅用于集群维护。 query查询条件返回结果超过5万条,则使用JDBC Catalog或者OUTFILE方式导出查询数据,否则FE上大量数据传输将占用FE资源,影响集群稳定性。 如果是交互式查询,建议使用分页方式(offset limit)导出数据,分页命令为Order by。 如果数据导出提供给第三方使用,建议使用outfile或者export方式 2个以上大于3亿的表JOIN使用Colocation Join。 亿级别大表禁止使用select *查询数据,查询时需明确要查询的字段。 使用SQL Block方式禁止select *操作。 如果是高并发点查询,建议开启行存储(Doris 2.x版本支持),并且使用PreparedStatement查询。 亿级以上表数据查询必须设置分区分桶条件。 禁止对分区表执行全分区数据扫描操作。
  • Doris数据查询建议 一次insert into select数据超过1亿条后,建议拆分为多个insert into select语句执行,分成多个批次来执行。 不要使用OR作为JOIN条件。 不建议频繁的数据delete修改,将要删除的数据攒批,偶尔进行批量删除,且需要带上条件,提升系统稳定性和删除效率。 大量数据排序(5亿以上)后返回部分数据,建议先减少数据范围再执行排序,否则大量排序会影响性能。例如: 将from table order by datatime desc limit 10优化为from table where datatime='2023-10-20' order by datatime desc limit 10。 查询任务性能调优参数parallel_fragment_exec_instance_num使用注意事项: 此参数是session级别设置,表示可并发执行的fragment数量,对CPU消耗较大,因此一般情况下不需要设置此参数。如果需要设置此参数来加速查询性能,必须遵循以下规则: 切勿设置该参数为全局生效,禁止使用set global方式进行设置。 设置参数值建议为偶数2或4(最大值不要超过单节点CPU核数的一半)。 设置此参数值时需要观察CPU使用率,CPU使用率小于50%时方可考虑设置。 如果查询SQL是insert into select大数据量的方式,不建议设置此参数。
  • Doris数据导入建议 禁止高频执行update、delete或truncate操作,推荐几分钟执行一次,使用delete必须设置分区或主键列条件。 禁止使用INSERT INTO tbl1 VALUES (“1”), (“a”);方式导入数据,少量少次写可以,多量多频次时需使用Doris提供的StreamLoad、BrokerLoad、SparkLoad或者Flink Connector方式。 在Flink实时写入数据到Doris的场景下,CheckPoint设置的时间需要考虑每批次数据量,如果每批次数据太小会造成大量小文件,推荐值为60s。 建议不使用insert values作为数据写入的主要方式,批量数据导入推荐使用StreamLoad、BrokerLoad或SparkLoad。 使用INSERT INTO WITH LABEL XXX SELECT方式进行数据导入,如果有下游依赖或查询,需要先查看导入的数据是否为可见状态。 具体查看方法:通过show load where label='xxx' SQL命令查询当前INSERT任务状态(status)是否为“VISIBLE”,如果为“VISIBLE”导入的数据才可见。 Streamload数据导入适合10 GB以内的数据量、Brokerload适合百GB以内数据,数据过大时可考虑使用SparkLoad。 禁止使用Doris的Routine Load进行导入数据操作,推荐使用Flink查询Kafka数据再写入Doris,更容易控制导入数据单批次数据量,避免大量小文件产生。如果确实已经使用了Routine Load进行导数,在没整改前请配置FE“max_tolerable_backend_down_num”参数值为“1”,以提升导入数据可靠性。 建议低频攒批导入数据,平均单表导入批次间隔需大于30s,推荐间隔60s,一次导入1000~100000行数据。
  • 避免写入单条记录超大的数据 单条记录超大的数据在影响处理效率的同时还可能写入失败,此时需要在初始化Kafka生产者实例时根据情况调整“max.request.size ”值,在初始化消费者实例时调整“max.partition.fetch.bytes”值。 例如,参考本例,可以将max.request.size 、max.partition.fetch.bytes配置项设置为“5252880”: // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.request.size", "5252880"); // 安全协议类型 props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.partition.fetch.bytes","5252880");
  • 合理设置分区键,控制分区数在一千以内,分区字段使用整型 建议使用toYYYYMMDD(表字段pt_d)作为分区键,表字段pt_d是date类型。 如果业务场景需要做小时分区,使用toYYYYMMDD(表字段pt_d)、toYYYYMMDD(表字段pt_h)做联合分区键,其中toYYYYMMDD(表字段pt_h)是整型小时数。 如果保存多年数据,建议考虑使用月做分区,例如toYYYYMM(表字段pt_d)。 综合考虑数据分区粒度、每个批次提交的数据量、数据的保存周期等因素,合理控制part数量。
  • 设置合理的part大小 min_bytes_to_rebalance_partition_over_jbod参数表示参与在JBOD卷中磁盘之间自动平衡分发part的最小size,该值不能设置得太小或者太大。 若该值设置得太小,小于max_bytes_to_merge_at_max_space_in_pool/1024,那么clickhouse server进程将会启动失败,另外还会引发不必要的part在磁盘间移动。 若该值设置得过大,则很难有part达到这个条件,比如:min_bytes_to_rebalance_partition_over_jbod大于max_data_part_size_bytes(卷中的磁盘可以存储的part的最大大小),则没有part能达到自动平衡的条件。
  • 本地表建表参考 本地表创建参考: CREATE TABLE mybase_local.mytable ( `did` Int32, `app_id` Int32, `region` Int32, `pt_d` Date ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mybase_local/mytable', '{replica}') PARTITION BY toYYYYMMDD(pt_d) ORDER BY (app_id, region) SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1; 使用说明: 表引擎选择: ReplicatedMergeTree:支持副本特性的MergeTree引擎,也是最常用的引擎。 ZooKeeper上的表信息注册路径,用于区分集群中的不同配置: /clickhouse/tables/{shard}/{databaseName}/{tableName}:{shard}是分片名称,{databaseName}是数据库名称,{tableName}是复制表名称。 order by 主键字段: 查询时最常使用且过滤性最高的字段作为主键。依次按照访问频度从高到低、维度基数从小到大来排。排序字段不宜太多,建议不超过4个,否则merge的压力会较大。排序字段不允许为null,如果存在null值,需要进行数据转换。 partition by 分区字段 分区键不允许为null,如果字段中有null值,需要进行数据转换。 表级别的参数配置: index_granularity:稀疏索引粒度配置,默认是8192。 use_minimalistic_part_header_in_zookeeper:ZooKeeper中数据存储是否启动新版本的优化存储方式。 建表定义可以参考官网链接:https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/。
  • 基于大宽表进行数据分析,不建议使用大表join大表的操作,对分布式join查询转化成本地表的join查询操作,提升性能 ClickHouse分布式join的性能较差,建议在模型侧将数据聚合成大宽表再导入ClickHouse。分布式join的查询转成本地表的join查询,不仅省去大量的节点间数据传播,同时本地表参与计算的数据量也会少很多。业务层再基于所有分片本地join的结果进行数据汇总,性能会有数量级的提升。
  • 分布式表建表参考 本地表创建参考: CREATE TABLE mybase.mytable AS mybase_local.mytable ENGINE = Distributed(cluster_3shards_2replicas, mybase_local, mytable, rand()); 使用说明: 分布式表名称:mybase.mytable。 本地表名称:mybase_local.mytable。 通过“AS”关联分布式表和本地表,保证分布式表的字段定义跟本地表一致。 分布式表引擎的参数说明: cluster_3shards_2replicas:逻辑集群名称。 mybase_local:本地表所在库名。 mytable:本地表名。 rand():可选参数,分片键(sharding key),可以是表中一列的原始数据(如did),也可以是函数调用的结果,如随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)。
  • Doris UDF开发规则 UDF中方法调用必须是线程安全的。 UDF实现中禁止读取外部大文件到内存中,如果文件过大可能会导致内存耗尽。 需避免大量递归调用,否则容易造成栈溢出或oom。 需避免不断创建对象或数组,否则容易造成内存耗尽。 Java UDF应该捕获和处理可能发生的异常,不能将异常给服务处理,以避免程序出现未知异常。可以使用try-catch块来处理异常,并在必要时记录异常信息。 UDF中应避免定义静态集合类用于临时数据的存储,或查询外部数据存在较大对象,否则会导致内存占用过高。 应该避免类中import的包和服务侧包冲突,可通过grep -lr "完全限定类名"命令来检查冲突的Jar包。如果发生类名冲突,可通过完全限定类名方式来避免。
  • Doris UDF开发建议 不要执行大量数据的复制操作,防止堆栈内存溢出。 应避免使用大量字符串拼接操作,否则会导致内存占用过高。 Java UDF应该使用有意义的名称,以便其他开发人员能够轻松理解其用途。建议使用驼峰式命名法,并以UDF结尾,例如:MyFunctionUDF。 Java UDF应该指定返回值的数据类型,并且必须具有返回值,返回值默认或异常时不要设置为NULL。建议使用基本数据类型或Java类作为返回值类型。
  • HQL编写之隐式类型转换 查询语句使用字段的值做过滤时,不建议通过Hive自身的隐式类型转换来编写HQL。因为隐式类型转换不利于代码的阅读和移植。 建议示例: select * from default.tbl_src where id = 10001; select * from default.tbl_src where name = 'TestName'; 不建议示例: select * from default.tbl_src where id = '10001'; select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • UDF管理 建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。 Hive的UDF会有一些默认属性,比如“deterministic”默认为“true”(同一个输入会返回同一个结果),“stateful”(是否有状态,默认为“true”)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,例如如下类: @UDFType(deterministic = false) Public class MyGenericUDAFEvaluator implements Closeable {
  • 资源释放 关于ResultScanner和Table实例,在用完之后,需要调用它们的Close方法,将资源释放掉。Close方法,要放在finally块中,来确保一定会被调用到。 正确示例: ResultScanner scanner = null; try { scanner = demoTable.getScanner(s); //Do Something here. } finally { scanner.close(); } 错误示例: 在代码中未调用scanner.close()方法释放相关资源。 scanner.close()方法未放置在finally块中。 ResultScanner scanner = null; scanner = demoTable.getScanner(s); //Do Something here. scanner.close();
  • Table实例的创建 public abstract class TableOperationImpl { private static Configuration conf = null; private static Connection connection = null; private static Table table = null; private static TableName tableName = TableName.valueOf("sample_table"); public TableOperationImpl() { init(); } public void init() { conf = ConfigurationSample.getConfiguration(); try { connection = ConnectionFactory.createConnection(conf); table = conn.getTable(tableName); } catch (IOException e) { e.printStackTrace(); } } public void close() { if (table != null) { try { table.close(); } catch (IOException e) { System.out.println("Can not close table."); } finally { table = null; } } if (connection != null) { try { connection.close(); } catch (IOException e) { System.out.println("Can not close connection."); } finally { connection = null; } } } public void operate() { init(); process(); close(); } }
  • Configuration实例的创建 该类应该通过调用HBaseConfiguration的create()方法来实例化。否则,将无法正确加载HBase中的相关配置项。 正确示例: //该部分,应该是在类成员变量的声明区域声明 private Configuration hbaseConfig = null; //建议在类的构造函数中,或者初始化方法中实例化该类 hbaseConfig = HBaseConfiguration.create(); 错误示例: hbaseConfig = new Configuration();
  • 共享Configuration实例 HBase客户端代码通过创建一个与ZooKeeper之间的HConnection,来获取与一个HBase集群进行交互的权限。一个ZooKeeper的HConnection连接,对应着一个Configuration实例,已经创建的HConnection实例,会被缓存起来。也就是说,如果客户端需要与HBase集群进行交互的时候,会传递一个Configuration实例到缓存中去,HBase Client部分通过已缓存的HConnection实例,来判断属于这个Configuration实例的HConnection实例是否存在,如果不存在,会创建一个新的HConnection,如果存在,则会直接返回相应的实例。 因此,如果频频的创建Configuration实例,会导致创建很多不必要的HConnection实例,很容易达到ZooKeeper的连接数上限。 建议在整个客户端代码范围内,都共用同一个Configuration对象实例。
  • Spark应用中,需引入Spark的类 对于Java开发语言,正确示例: // 创建SparkContext时所需引入的类。 import org.apache.spark.api.java.JavaSparkContext // RDD操作时引入的类。 import org.apache.spark.api.java.JavaRDD // 创建SparkConf时引入的类。 import org.apache.spark.SparkConf 对于Scala开发语言,正确示例: // 创建SparkContext时所需引入的类。 import org.apache.spark.SparkContext // RDD操作时引入的类。 import org.apache.spark.SparkContext._ // 创建SparkConf时引入的类。 import org.apache.spark.SparkConf
  • 应用程序结束之前必须调用SparkContext.stop 利用spark做二次开发时,当应用程序结束之前必须调用SparkContext.stop()。 利用Java语言开发时,应用程序结束之前必须调用JavaSparkContext.stop()。 利用Scala语言开发时,应用程序结束之前必须调用SparkContext.stop()。 以Scala语言开发应用程序为例,分别介绍下正确示例与错误示例。 正确示例: //提交spark作业 val sc = new SparkContext(conf) //具体的任务 ... //应用程序结束 sc.stop() 错误示例: //提交spark作业 val sc = new SparkContext(conf) //具体的任务 ... 如果不添加SparkContext.stop,YARN界面会显示失败。如图1,同样的任务,前一个程序是没有添加SparkContext.stop,后一个程序添加了SparkContext.stop()。 图1 添加SparkContext.stop()和不添加的区别
  • 亲和性概念阐述 在应用没有容器化之前,原先一个虚拟机上会装多个组件,进程间会有通信。 但在做容器化拆分的时候,通常直接按进程拆分容器。比如业务进程一个容器,监控日志处理或者本地数据放在另一个容器,并且有独立的生命周期。这时如果进程分布在网络中两个较远的点,请求经过多次转发,性能会很差。 亲和性可以实现就近部署,增强网络能力实现通信上的就近路由,减少网络的损耗。 反亲和性主要是出于高可靠性考虑,尽量分散实例,某个节点故障的时候,对应用的影响只是N分之一或者只是一个实例。 应用与可用区的亲和性 亲和:决定应用组件部署在特定的可用区中。 反亲和:决定应用组件不能部署在特定的可用区中。 应用与节点间的亲和性 亲和:决定应用组件部署在某些特定的主机中。 反亲和:决定应用组件不能部署在某些特定的主机中。 应用间的亲和性 决定应用组件部署在相同或不同节点中。 亲和:用户可根据业务需求进行应用组件的就近部署,应用组件间通信就近路由,减少网络消耗。如图1所示,APP1、APP2、APP3和APP4部署在相同节点上,为亲和性部署。 图1 应用间亲和 反亲和:同个应用组件的多个实例反亲和部署,减少宕机影响;互相干扰的应用反亲和部署,避免干扰。 如图2所示,APP1、APP2、APP3和APP4分别部署在不同节点上,这四个应用为反亲和性部署。 图2 应用间反亲和
共100000条