华为云用户手册

  • consumer使用规范 consumer的owner线程需确保不会异常退出,避免客户端无法发起消费请求,阻塞消费。 确保处理完消息后再做消息commit,避免业务消息处理失败,无法重新拉取处理失败的消息。 通常不建议对每条消息都进行commit,如果对每条消息都进行了commit,会导致OFFSET_COMMIT请求过多,进而导致CPU使用率过高。例如:如果一个消费请求拉取1000条消息,每条都commit,则commit请求TPS是消费的1000倍,消息体越小,这个比例越大。建议隔一定条数或时间,批量commit,或打开enable.auto.commit,这样设置会存在一个缺点,即在客户端故障时,可能丢失一部分缓存的消费进度,导致重复消费。请根据业务实际情况,设置批量commit。 consumer不能频繁加入和退出group,频繁加入和退出,会导致consumer频繁做rebalance,阻塞消费。 consumer数量不能超过topic分区数,否则会有consumer拉取不到消息。 consumer需周期poll,维持和server的心跳,避免心跳超时,导致consumer频繁加入和退出,阻塞消费。 consumer拉取的消息本地缓存应有大小限制,避免OOM(Out of Memory)。 consumer session设置为30秒,session.timeout.ms=30000。 Kafka不能保证消费重复的消息,业务侧需保证消息处理的幂等性。 消费线程退出要调用consumer的close方法,避免同一个组的其他消费者阻塞sesstion.timeout.ms的时间。 消费组名称开头不使用特殊字符(如#),使用特殊字符可能会导致云监控无法展示此消费组的监控数据。
  • 客户端网络环境说明 客户端可以通过以下方式访问Kafka实例: 如果客户端是云上ECS,与Kafka实例处于同region同VPC,则可以直接访问Kafka实例提供的内网连接地址。 如果客户端是云上ECS,与Kafka实例处于相同region但不同VPC,通过以下任意一种方式访问。 创建VPC对等连接,将两个VPC的网络打通,实现跨VPC访问。具体步骤请参考VPC对等连接说明。 创建一个云连接实例,然后在创建的云连接实例中加载需要互通的VPC,实现跨VPC访问。具体步骤请参考同区域同账号VPC互通。 利用VPC终端节点在不同VPC间建立跨VPC的连接通道,实现Kafka客户端通过内网访问Kafka实例。 如果客户端在其他网络环境,或者与Kafka实例处于不同region,则访问实例的公网地址。 公网访问时,注意修改Kafka实例的安全组,允许端口9094(未开启SASL)/9095(开启SASL)被外部网络访问。 不同网络环境,对于客户端配置来说,只是连接地址的差异,其他都一样。因此,本手册以同一VPC内子网地址的方式,介绍客户端开发环境搭建。 遇到连接超时或失败时,请注意确认网络是否连通。可使用telnet方式,检测实例连接地址与端口。
  • 在application.properties文件中填写配置 #=============== Kafka ========================== ## Kafka实例的broker信息,ip:port为实例的连接地址和端口 spring.kafka.bootstrap-servers=ip1:port1,ip2:port2,ip3:port3 #=============== 生产者配置 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== 消费者配置 ======================= spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #======== SASL配置(不开启SASL时将以下配置删除) ======= ## 设置SASL认证机制、账号和密码。 ## spring.kafka.properties.sasl.mechanism为SASL认证机制,username和password为SASL_SSL的用户名和密码,参考“收集连接信息”章节获取。 ## SASL认证机制为“PLAIN”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=PLAIN spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; ## SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 spring.kafka.properties.sasl.mechanism=SCRAM-SHA-512 spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; ## 设置Kafka安全协议。spring.kafka.security.protocol为安全协议。 ## 安全协议为“SASL_SSL”时,配置信息如下。 spring.kafka.security.protocol=SASL_SSL ## spring.kafka.ssl.trust-store-location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 spring.kafka.ssl.trust-store-location=E:\\temp\\client.jks ## spring.kafka.ssl.trust-store-password为服务器证书密码,无需修改,配置此密码是为了访问Java生成的jks文件。 spring.kafka.ssl.trust-store-password=dms@kafka ## spring.kafka.properties.ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 spring.kafka.properties.ssl.endpoint.identification.algorithm= ## 安全协议为“SASL_PLAINTEXT”时,配置信息如下。 spring.kafka.security.protocol=SASL_PLAINTEXT
  • Kafka实例是否支持跨VPC访问? Kafka实例支持跨VPC访问,您可以通过以下任意一个方式实现跨VPC访问: 创建VPC对等连接,将两个VPC的网络打通,实现跨VPC访问。具体步骤请参考VPC对等连接说明。 创建一个云连接实例,然后在创建的云连接实例中加载需要互通的VPC,实现跨VPC访问。具体步骤请参考同区域同账号VPC互通。 利用VPC终端节点在不同VPC间建立跨VPC的连接通道,实现Kafka客户端通过内网访问Kafka实例。具体步骤请参考跨VPC访问Kafka实例。 父主题: 连接问题
  • 获取实例连接信息 实例连接地址和端口 实例创建后,从RocketMQ实例控制台的“基本信息”页面获取,在客户端配置时,可将地址都配上。 使用TCP协议通过内网连接RocketMQ实例时,获取“连接地址”。 使用gRPC协议通过内网连接RocketMQ实例时,获取“grpc连接地址”。 使用TCP协议通过公网连接RocketMQ实例时,获取“公网连接地址”。 使用gRPC协议通过公网连接RocketMQ实例时,获取“grpc公网连接地址”。 RocketMQ实例5.x版本支持gRPC协议,4.8.0版本不支持。 图1 查看实例的连接地址和端口(5.x版本) 图2 查看实例的连接地址和端口(4.8.0版本) Topic名称 从RocketMQ实例控制台的“Topic管理”页签中获取Topic名称。 消费组名称 从RocketMQ实例控制台的“消费组管理”页签中获取消费组名称。 用户名和用户密钥 从RocketMQ实例控制台的“用户管理”页面获取用户名,在用户详情页获取用户密钥。
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码,或者通过Producer.java获取更多示例代码。 import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Main { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); //填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请添加此行代码。 try { producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
  • 异步发送 异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。 使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。 参考如下示例代码,或者通过AsyncProducer.java获取更多示例代码。 import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Main { public static void main(String[] args) throws InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); //填入连接地址 producer.setNamesrvAddr("192.168.120.45:8100;192.168.123.150:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 try { producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // 消息发送成功。 System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } Thread.sleep(2000); producer.shutdown(); }}
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下: import java.nio.charset.StandardCharsets; import java.time.Instant; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; public class ScheduledMessageProducer1 { public static final String TOPIC_NAME = "ScheduledTopic"; public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.start(); // 定时消息投递时间戳,该消息10秒后投递 final long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli(); // 创建消息对象 Message msg = new Message(TOPIC_NAME, "TagA", "KEY", "scheduled message".getBytes(StandardCharsets.UTF_8)); // 设置消息定时投递的时间戳属性 msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp)); // 发送消息,该消息将会在10秒后投递 SendResult sendResult = producer.send(msg); // 打印发送结果和预计投递时间 System.out.printf("%s %s%n", sendResult, UtilAll.timeMillisToHumanString2(deliverTimestamp)); producer.shutdown(); } }
  • 生产者增加用户认证信息 在生产者客户端新建配置文件“config.yaml”,添加如下认证信息。如果配置文件已存在,无需再次创建,直接添加认证信息。 ACL_ACCESS_KEY: "******" ACL_SECRET_KEY: "******" ACL_ACCESS_KEY为用户名,ACL_SECRET_KEY为用户的密钥。创建用户的步骤,请参见创建用户。为了确保用户名和密钥的安全性,建议对用户名和密钥进行加密处理。 生产者初始化时需要增加“rpcHook”参数。 普通消息、顺序消息和定时消息,增加如下代码。 RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", rpcHook); 事务消息,增加如下代码。 RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName", rpcHook);
  • 消费者增加用户认证信息 在消费者客户端新建配置文件“config.yaml”,添加如下认证信息。如果配置文件已存在,无需再次创建,直接添加认证信息。 ACL_ACCESS_KEY: "******" ACL_SECRET_KEY: "******" ACL_ACCESS_KEY为用户名,ACL_SECRET_KEY为用户的密钥。创建用户的步骤,请参见创建用户。为了确保用户名和密钥的安全性,建议对用户名和密钥进行加密处理。 消费者初始化时需要增加“rpcHook”参数。无论是普通消息、顺序消息、定时消息,还是事务消息,都增加如下代码。 RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "ConsumerGroupName", rpcHook);
  • 发送事务消息 参考如下示例代码,或者通过TransactionProducer.java获取更多示例代码。 import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; public class Main { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("开始执行本地事务: " + message); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("收到回查,重新查询事务状态: " + messageExt); return LocalTransactionState.COMMIT_MESSAGE; } }; TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); //填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.setTransactionListener(transactionListener); producer.start(); Message msg = new Message("TopicTest", "TagA", "KEY", "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); producer.shutdown(); }}
  • 概述 第二章介绍如何获取RocketMQ实例连接信息。 第三章~第七章介绍Java、Go和Python客户端访问分布式消息服务RocketMQ版的示例代码,具体如表1所示。 表1 示例代码 客户端语言 示例代码 Java(TCP协议) 收发普通消息 收发顺序消息 收发事务消息 发送定时消息 使用ACL权限访问 消费端限流 Java(gRPC协议) 收发普通消息 收发顺序消息 收发事务消息 发送定时消息 Go(TCP协议) 收发普通消息 收发顺序消息 收发事务消息 发送定时消息 使用ACL权限访问 Go(gRPC协议) 收发普通消息 收发顺序消息 收发事务消息 发送定时消息 Python(TCP协议) 收发普通消息 收发顺序消息 收发事务消息 发送定时消息 使用ACL权限访问
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码,或者通过ProducerNormalMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; public class ProducerNormalMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入AK/SK,创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果开启了SSL,请增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送顺序消息 参考如下示例代码,或者通过ProducerFifoMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; public class ProducerFifoMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerFifoMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入AK/SK,创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果开启了SSL,请增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 指定消息组,相同消息组的消息会被分配到同一个队列。 .setMessageGroup("yourMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 发送定时消息 发送定时消息的示例代码如下,或者通过ProducerDelayMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; public class ProducerDelayMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class); private ProducerDelayMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourDelayTopic"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入AK/SK,创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果开启了SSL,请增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 设置定时消息投递时间戳 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 取消定时消息 取消定时消息的示例代码如下: import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; public class ProducerDelayMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourDelayTopic"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入AK/SK,创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果开启了SSL,请增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); try { // ====== 发送定时消息逻辑 ====== byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 设置定时消息投递时间戳 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); // ====== 发送取消消息逻辑 ====== // 创建取消消息对象 Message cancle = provider.newMessageBuilder() .setTopic(topic) .setBody("cancel".getBytes(StandardCharsets.UTF_8)) // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致。 .setDeliveryTimestamp(message.getDeliveryTimestamp().get()) // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取。 .addProperty("__CANCEL_SCHEDULED_MSG", sendReceipt.getMessageId().toString()) .build(); // 发送取消消息,必须在定时消息被投递之前发送才可以取消。 final SendReceipt cancelSendReceipt = producer.send(cancle); log.info("Send cancel message successfully, messageId={}", cancelSendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 续费相关的功能 包年/包月Kafka实例续费相关的功能如表1所示。 表1 续费相关的功能 功能 说明 手动续费 包年/包月Kafka实例从购买到被自动删除之前,您可以随时在Kafka控制台为Kafka实例续费,以延长Kafka实例的使用时间。 自动续费 开通自动续费后,Kafka实例会在每次到期前自动续费,避免因忘记手动续费而导致资源被自动删除。 在一个包年/包月Kafka实例生命周期的不同阶段,您可以根据需要选择一种方式进行续费,具体如图1所示。 图1 Kafka实例生命周期 Kafka实例从购买到到期前,处于正常运行阶段,资源状态为“运行中”。 到期后,资源状态变为“已过期”。 到期未续费时,Kafka实例首先会进入宽限期,宽限期到期后仍未续费,资源状态变为“已冻结”。 超过宽限期仍未续费将进入保留期,如果保留期内仍未续费,资源将被自动删除。 华为云根据客户等级定义了不同客户的宽限期和保留期时长。 在Kafka实例到期前均可开通自动续费,到期前7日凌晨3:00首次尝试自动续费,如果扣款失败,每天凌晨3:00尝试一次,直至Kafka实例到期或者续费成功。到期前7日自动续费扣款是系统默认配置,您也可以根据需要修改此扣款日。
  • 续费简介 包年/包月Kafka实例到期后会影响Kafka实例正常运行。如果您想继续使用,需要在指定的时间内为Kafka实例续费,否则Kafka实例资源会自动释放,数据丢失且不可恢复。 续费操作仅适用于包年/包月Kafka实例,按需计费Kafka实例不需要续费,只需要保证账户余额充足即可。 Kafka实例在到期前续费成功,所有资源得以保留,且Kafka实例的运行不受影响。Kafka实例到期后的状态说明,请参见到期后影响。
  • 状态码 状态码4xx:由于明显的客户端错误(例如,格式错误的请求语法、参数错误等),华为云会返回4xx错误码,请及时检查请求消息的正确性,重新发起请求。 状态码5xx:由于华为云系统原因,导致无法完成明显有效请求的处理,可及时联系华为云客服处理。 HTTP状态码 错误码 描述 500 CBC.30000010 无效订单(可能是订单不存在),不能进行操作。 400 CBC.99003106 订单状态已发生变更,不能进行支付操作。 400 CBC.99003108 您选择的折扣不可用于该订单。 400 CBC.99003110 订单已经过了支付截止时间,请重新提交订单。 400 CBC.99003112 优惠券或储值卡已被使用。 400 CBC.99003116 选择的优惠券限制不能和折扣同时使用。 400 CBC.99003117 选择的优惠券配置了最小折扣比例,而选择的折扣小于这个折扣比例。 例如:优惠券配置的最小折扣比例为90%,而选择的折扣在所有订单行上都小于90%,则该折扣不可使用。 400 CBC.99003141 云服务配额或容量不足或规格发生变化。 400 CBC.99003147 代金券和折扣券不能同时使用。 400 CBC.99003155 企业项目资金配额不足/项目不可用。 400 CBC.99003156 云商店券和专用代金券不能同时使用。 400 CBC.99003162 支付时间超过设定的生效时间。 400 CBC.99003198 企业组织预算不足。 400 CBC.99005003 余额不足。 400 CBC.99006093 获取不到税率信息。 400 CBC.99008040 操作频繁。 500 CBC.0999 其他错误。 400 CBC.99000089 抱歉,当前订单号不能操作,请使用组合交易单号操作。
  • 功能介绍 该接口即将下线,“支付包年/包月产品订单”接口请参考支付包年/包月产品订单。 客户可以对待支付状态的包年/包月产品订单进行支付。 客户登录费用中心支付包年包月产品的待支付订单请单击这里。 余额支付包括现金账户和信用账户两种支付方式,如果两个账户都有余额,则优先现金账户支付。 同时使用订单折扣和优惠券的互斥规则如下: 如果优惠券的限制属性上存在simultaneousUseWithEmpowerDiscount字段,并且值为0,则优惠券和商务授权折扣及伙伴授予折扣不能同时使用。 如果优惠券的限制属性上存在minConsumeDiscount字段,当折扣ID包含的所有订单项上的折扣率discount_ratio都小于minConsumeDiscount字段时,则折扣ID和优惠券不能同时使用。 如果优惠券的限制属性上存在simultaneousUseWithPromotionDiscount字段,并且值为0,则优惠券和促销折扣不能同时使用。 财务托管企业子调用该接口时,若企业主帐号存在订单可用的优惠券,则支付订单时会自动使用,无需在请求参数中携带优惠券ID。
  • 状态码 状态码4xx:由于明显的客户端错误(例如,格式错误的请求语法、参数错误等),华为云会返回4xx错误码,请及时检查请求消息的正确性,重新发起请求。 状态码5xx:由于华为云系统原因,导致无法完成明显有效请求的处理,可及时联系华为云客服处理。 HTTP状态码 错误码 描述 400 CBC.5003 余额不足。 400 CBC.99000000 无对该客户的操作权限。 400 CBC.99000035 非合作伙伴子客户。 400 CBC.99000037 您没有操作该云经销商的权限。 403 CBC.0151 访问拒绝。 500 CBC.0999 其他错误。
  • 请求消息 请求参数 参数 是否必选 参数类型 取值范围 描述 customer_id 是 String 最大长度:64 客户账号ID。您可以调用查询客户列表接口获取customer_id。 amount 是 BigDecimal - 拨款金额。 单位:元。取值大于0且精确到小数点后2位。 注意该值不能大于“查询伙伴账户余额”接口响应消息表2中参数amount - designated_amount的值。 indirect_partner_id 否 String 最大长度:64 云经销商ID。获取方法请参见查询云经销商列表。 云经销商(二级经销商)给子客户拨款时,需携带该参数。除此之外,该参数不做处理。 说明: 该参数存在的情况下,如果结果返回余额不足,表示对应的二级经销商的余额不足。 如果该参数不存在,则余额不足表示调用的伙伴自身账号的余额不足。 请求示例 POST https://bss.myhuaweicloud.com/v2/accounts/partner-accounts/adjust-amount HTTP/1.1 Content-Type: application/json X-Auth-Token:MIIPAgYJKoZIhvcNAQcCo...ggg1BBIINPXsidG9rZ { "customer_id": "0666aa7a7900d5c80f6dc01a9a3598a0", "amount": 10.00 }
  • Web UI 客户可以在费用中心“我的订单”页面对待支付状态的包年/包月产品订单进行支付。 进入“我的订单”页面。 选中“待支付”页签。 开通了项目组管理的客户可以在“我的订单”右侧的下拉选项框中筛选该客户下各项目组的名称,查询各项目组待支付的订单。 根据实际情况选择支付类型。 单个支付:单击待支付订单列表对应的“支付”。 合并支付:选中待支付订单前的复选框,单击“合并支付”。 选择优惠和支付方式,单击“确认付款”。 开启敏感操作保护的客户,支付订单时需进行二次认证确认身份。 优惠包含折扣优惠和优惠券。 折扣优惠,包含商务授权折扣、合作伙伴授权折扣、合作伙伴调价优惠、线下合同折扣和折扣券优惠。折扣优惠不可同时使用。 客户下单时选择使用促销折扣或折扣券,如果促销折扣/折扣券的失效时间比订单正常取消(当前为七天)的时间少,则支付截止时间调整为促销折扣/折扣券的失效时间。 示例: 客户在2018/11/26 23:12:32下单购买某云服务时,选择使用有效期为2018-11-01 00:00:00~2018-11-30 23:59:59的 8折 折扣券。这时,该订单的失效时间调整为2018/11/30 23:59:59。(正常订单的失效时间为7天,即2018/11/26 23:12:32下单时创建订单,订单的失效时间为2018/12/03 23:12:32。) 客户下单时选择适应合同商务授权折扣或合作伙伴授予折扣,支付订单时以下单的价格为准。即客户下单时折扣在有效期,虽支付时折扣已过期,但支付时折扣价格不变,依旧享受折扣。 示例: 客户在2018/11/26 23:12:32下单购买某云服务时,选择使用有效期为2018-11-01 00:00:00~2018-11-30 23:59:59的 8折 合同商务授权折扣。订单失效时间(2018/12/03 23:12:32)前,支付该订单仍可享8折的优惠。 优惠券包含现金券和代金券。 对于弹性云服务器、云硬盘、虚拟私有云的新购订单和规格变更订单,请在订单确认页面勾选可使用的折扣优惠。进入支付页面,不可再修改折扣优惠,但可以使用现金券或代金券。 当伙伴为子客户设置订单折扣后,子客户支付订单时,不可再修改折扣优惠。 系统支持余额支付和在线支付两种支付方式。对于顾问销售类的合作伙伴子客户,支持使用“伙伴支付”的支付方式,即申请由合作伙伴提交并支付订单。使用“伙伴支付”支付方式时,不支持使用优惠券。 在“支付页面”,还可以根据当前的支付金额,生成请款合同,以便于客户向公司申请经费、报账或归档。 生成请款合同需跳转到“合同管理”页面,将取消本次支付。详细操作步骤如下: 单击“更多支付方式”,选择“生成请款合同”。 单击“生成合同”,跳转到“合同管理”页面。 申请线上合同。详细操作步骤,请参见如何申请线上合同。 获取并输入手机验证码。 使用当前客户的手机号码获取验证码。 单击“切换验证手机”,可以修改用于支付验证的手机号码。 单击“确认”。 系统提示支付成功信息。
  • 状态码 状态码4xx:由于明显的客户端错误(例如,格式错误的请求语法、参数错误等),华为云会返回4xx错误码,请及时检查请求消息的正确性,重新发起请求。 状态码5xx:由于华为云系统原因,导致无法完成明显有效请求的处理,可及时联系华为云客服处理。 HTTP状态码 错误码 描述 500 CBC.30000010 无效订单(可能是订单不存在),不能进行操作。 400 CBC.99003106 订单状态已发生变更,不能进行支付操作。 400 CBC.99003108 您选择的折扣不可用于该订单。 400 CBC.99003110 订单已经过了支付截止时间,请重新提交订单。 400 CBC.99003112 优惠券或储值卡已被使用。 400 CBC.99003116 选择的优惠券限制不能和折扣同时使用。 400 CBC.99003117 选择的优惠券配置了最小折扣比例,而选择的折扣小于这个折扣比例。 例如:优惠券配置的最小折扣比例为90%,而选择的折扣在所有订单行上都小于90%,则该折扣不可使用。 400 CBC.99003141 云服务配额或容量不足或规格发生变化。 400 CBC.99003147 代金券和折扣券不能同时使用。 400 CBC.99003155 企业项目资金配额不足/项目不可用。 400 CBC.99003156 云商店券和专用代金券不能同时使用。 400 CBC.99003162 支付时间超过设定的生效时间。 400 CBC.99003198 企业组织预算不足。 400 CBC.99005003 余额不足。 400 CBC.99006093 获取不到税率信息。 400 CBC.99008040 操作频繁。 500 CBC.0999 其他错误。 400 CBC.99000088 抱歉,支付处理中,请勿重复操作。 400 CBC.99000089 抱歉,当前订单号不能操作,请使用组合交易单号操作。
  • 功能介绍 客户可以对待支付状态的包年/包月产品订单进行支付。 客户登录费用中心支付包年包月产品的待支付订单请单击这里。 余额支付包括现金账户和信用账户两种支付方式,如果两个账户都有余额,则优先现金账户支付。 同时使用订单折扣和优惠券的互斥规则如下: 如果优惠券的限制属性上存在simultaneousUseWithEmpowerDiscount字段,并且值为0,则优惠券和商务授权折扣及伙伴授予折扣不能同时使用。 如果优惠券的限制属性上存在minConsumeDiscount字段,当折扣ID包含的所有订单项上的折扣率discount_ratio都小于minConsumeDiscount字段时,则折扣ID和优惠券不能同时使用。 如果优惠券的限制属性上存在simultaneousUseWithPromotionDiscount字段,并且值为0,则优惠券和促销折扣不能同时使用。 财务托管企业子调用该接口时,若企业主帐号存在订单可用的优惠券,则支付订单时会自动使用,无需在请求参数中携带优惠券ID。
  • IPv6网络的应用场景 如果您的实例规格支持IPv6,您可以使用IPv4/IPv6双栈网络,场景示例和资源规划如表1所示。 表1 IPv4/IPv6双栈网络的应用场景及资源规划 应用场景 场景示例 子网 IEC IPv6内网通信 同一个VPC、同一个站点下,您在IEC上部署应用,需要与其他系统(比如数据库)之间使用IPv6进行内网互访 IPv4网段 IPv6网段 IPv4私有地址:用于IPv4内网通信 IPv6地址:用于IPv6内网通信 IPv6公网通信 您在IEC上部署应用并面向公网客户端提供服务,支持客户端通过IPv6地址访问 IPv4网段 IPv6网段 IPv4私有地址+IPv4 EIP地址:用于IPv4公网通信 IPv6地址+共享带宽:用于IPv6公网通信 您在IEC上部署应用并面向公网客户端提供服务,既要支持客户端通过IPv6地址访问,还要对这些访问来源进行数据分析 图1 IPv6网络应用场景
  • 什么是IPv4/IPv6双栈网络 IPv4/IPv6双栈网络,表示为您的边缘实例提供两个版本的的IP地址:IPv4 IP地址和IPv6 IP地址,这两个IP地址都可以进行内网或者公网访问使用IPv4/IPv6双栈网络可实现以下功能: 使用IPv4私有IP地址,实现IEC在内网之间互访。 使用IPv4私有IP地址,通过绑定弹性公网IP的方式,实现IEC和公网之间互访。 使用IPv6 IP地址,实现双栈IEC在内网之间互访。 使用IPv6 IP地址,通过绑定带宽的方式,实现IEC和公网之间互访。 创建子网后,为子网开启IPv6,将自动为当前子网分配IPv6网段。
共100000条