华为云用户手册

  • HDFS需要开启DataNode数据存储路径 DataNode默认存储路径配置为:${BIGDATA_DATA_HOME}/hadoop/dataN/dn/datadir(N≥1),N为数据存放的目录个数。 例如:${BIGDATA_DATA_HOME}/hadoop/data1/dn/datadir、${BIGDATA_DATA_HOME}/hadoop/data2/dn/datadir 设置后,数据会存储到节点上每个挂载磁盘的对应目录下面。
  • HDFS创建文件 通过"FileSystem.mkdirs(Path f)"可在HDFS上创建文件夹,其中f为文件夹的完整路径。 正确示例: public class CreateDir { public static void main(String[] args) throws Exception{ Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path dfs=new Path("/TestDir"); hdfs.mkdirs(dfs); } }
  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } 多次重复登录会导致后建立的会话对象覆盖掉之前登录建立的,将会导致之前建立的会话无法被维护监控,最终导致会话超期后部分功能不可用。
  • HDFS初始化方法 HDFS初始化是指在使用HDFS提供的API之前,需要做的必要工作。 大致过程为:加载HDFS服务配置文件,并进行Kerberos安全认证,认证通过后再实例化Filesystem,之后使用HDFS的API。此处Kerberos安全认证需要使用到的keytab文件,请提前准备。 正确示例: private void init() throws IOException { Configuration conf = new Configuration(); // 读取配置文件 conf.addResource("user-hdfs.xml"); // 安全模式下,先进行安全认证 if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { String PRINCIPAL = "username.client.kerberos.principal"; String KEYTAB = "username.client.keytab.file"; // 设置keytab密钥文件 conf.set(KEYTAB, System.getProperty("user.dir") + File.separator + "conf" + File.separator + conf.get(KEYTAB)); // 设置kerberos配置文件路径 */ String krbfilepath = System.getProperty("user.dir") + File.separator + "conf" + File.separator + "krb5.conf"; System.setProperty("java.security.krb5.conf", krbfilepath); // 进行登录认证 */ SecurityUtil.login(conf, KEYTAB, PRINCIPAL); } // 实例化文件系统对象 fSystem = FileSystem.get(conf); }
  • HDFS上传本地文件 通过FileSystem.copyFromLocalFile(Path src,Patch dst)可将本地文件上传到HDFS的指定位置上,其中src和dst均为文件的完整路径。 正确示例: public class CopyFile { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); //本地文件 Path src =new Path("D:\\HebutWinOS"); //HDFS为止 Path dst =new Path("/"); hdfs.copyFromLocalFile(src, dst); System.out.println("Upload to"+conf.get("fs.default.name")); FileStatus files[]=hdfs.listStatus(dst); for(FileStatus file:files){ System.out.println(file.getPath()); } } }
  • 查看HDFS文件的最后修改时间 通过FileSystem.getModificationTime()可查看指定HDFS文件的修改时间。 正确示例: public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path fpath =new Path("/user/hadoop/test/file1.txt"); FileStatus fileStatus=hdfs.getFileStatus(fpath); long modiTime=fileStatus.getModificationTime(); System.out.println("file1.txt的修改时间是"+modiTime); }
  • MapReduce中间文件存放路径 MapReduce默认中间文件夹存放路径只有一个,${hadoop.tmp.dir}/mapred/local,建议修改为每个磁盘下均可存放中间文件。 例如:/hadoop/hdfs/data1/mapred/local、/hadoop/hdfs/data2/mapred/local、/hadoop/hdfs/data3/mapred/local等,不存在的目录会自动忽略。
  • HDFS提高读取写入性能方式 写入数据流程:HDFS Client收到业务数据后,从NameNode获取到数据块编号、位置信息后,联系DataNode,并将需要写入数据的DataNode建立起流水线,完成后,客户端再通过自有协议写入数据到Datanode1,再有DataNode1复制到DataNode2、DataNode3(三备份)。写完的数据,将返回确认信息给HDFS Client。 合理设置块大小,如设置dfs.blocksize为 268435456(即256MB)。 对于一些不可能重用的大数据,缓存在操作系统的缓存区是无用的。可将以下两参数设置为false: dfs.datanode.drop.cache.behind.reads和dfs.datanode.drop.cache.behind.writes
  • HDFS文件操作API概述 Hadoop中关于文件操作类基本上全部是在“org.apache.hadoop.fs”包中,这些API能够支持的操作包含:打开文件,读写文件,删除文件等。Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个: static FileSystem get(Configuration conf); 该类封装了几乎所有的文件操作,例如mkdir,delete等。综上基本可以得出操作文件的程序库框架: operator() { 得到Configuration对象 得到FileSystem对象 进行文件操作 }
  • 不要调用Admin的closeRegion方法关闭一个Region Admin中,提供了关闭一个Region的接口: public void closeRegion(final String regionname, final String serverName) 通过该方法关闭一个Region,HBase Client端会直接发RPC请求到Region所在的RegionServer上,整个流程对Master而言,是不感知的。也就是说,尽管RegionServer关闭了这个Region,但是,在Master侧,还以为该Region是在该RegionServer上面打开的。假如,在执行Balance的时候,Master计算出恰好要转移这个Region,那么,这个Region将无法被关闭,本次转移操作将无法完成(关于这个问题,在当前的HBase版本中的处理的确还欠缺妥当)。 因此,暂时不建议使用该方法关闭一个Region。
  • 不要关闭WAL WAL是Write-Ahead-Log的简称,是指数据在入库之前,首先会写入到日志文件中,借此来确保数据的安全性。 WAL功能默认是开启的,但是,在Put类中提供了关闭WAL功能的接口: public void setWriteToWAL(boolean write) 因此,不建议调用该方法将WAL关闭(即将writeToWAL设置为False),因为可能会造成最近1S(该值由RegionServer端的配置参数“hbase.regionserver.optionallogflushinterval”决定,默认为1S)内的数据丢失。但如果在实际应用中,对写入的速率要求很高,并且可以容忍丢失最近1S内的数据的话,可以将该功能关闭。
  • 业务表设计建议 预分Region,使Region分布均匀,提高并发 避免过多的热点Region。根据应用场景,可考虑将时间因素引入Rowkey。 同时访问的数据尽量连续存储。同时读取的数据相邻存储;同时读取的数据存放在同一行;同时读取的数据存放在同一cell。 查询频繁属性放在Rowkey前面部分。Rowkey的设计在排序上必须与主要的查询条件契合。 离散度较好的属性作为RowKey组成部分。分析数据离散度特点以及查询场景,综合各种场景进行设计。 存储冗余信息,提高检索性能。使用二级索引,适应更多查询场景。 利用过期时间、版本个数设置等操作,让表能自动清除过期数据。 在HBase中,一直在繁忙写数据的Region被称为热点Region。
  • Scan时指定StartKey和EndKey 一个有确切范围的Scan,在性能上会带来较大的好处。 代码示例: Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("familyname"),Bytes.toBytes("columnname")); scan.setStartRow( Bytes.toBytes("rowA")); // 假设起始Key为rowA scan.setStopRow( Bytes.toBytes("rowB")); // 假设EndKey为rowB for(Result result : demoTable.getScanner(scan)) { // process Result instance }
  • 创建一张表或Scan时设定blockcache为true HBase客户端建表和scan时,设置blockcache=true。需要根据具体的应用需求来设定它的值,这取决于有些数据是否会被反复的查询到,如果存在较多的重复记录,将这个值设置为true可以提升效率,否则,建议关闭。 建议按默认配置,默认就是true,只要不强制设置成false就可以,例如: HColumnDescriptor fieldADesc = new HColumnDescriptor("value".getBytes()); fieldADesc.setBlockCacheEnabled(false);
  • HDFS的读写文件注意点 HDFS不支持随机读和写。 HDFS追加文件内容只能在文件末尾添加,不能随机添加。 只有存储在HDFS文件系统中的数据才支持append,edit.log以及数据元文件不支持Append。Append追加文件时,需要将“hdfs-site.xml”中的“dfs.support.append”参数值设置为true。 “dfs.support.append”参数在开源社区版本中默认值是关闭,在FusionInsight版本默认值是开启。 该参数为服务器端参数。建议开启,开启后才能使用Append功能。 不适用HDFS场景可以考虑使用其他方式来存储数据,如HBase。
  • 调用Kafka API(AdminZkClient.createTopic)创建Topic 对于Java开发语言,正确示例: import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import kafka.admin.RackAwareMode; … KafkaZkClient kafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue(), Time.SYSTEM, "", "", null); AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); adminZkClient.createTopic(topic, partitions, replicas, new Properties(), RackAwareMode.Enforced$.MODULE$); … 对于Scala开发语言,正确示例: import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; … val kafkaZkClient: KafkaZkClient = KafkaZkClient.apply(zkUrl, JaasUtils.isZkSecurityEnabled(), zkSessionTimeoutMs, zkConnectionTimeoutMs, Int.MaxValue, Time.SYSTEM, "", "") val adminZkClient: AdminZkClient = new AdminZkClient(kafkaZkClient) adminZkClient.createTopic(topic, partitions, replicas)
  • Doris数据查询规则 在数据查询业务代码中建议查询失败时进行重试,再次下发查询。 in中常量枚举值超过1000后,必须修改为子查询。 禁止使用REST API(Statement Execution Action)执行大量SQL查询,该接口仅用于集群维护。 query查询条件返回结果超过5万条,则使用JDBC Catalog或者OUTFILE方式导出查询数据,否则FE上大量数据传输将占用FE资源,影响集群稳定性。 如果是交互式查询,建议使用分页方式(offset limit)导出数据,分页命令为Order by。 如果数据导出提供给第三方使用,建议使用outfile或者export方式 2个以上大于3亿的表JOIN使用Colocation Join。 亿级别大表禁止使用select *查询数据,查询时需明确要查询的字段。 使用SQL Block方式禁止select *操作。 如果是高并发点查询,建议开启行存储(Doris 2.x版本支持),并且使用PreparedStatement查询。 亿级以上表数据查询必须设置分区分桶条件。 禁止对分区表执行全分区数据扫描操作。
  • Doris数据查询建议 一次insert into select数据超过1亿条后,建议拆分为多个insert into select语句执行,分成多个批次来执行。 不要使用OR作为JOIN条件。 不建议频繁的数据delete修改,将要删除的数据攒批,偶尔进行批量删除,且需要带上条件,提升系统稳定性和删除效率。 大量数据排序(5亿以上)后返回部分数据,建议先减少数据范围再执行排序,否则大量排序会影响性能。例如: 将from table order by datatime desc limit 10优化为from table where datatime='2023-10-20' order by datatime desc limit 10。 查询任务性能调优参数parallel_fragment_exec_instance_num使用注意事项: 此参数是session级别设置,表示可并发执行的fragment数量,对CPU消耗较大,因此一般情况下不需要设置此参数。如果需要设置此参数来加速查询性能,必须遵循以下规则: 切勿设置该参数为全局生效,禁止使用set global方式进行设置。 设置参数值建议为偶数2或4(最大值不要超过单节点CPU核数的一半)。 设置此参数值时需要观察CPU使用率,CPU使用率小于50%时方可考虑设置。 如果查询SQL是insert into select大数据量的方式,不建议设置此参数。
  • Doris数据导入建议 禁止高频执行update、delete或truncate操作,推荐几分钟执行一次,使用delete必须设置分区或主键列条件。 禁止使用INSERT INTO tbl1 VALUES (“1”), (“a”);方式导入数据,少量少次写可以,多量多频次时需使用Doris提供的StreamLoad、BrokerLoad、SparkLoad或者Flink Connector方式。 在Flink实时写入数据到Doris的场景下,CheckPoint设置的时间需要考虑每批次数据量,如果每批次数据太小会造成大量小文件,推荐值为60s。 建议不使用insert values作为数据写入的主要方式,批量数据导入推荐使用StreamLoad、BrokerLoad或SparkLoad。 使用INSERT INTO WITH LABEL XXX SELECT方式进行数据导入,如果有下游依赖或查询,需要先查看导入的数据是否为可见状态。 具体查看方法:通过show load where label='xxx' SQL命令查询当前INSERT任务状态(status)是否为“VISIBLE”,如果为“VISIBLE”导入的数据才可见。 Streamload数据导入适合10 GB以内的数据量、Brokerload适合百GB以内数据,数据过大时可考虑使用SparkLoad。 禁止使用Doris的Routine Load进行导入数据操作,推荐使用Flink查询Kafka数据再写入Doris,更容易控制导入数据单批次数据量,避免大量小文件产生。如果确实已经使用了Routine Load进行导数,在没整改前请配置FE“max_tolerable_backend_down_num”参数值为“1”,以提升导入数据可靠性。 建议低频攒批导入数据,平均单表导入批次间隔需大于30s,推荐间隔60s,一次导入1000~100000行数据。
  • 避免写入单条记录超大的数据 单条记录超大的数据在影响处理效率的同时还可能写入失败,此时需要在初始化Kafka生产者实例时根据情况调整“max.request.size ”值,在初始化消费者实例时调整“max.partition.fetch.bytes”值。 例如,参考本例,可以将max.request.size 、max.partition.fetch.bytes配置项设置为“5252880”: // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.request.size", "5252880"); // 安全协议类型 props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); // 服务名 props.put(saslKerberosServiceName, "kafka"); props.put("max.partition.fetch.bytes","5252880");
  • 合理设置分区键,控制分区数在一千以内,分区字段使用整型 建议使用toYYYYMMDD(表字段pt_d)作为分区键,表字段pt_d是date类型。 如果业务场景需要做小时分区,使用toYYYYMMDD(表字段pt_d)、toYYYYMMDD(表字段pt_h)做联合分区键,其中toYYYYMMDD(表字段pt_h)是整型小时数。 如果保存多年数据,建议考虑使用月做分区,例如toYYYYMM(表字段pt_d)。 综合考虑数据分区粒度、每个批次提交的数据量、数据的保存周期等因素,合理控制part数量。
  • 设置合理的part大小 min_bytes_to_rebalance_partition_over_jbod参数表示参与在JBOD卷中磁盘之间自动平衡分发part的最小size,该值不能设置得太小或者太大。 若该值设置得太小,小于max_bytes_to_merge_at_max_space_in_pool/1024,那么clickhouse server进程将会启动失败,另外还会引发不必要的part在磁盘间移动。 若该值设置得过大,则很难有part达到这个条件,比如:min_bytes_to_rebalance_partition_over_jbod大于max_data_part_size_bytes(卷中的磁盘可以存储的part的最大大小),则没有part能达到自动平衡的条件。
  • 本地表建表参考 本地表创建参考: CREATE TABLE mybase_local.mytable ( `did` Int32, `app_id` Int32, `region` Int32, `pt_d` Date ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/mybase_local/mytable', '{replica}') PARTITION BY toYYYYMMDD(pt_d) ORDER BY (app_id, region) SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1; 使用说明: 表引擎选择: ReplicatedMergeTree:支持副本特性的MergeTree引擎,也是最常用的引擎。 ZooKeeper上的表信息注册路径,用于区分集群中的不同配置: /clickhouse/tables/{shard}/{databaseName}/{tableName}:{shard}是分片名称,{databaseName}是数据库名称,{tableName}是复制表名称。 order by 主键字段: 查询时最常使用且过滤性最高的字段作为主键。依次按照访问频度从高到低、维度基数从小到大来排。排序字段不宜太多,建议不超过4个,否则merge的压力会较大。排序字段不允许为null,如果存在null值,需要进行数据转换。 partition by 分区字段 分区键不允许为null,如果字段中有null值,需要进行数据转换。 表级别的参数配置: index_granularity:稀疏索引粒度配置,默认是8192。 use_minimalistic_part_header_in_zookeeper:ZooKeeper中数据存储是否启动新版本的优化存储方式。 建表定义可以参考官网链接:https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/。
  • 基于大宽表进行数据分析,不建议使用大表join大表的操作,对分布式join查询转化成本地表的join查询操作,提升性能 ClickHouse分布式join的性能较差,建议在模型侧将数据聚合成大宽表再导入ClickHouse。分布式join的查询转成本地表的join查询,不仅省去大量的节点间数据传播,同时本地表参与计算的数据量也会少很多。业务层再基于所有分片本地join的结果进行数据汇总,性能会有数量级的提升。
  • 分布式表建表参考 本地表创建参考: CREATE TABLE mybase.mytable AS mybase_local.mytable ENGINE = Distributed(cluster_3shards_2replicas, mybase_local, mytable, rand()); 使用说明: 分布式表名称:mybase.mytable。 本地表名称:mybase_local.mytable。 通过“AS”关联分布式表和本地表,保证分布式表的字段定义跟本地表一致。 分布式表引擎的参数说明: cluster_3shards_2replicas:逻辑集群名称。 mybase_local:本地表所在库名。 mytable:本地表名。 rand():可选参数,分片键(sharding key),可以是表中一列的原始数据(如did),也可以是函数调用的结果,如随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)。
  • Doris UDF开发规则 UDF中方法调用必须是线程安全的。 UDF实现中禁止读取外部大文件到内存中,如果文件过大可能会导致内存耗尽。 需避免大量递归调用,否则容易造成栈溢出或oom。 需避免不断创建对象或数组,否则容易造成内存耗尽。 Java UDF应该捕获和处理可能发生的异常,不能将异常给服务处理,以避免程序出现未知异常。可以使用try-catch块来处理异常,并在必要时记录异常信息。 UDF中应避免定义静态集合类用于临时数据的存储,或查询外部数据存在较大对象,否则会导致内存占用过高。 应该避免类中import的包和服务侧包冲突,可通过grep -lr "完全限定类名"命令来检查冲突的Jar包。如果发生类名冲突,可通过完全限定类名方式来避免。
  • Doris UDF开发建议 不要执行大量数据的复制操作,防止堆栈内存溢出。 应避免使用大量字符串拼接操作,否则会导致内存占用过高。 Java UDF应该使用有意义的名称,以便其他开发人员能够轻松理解其用途。建议使用驼峰式命名法,并以UDF结尾,例如:MyFunctionUDF。 Java UDF应该指定返回值的数据类型,并且必须具有返回值,返回值默认或异常时不要设置为NULL。建议使用基本数据类型或Java类作为返回值类型。
  • HQL编写之隐式类型转换 查询语句使用字段的值做过滤时,不建议通过Hive自身的隐式类型转换来编写HQL。因为隐式类型转换不利于代码的阅读和移植。 建议示例: select * from default.tbl_src where id = 10001; select * from default.tbl_src where name = 'TestName'; 不建议示例: select * from default.tbl_src where id = '10001'; select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • UDF管理 建议由管理员创建永久UDF,避免每次使用时都去add jar,和重新定义UDF。 Hive的UDF会有一些默认属性,比如“deterministic”默认为“true”(同一个输入会返回同一个结果),“stateful”(是否有状态,默认为“true”)。当用户实现的自定义UDF内部实现了汇总等,需要在类上加上相应的注解,例如如下类: @UDFType(deterministic = false) Public class MyGenericUDAFEvaluator implements Closeable {
  • 资源释放 关于ResultScanner和Table实例,在用完之后,需要调用它们的Close方法,将资源释放掉。Close方法,要放在finally块中,来确保一定会被调用到。 正确示例: ResultScanner scanner = null; try { scanner = demoTable.getScanner(s); //Do Something here. } finally { scanner.close(); } 错误示例: 在代码中未调用scanner.close()方法释放相关资源。 scanner.close()方法未放置在finally块中。 ResultScanner scanner = null; scanner = demoTable.getScanner(s); //Do Something here. scanner.close();
共100000条