华为云用户手册

  • 创建数据库 DLI提供创建数据库的接口。您可以使用该接口创建数据库,示例代码如下: 1 2 3 4 5 6 7 8 def create_db(dli_client): try: db = dli_client.create_database('db_for_test') except DliException as e: print(e) return print(db) “default”为内置数据库,不能创建名为“default”的数据库。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 导入数据 DLI提供导入数据的接口。您可以使用该接口将存储在OBS中的数据导入到已创建的DLI表中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def import_data(dli_client, db_name, tbl_name, queue_name): options = { "with_column_header": True, "delimiter": ",", "quote_char": "\"", "escape_char": "\\", "date_format": "yyyy/MM/dd", "timestamp_format": "yyyy/MM/dd hh:mm:ss" } try: job_id, status = \ dli_client.import_table(tbl_name, db_name, 'obs://bucket/obj/data.csv', 'csv', queue_name=queue_name, options=options) except DliException as e: print(e) return print(job_id) print(status) 在提交导入作业前,可选择通过data_type参数设置导入数据的类型,例如将data_type设置为csv。csv数据的具体格式通可过options参数设置,例如:csv的分隔符,转义符等。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 导出数据 DLI提供导出数据的接口。您可以使用该接口将DLI表中的数据导出到OBS中。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 def export_data(dli_client, db_name, tbl_name, queue_name): try: job_id, status = dli_client.export_table(tbl_name, db_name, 'obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(job_id) print(status) 在提交导出作业前,可选设置数据格式、压缩类型、导出模式等,导出格式只支持csv格式。 当OBS桶目录下有文件夹和文件同名时,加载数据会优先指向该路径下的文件而非文件夹。建议创建OBS对象时,在同一级中不要出现同名的文件和文件夹。
  • 查询所有作业 DLI提供查询所有作业的接口。您可以使用该接口执行查询当前工程下的所有作业的信息并获取查询结果。示例代码如下: 1 2 3 4 5 6 7 8 def list_all_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return for sql_job in sql_jobs: print(sql_job) 该SDK接口不支持sql_pattern,即通过指定sql片段作为作业过滤条件进行查询。 如果需要则可以通过查询所有作业API接口指定该参数进行查询。
  • 提交作业 DLI提供查询作业的接口。您可以使用该接口执行查询并获取查询结果。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def run_sql(dli_client, db_name, queue_name): # execute SQL try: sql_job = dli_client.execute_sql('select * from tbl_dli_for_test', db_name, queue_name=queue_name) result_set = sql_job.get_result(queue_name=queue_name) except DliException as e: print(e) return if result_set.row_count == 0: return for row in result_set: print(row) # export the query result to obs try: status = sql_job.export_result('obs://bucket/obj', queue_name=queue_name) except DliException as e: print(e) return print(status)
  • 取消作业 DLI提供取消作业的接口。您可以使用该接口取消已经提交的作业,若作业已经执行结束或失败则无法取消。示例代码如下: 1 2 3 4 5 6 def cancel_sql(dli_client, job_id): try: dli_client.cancel_sql(job_id) except DliException as e: print(e) return
  • 查询SQL类型作业 您可以使用该接口查询当前工程下的所有SQL类型作业的信息并获取查询结果。示例代码如下: def list_sql_jobs(dli_client): try: sql_jobs = dli_client.list_sql_jobs() except DliException as e: print(e) return
  • 操作步骤 安装JDK。从Oracle官网下载并安装JDK1.8版本安装包。 配置环境变量,在“控制面板”选择“系统”属性,单击“环境变量”。 选择“系统变量”,新建 “JAVA_HOME 变量”,路径配置为JDK安装路径,例如:“D:\Java\jdk1.8.0_45”。 编辑 “Path 变量”,在“变量值”中增加“%JAVA_HOME%\bin;”。 新建 “CLASSPATH 变量”,在“变量值”中填写 “.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar”。 检验是否配置成功,运行cmd ,输入 java -version。运行结果,请参见图1,显示版本信息,则说明安装和配置成功。 图1 检验配置是否成功
  • 修订记录 发布日期 修订说明 2023-12-05 第十五次正式发布。 优化DLI SDK参考手册结构,补充V3版本SDK使用说明。 2023-09-16 第十四次正式发布。 SDK的获取与安装补充关于SDK安装包中的.sha256文件的下载说明。 2023-07-18 第十三次正式发布。 表相关修改DECIMAL默认精度。 2023-03-09 第十二次正式发布。 SDK的获取与安装新增下载sdk的方式。 2023-01-30 第十一次正式发布。 Python SDK环境配置,补充关于安装Python需要配置Visual C++编译环境的相关说明。 2020-4-28 第十次正式发布。 调整章节目录。 2019-8-30 第九次正式发布。 调整章节目录,设置代码高亮。 2019-8-16 第八次正式发布。 将“集群”修改为“队列”。 2019-1-20 第七次正式发布。 Python SDK支持复杂数据类型。 2018-11-15 第六次正式发布。 Java SDK支持复杂数据类型。 2018-9-26 第五次正式发布。 创建/删除队列 查询队列列表 上传资源包 查询资源列表 执行批处理作业 查询所有批处理任务 2018-7-26 第四次正式发布。 增加Python SDK内容。 2018-04-23 第三次正式发布。 服务更名。 2018-02-24 第二次正式发布。 修改SDK的获取与安装章节中的下载地址。 2018-02-08 第一次正式发布。
  • Spark作业相关 表6 Spark作业相关API&SDK的对应关系表 Class Method Java Method Python Method API BatchJob 提交批处理作业 asyncSubmit submit_spark_batch_job POST /v2.0/{project_id}/batches 删除批处理作业 deleteBatchJob del_spark_batch_job DELETE /v2.0/{project_id}/batches/{batch_id} 查询所有批处理作业 listAllBatchJobs - GET /v2.0/{project_id}/batches 查询批处理作业详情 - - GET /v2.0/{project_id}/batches/{batch_id} 查询批处理作业状态 getStateBatchJob - GET /v2.0/{project_id}/batches/{batch_id}/state 查询批处理作业日志 getBatchJobLog - GET /v2.0/{project_id}/batches/{batch_id}/log
  • Flink作业模板相关 表7 Flink作业模板相关API&SDK的对应关系表 Class Java Method Python Method API Template createFlinkJobTemplate - POST /v1.0/{project_id}/streaming/job-templates updateFlinkJobTemplate - PUT /v1.0/{project_id}/streaming/job-templates/{template_id} deleteFlinkJobTemplate - DELETE /v1.0/{project_id}/streaming/job-templates/{template_id} getFlinkJobTemplates - GET /v1.0/{project_id}/streaming/job-templates
  • Flink作业相关 表5 Flink作业相关API&SDK的对应关系表 Class Method Java Method Python Method API Job 创建Flink SQL作业 submitFlinkSqlJob - POST /v1.0/{project_id}/streaming/sql-jobs 创建Flink自定义作业 createFlinkJarJob - POST /v1.0/{project_id}/streaming/flink-jobs 更新Flink SQL作业 updateFlinkSqlJob - PUT /v1.0/{project_id}/streaming/sql-jobs/{job_id} 更新Flink自定义作业 updateFlinkJarJob - PUT /v1.0/{project_id}/streaming/flink-jobs/{job_id} 查询Flink作业列表 getFlinkJobs - GET /v1.0/{project_id}/streaming/jobs 查询Flink作业详情 getFlinkJobDetail - GET /v1.0/{project_id}/streaming/jobs/{job_id} 查询Flink作业执行计划图 getFlinkJobExecuteGraph - GET /v1.0/{project_id}/streaming/jobs/{job_id}/execute-graph 查询Flink作业监控信息 getFlinkJobsMetrics - POST /v1.0/{project_id}/streaming/jobs/metrics 查询Flink作业APIG网关服务访问地址 getFlinkApigSinks - GET /v1.0/{project_id}/streaming/jobs/{job_id}/apig-sinks 运行Flink作业 runFlinkJob - POST /v1.0/{project_id}/streaming/jobs/run 停止Flink作业 stopFlinkJob - POST /v1.0/{project_id}/streaming/jobs/stop 批量删除Flink作业 deleteFlinkJobInBatch - POST /v1.0/{project_id}/streaming/jobs/delete
  • SQL作业相关 表4 SQL作业相关API&SDK的对应关系表 Class Method Java Method Python Method API Database 创建数据库 createDatabase create_database POST /v1.0/{project_id}/databases 删除数据库 deleteDatabase delete_database DELETE /v1.0/{project_id}/databases/{database_name} 查询所有数据库 listAllDatabases list_databases GET /v1.0/{project_id}/databases 修改数据库用户 - - PUT /v1.0/{project_id}/databases/{database_name}/owner Table 创建DLI表 createDLITable create_dli_table POST /v1.0/{project_id}/databases/{database_name}/tables 创建OBS表 createObsTable create_obs_table POST /v1.0/{project_id}/databases/{database_name}/tables 删除表 deleteTable delete_table DELETE /v1.0/{project_id}/databases/{database_name}/tables/{table_name} 查询所有表 listAllTables list_tables GET /v1.0/{project_id}/databases/{database_name}/tables?keyword=tb&with-detail=true 描述表信息 getTableDetail get_table_schema GET /v1.0/{project_id}/databases/{database_name}/tables/{table_name} 预览表内容 - - GET /v1.0/{project_id}/databases/{database_name}/tables/{table_name}/preview 修改表用户 - - PUT /v1.0/{project_id}/databases/{database_name}/tables/{table_name}/owner Job 导入数据 submit import_table POST /v1.0/{project_id}/jobs/import-table 导出数据 submit export_table POST /v1.0/{project_id}/jobs/export-table 提交作业 submit execute_sql POST /v1.0/{project_id}/jobs/submit-job 取消作业 cancelJob - DELETE /v1.0/{project_id}/jobs/{job_id} 查询所有作业 listAllJobs - GET /v1.0/{project_id}/jobs?page-size={size}¤t-page={page_number}&start={start_time}&end={end_time}&job-type={QUERY}&queue_name={test}&order={duration_desc} 查询作业结果 queryJobResultInfo - GET/v1.0/{project_id}/jobs/{job_id}?page-size={size}¤t-page={page_number} 查询作业状态 - - GET/v1.0/{project_id}/jobs/{job_id}/status 查询作业详细信息 - - GET/v1.0/{project_id}/jobs/{job_id}/detail 查询SQL类型作业 listSQLJobs - - 检查SQL语法 - - POST /v1.0/{project_id}/jobs/check-sql 导出查询结果 - - POST /v1.0/{project_id}/jobs/{job_id}/export-result
  • 队列相关 表2 队列相关API&SDK的对应关系表 Class Method Java Method Python Method API Queue 创建队列 createQueue - POST /v1.0/{project_id}/queues 删除队列 deleteQueue - DELETE /v1.0/{project_id}/queues/{queue_name} 获取默认队列 getDefaultQueue - - 查询所有队列 listAllQueues list_queues GET/v1.0/{project_id}/queues
  • 资源相关 表3 资源相关API&SDK的对应关系表 Class Method Java Method Python Method API packageResources 上传资源包 uploadResources upload_resource POST /v2.0/{project_id}/resources 删除资源包 deleteResource delete_resource DELETE /v2.0/{project_id}/resources/{resource_name} 查询所有资源包 listAllResources list_resources GET /v2.0/{project_id}/resources 查询指定资源包 getResource get_package_resource GET /v2.0/{project_id}/resources/{resource_name}
  • 提交批处理作业 DLI提供执行批处理作业的接口。您可以使用该接口执行批处理作业。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static void runBatchJob(Cluster cluster) throws DLIException { SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClassName("your.class.name"); jobInfo.setFile("xxx.jar"); jobInfo.setCluster_name("queueName"); // 调用BatchJob对象的asyncSubmit接口提交批处理作业 BatchJob job = new BatchJob(cluster, jobInfo); job.asyncSubmit(); while (true) { SparkJobStatus jobStatus = job.getStatus(); if (SparkJobStatus.SUCCESS.equals(jobStatus)) { System.out.println("Job finished"); return; } if (SparkJobStatus.DEAD.equals(jobStatus)) { throw new DLIException("The batch has already exited"); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } Cluster为用户自建的队列。 传参不能为JSON格式。 对应批处理作业提交提供两个接口: 异步 asyncSubmit,提交后直接返回,不等待 同步 submit,提交后会一直等待作业执行结束
  • 删除批处理作业 DLI提供删除批处理作业的接口。您可以使用该接口删除批处理作业。示例代码如下: 1 2 3 4 5 6 7 private static void deleteBatchJob(DLIClient client) throws DLIException { //提交Spark批处理运行作业的Id String batchId = "0aae0dc5-f009-4b9b-a8c3-28fbee399fa6"; // 调用BatchJob对象的delBatch接口取消批处理作业 MessageInfo messageInfo = client.delBatchJob(batchId); System.out.println(messageInfo.getMsg()); }
  • 查询批处理作业状态 DLI提供查询批处理作业状态的接口。您可以使用该接口查询批处理作业当前的状态信息。示例代码如下: private static void getStateBatchJob(DLIClient client) throws DLIException { BatchJob batchJob = null; SparkJobInfo jobInfo = new SparkJobInfo(); jobInfo.setClusterName("queueName"); jobInfo.setFile("xxx.jar"); jobInfo.setClassName("your.class.name"); batchJob = new BatchJob(client.getCluster("queueName"), jobInfo); batchJob.asyncSubmit(); SparkJobStatus sparkJobStatus=batchJob.getStatus(); System.out.println(sparkJobStatus); }
  • 查询所有表 DLI提供查询表的接口。您可以使用该接口查询数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 9 def list_all_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name, with_detail=True) except DliException as e: print(e) return for tbl in tbls: print(tbl.name) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 创建OBS表 DLI提供创建OBS表的接口。您可以使用该接口创建数据存储在OBS的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def create_obs_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] tbl_schema = TableSchema(tbl_name, cols) try: table = dli_client.create_obs_table(db_name, tbl_schema, 'obs://bucket/obj', 'csv') except DliException as e: print(e) return print(table) 创建OBS表需要指定OBS路径,且该路径需要提前创建。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 删除表 DLI提供删除表的接口。您可以使用该接口删除数据库下的所有表。示例代码如下: 1 2 3 4 5 6 7 8 def delete_tbls(dli_client, db_name): try: tbls = dli_client.list_tables(db_name) for tbl in tbls: dli_client.delete_table(db_name, tbl.name) except DliException as e: print(e) return 表删除后,将不可恢复,请谨慎操作。 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 创建DLI表 DLI提供创建DLI表的接口。您可以使用该接口创建数据存储在DLI内部的表。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def create_dli_tbl(dli_client, db_name, tbl_name): cols = [ Column('col_1', 'string'), Column('col_2', 'string'), Column('col_3', 'smallint'), Column('col_4', 'int'), Column('col_5', 'bigint'), Column('col_6', 'double'), Column('col_7', 'decimal(10,0)'), Column('col_8', 'boolean'), Column('col_9', 'date'), Column('col_10', 'timestamp') ] sort_cols = ['col_1'] tbl_schema = TableSchema(tbl_name, cols, sort_cols) try: table = dli_client.create_dli_table(db_name, tbl_schema) except DliException as e: print(e) return print(table) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 描述表信息 您可以使用该接口获取表的元数据描述信息。示例代码如下: def get_table_schema(dli_client, db_name, tbl_name): try: table_info = dli_client.get_table_schema(db_name, tbl_name) print(table_info) except DliException as e: print(e) return
  • 获取DLI SDK 在“DLI SDK DOWNLOAD”页面,单击选择所需的SDK链接,即可获取对应的SDK安装包。 “dli-sdk-java-x.x.x.zip”压缩包,解压后目录结构如下: 表1 目录结构 名称 说明 jars SDK及其依赖的jar包。 maven-install 安装至本地Maven仓库的脚本及对应jar包。 dli-sdk-java.version Java SDK版本说明。
  • 查询表的分区信息(包含分区的创建和修改时间) DLI提供查询表分区信息的接口。您可以使用该接口查询数据库下表的分区信息(包括分区的创建和修改时间)。示例代码如下: 1 2 3 4 5 6 7 8 9 10 11 private static void showPartitionsInfo(DLIClient client) throws DLIException { String databaseName = "databasename"; String tableName = "tablename"; //调用DLIClient对象的showPartitions方法查询数据库下表的分区信息(包括分区的创建和修改时间) PartitionResult partitionResult = client.showPartitions(databaseName, tableName); PartitionListInfo partitonInfos = partitionResult.getPartitions(); //获取分区的创建和修改时间 Long createTime = partitonInfos.getPartitionInfos().get(0).getCreateTime().longValue(); Long lastAccessTime = partitonInfos.getPartitionInfos().get(0).getLastAccessTime().longValue(); System.out.println("createTime:"+createTime+"\nlastAccessTime:"+lastAccessTime); }
  • 查询所有队列 DLI提供查询队列列表接口,您可以使用该接口并选择相应的队列来执行作业。示例代码如下: 1 2 3 4 5 6 7 8 9 def list_all_queues(dli_client): try: queues = dli_client.list_queues() except DliException as e: print(e) return for queue in queues: print(queue.name) 完整样例代码和依赖包说明请参考:Python SDK概述。
  • 查询作业APIG网关服务访问地址 DLI提供查询Flink作业APIG访问地址的接口。您可以使用该接口查询作业APIG网关服务访问地址。示例代码如下: 1 2 3 4 5 private static void getFlinkApigSinks(DLIClient client) throws DLIException { Long jobId = 59L;//作业1ID FlinkJobApigSinksResponse result = client.getFlinkApigSinks(jobId); System.out.println(result); }
  • 更新自定义作业 DLI提供更新Flink自定义作业的接口。您可以使用该接口更新已经创建的自定义作业,目前仅支持Jar格式和运行在独享队列中。示例代码如下: 1 2 3 4 5 6 private static void updateFlinkJob(DLIClient client) throws DLIException { UpdateFlinkJarJobRequest body = new UpdateFlinkJarJobRequest(); body.name("update-job"); JobUpdateResponse result = client.updateFlinkJarJob(body,202L); System.out.println(result); }
  • 查询作业列表 DLI提供查询Flink作业列表的接口。您可以使用该接口查询作业列表。作业列表查询支持以下参数: name,status,show_detail,cursor,next,limit,order。本示例排序方式选择降序desc,将会列出作业id小于cursor的作业列表信息。示例代码如下: 1 2 3 4 private static void QueryFlinkJobListResponse(DLIClient client) throws DLIException { QueryFlinkJobListResponse result = client.getFlinkJobs(null, "job_init", null, true, 0L, 10, null, null,null,null,null); System.out.println(result); }
  • 查询作业详情 DLI提供查询Flink作业详情的接口。您可以使用该接口查询作业的详情。示例代码如下: 1 2 3 4 5 private static void getFlinkJobDetail(DLIClient client) throws DLIException { Long jobId = 203L;//作业ID GetFlinkJobDetailResponse result = client.getFlinkJobDetail(jobId); System.out.println(result); }
共100000条