华为云用户手册

  • 使用场景 默认情况下,RabbitMQ生产者生产的消息存储在内存中,当节点宕机或重启时,如何确保消息不丢失呢?RabbitMQ通过持久化机制实现,持久化包括Exchange持久化、Queue持久化和Message持久化。持久化是将内存中的消息写入到磁盘中,以防异常情况导致内存中的消息丢失。但是磁盘的读写速度远不如内存,开启消息持久化后,RabbitMQ的性能会下降。与惰性队列不同,持久化消息会在磁盘和内存中各存储一份,只有在内存空间不够时,才会将内存中的消息删除,存储到磁盘中。 非持久化Queue、Exchange在重启之后会丢失。 非持久化Message在重启之后会丢失(经过持久化Queue/Exchange的消息不会自动变为持久化消息)。 持久化消息在尚未完成持久化时,如果服务器重启,消息会丢失。
  • 消费者确认 消费者确认是指服务端通过确认消息是否成功被消费者接收,来判断是否删除队列中的此消息。 消费者确认对数据可靠性十分重要,接收重要消息的消费应用程序在未处理完消息前不应确认消息,以便消费者有足够的时间处理消息,无需担心消息处理过程中由于消费者进程异常(如工作程序崩溃、重启等)导致消息丢失。 消费者确认在客户端上配置,通过配置basicConsume方法启用确认。在channel中启用消费者确认适用于大多数场景。 以下示例演示在Java客户端配置消费者确认(使用Channel#basicAck设置basic.ack为肯定): // this example assumes an existing channel instanceboolean autoAck = false;channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } }); 未确认的消息缓存在内存中,如果未确认的消息过多,会导致内存使用率过高,此时可以在客户端配置预取值来限制消费者预取的消息数量,具体方法请参见预取值。
  • 使用场景 生产者在发布消息后,怎么确认消息已正确发布到服务器端?服务器端又怎么确认消息已成功被消费?RabbitMQ提供的消息确认机制可以解决此问题。 在使用RabbitMQ时,生产者确认和消费者确认对于确保数据可靠性至关重要。如果连接失败,传输中的消息可能会丢失,需要重新传输。确认机制可以让服务器和客户端知道何时重新传输消息。客户端可以在收到消息时确认消息,也可以在客户端完全处理完消息后确认。生产者确认会影响性能,如果需要很高的吞吐量,应禁用生产者确认。注意,不使用生产者确认会导致可靠性下降。 更多关于消息确认机制的说明,请参考Consumer Acknowledgements and Publisher Confirms。
  • 使用场景 单一活跃消费者(Single Active Consumer)表示队列中可以注册多个消费者,但是只允许一个消费者消费消息,只有在此消费者出现异常时,才会自动转移到另一个消费者进行消费。单一活跃消费者适用于需要保证消息消费顺序性,同时提供高可靠能力的场景。 分布式消息服务RabbitMQ版3.8.35版本才提供单一活跃消费者特性。 图1 单一活跃消费者消费流程 如图1所示,Producer生产9条消息,由于队列设置了单一活跃消费者特性,只有Consumer 1在消费消息。 更多关于单一活跃消费者的说明,请参考Single Active Consumer。
  • 生产者确认 生产者确认,即服务端在收到来自生产者的消息时进行确认。 以下示例演示在Java客户端配置生产者确认: try {channel.confirmSelect() ; //将信道置为publisher confirm模式//之后正常发送消息channel . basicPublish( "exchange " , " routingKey" , null , "publisher confirm test " .getBytes());if (!channel.waitForConfirms()) {System.out.println( "send message failed " ) ;// do something else....}} catch (InterruptedException e) {e.printStackTrace() ;} 调用channel .waitForConfirms方法之后,会等待服务端确认,这是一种同步等待的方式,会对性能产生影响。如果生产者要满足at least once,就必须使用同步等待方式。
  • DMS for RabbitMQ请求条件 您可以在创建自定义策略时,通过添加“请求条件”(Condition元素)来控制策略何时生效。请求条件包括条件键和运算符,条件键表示策略语句的 Condition元素,分为全局级条件键和服务级条件键。全局级条件键(前缀为g:)适用于所有操作,服务级条件键(前缀为服务缩写,如dms:)仅适用于对应服务的操作。运算符与条件键一起使用,构成完整的条件判断语句。 DMS for RabbitMQ通过IAM预置了一组条件键,例如,您可以先使用dms:ssl条件键检查RabbitMQ实例是否开启SSL,然后再允许执行操作。下表显示了适用于DMS for RabbitMQ服务特定的条件键。 表1 DMS for RabbitMQ请求条件 DMS for RabbitMQ条件键 运算符 描述 dms:publicIP Bool IsNullOrEmpty BoolIfExists 是否开启公网 dms:ssl Bool IsNullOrEmpty BoolIfExists 是否开启SSL 父主题: 权限管理
  • 使用场景 设置预取值可以限制未被确认的消息个数,一旦消费者中未被确认的消息数量达到设置的预取值,服务端将不再向此消费者发送消息,除非至少有一个未被确认的消息被确认。设置预取值本质上是一种对消费者进行流控的方法。 设置预取值时,需要考虑多种因素: 预取值设置太小可能会损害性能,RabbitMQ会一直在等待获得发送消息的权限。 预取值设置太大可能会导致从队列中取出大量消息传递给一个消费者,而使其他消费者处于空闲状态。另外还需要考虑消费者的配置,消费者在处理消息时会将所有消息保存在内存中,太大的预取值会对消费者的性能产生负面影响,甚至可能会导致消费者崩溃。 更多关于预取值的说明,请参考Consumer Prefetch。
  • DMS for RabbitMQ资源 资源是服务中存在的对象。在DMS for RabbitMQ中,资源包括:rabbitmq,您可以在创建自定义策略时,通过指定资源路径来选择特定资源。 表1 DMS for RabbitMQ的指定资源与对应路径 指定资源 资源名称 资源路径 rabbitmq 实例 【格式】 DMS:*:*:rabbitmq:实例ID 【说明】 对于实例资源,IAM自动生成资源路径前缀DMS:*:*:rabbitmq: 通过实例ID指定具体的资源路径,支持通配符*。例如: DMS:*:*:rabbitmq:*表示任意RabbitMQ实例。 父主题: 权限管理
  • 如何设置合适的预取值? 如果您只有一个或很少几个消费者在处理消息,建议一次预取多条消息,尽量让客户端保持忙碌。如果您的处理时间和网络状态稳定,则只需将总往返时间除以每条消息在客户端的处理时间即可获得估计的预取值。 在消费者多且处理时间短的情况下,建议使用较低的预取值。过低的预取值会使消费者闲置,因为消费者在处理完消息后需要等待下一批的消息到达。过高的值可能会使单个消费者忙碌,其他消费者处于空闲状态。 在消费者多且处理时间很长的情况下,建议您将预取值设置为1,以便消息在所有消费者间均匀分布。
  • 设置预取值 以下示例演示在Java客户端为单个消费者设置预取值为10。 ConnectionFactory factory = new ConnectionFactory();Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicQos(10, false);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume("my_queue", false, consumer); 在Java客户端中,global的默认值为false,因此以上示例可以简单地写为channel.basicQos(10)。 global取值的含义如下: 表1 global取值说明 global取值 说明 false 分别作用于通道上的每个新消费者。 true 在通道上的所有消费者之间所共享。
  • 在客户端设置心跳超时时间 如果心跳超时时间设置过低,在短暂的网络拥塞或短暂的服务器流量控制等情况下可能会产生误报。对于大多数环境,超时时间设置为5-20秒最佳。 使用Java客户端启动心跳。 在创建连接前使用ConnectionFactory#setRequestedHeartbeat进行设置,示例如下: ConnectionFactory cf = new ConnectionFactory(); // 将心跳超时时间设置为15秒cf.setRequestedHeartbeat(15); 使用.NET客户端启用心跳。 var cf = new ConnectionFactory();// 将心跳超时时间设置为15秒cf.RequestedHeartbeat = TimeSpan.FromSeconds(15);
  • 心跳帧 心跳帧发送时间间隔为心跳超时时间/2,该值有时也被称为心跳间隔。客户端在两次错过心跳后,会被认为是不可达的。不同的客户端会以不同的方式显示这一点,但TCP连接将被关闭。当客户端检测到服务端由于心跳而无法访问时,需要重新连接。 任何流量(如协议操作、消息发布、消息确认、心跳帧等)都会被认为是有效的心跳。如果连接上有其他流量,客户端可以选择发送心跳帧,也可以选择不发送。如果连接上没有其他流量,客户端必须发送心跳帧。
  • 仲裁队列与镜像队列的差异 仲裁队列是RabbitMQ 3.8版本引入的队列类型,它与镜像队列拥有类似的功能,为RabbitMQ提供高可用的队列。镜像队列有一些设计上的缺陷,这也是RabbitMQ提供仲裁队列的原因。 镜像队列主要的缺陷在于消息同步的性能低。 镜像队列包含一个主队列和多个从队列,当生产者向主队列发送一条消息,主队列会将消息同步给从队列,所有的从队列都保存消息后,主队列才会向生产者发送确认。 RabbitMQ使用集群部署时,如果其中一个节点故障下线,待它消除故障重新上线后,它保存的所有从队列的数据都会丢失。此时运维人员需要选择是否同步主队列的数据到从队列中,如果不同步数据,会增加消息丢失的风险。如果同步数据,同步时队列是阻塞的,无法对其进行操作。当队列中存在大量堆积消息时,同步会导致队列几分钟、几小时或者更长时间不可用。
  • 心跳超时时间 心跳超时时间定义了对等TCP连接在多长时间后被服务端和客户端视为关闭。 在RabbitMQ服务端和客户端分别设置心跳超时时间,服务端和客户端会对配置的心跳超时时间进行协商,客户端必须配置该值来发送心跳。RabbitMQ官方团队维护的3个客户端(Java、.NET、Erlang语言)的心跳超时时间协商逻辑如下: 服务端和客户端设置的心跳超时时间都不为0时,两者间较小的值生效。 服务端和客户端任意一端设置的心跳超时时间为0,另一端不为0时,非0的值生效。 服务端和客户端的心跳超时时间都设置为0时,表示禁用心跳。 更多关于心跳检测的说明,请参考Detecting Dead TCP Connections with Heartbeats and TCP Keepalives。
  • 使用场景 仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行。 仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入放大会造成成倍的磁盘占用。 仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位。 更多关于仲裁队列的说明,请参考Quorum Queues。 分布式消息服务RabbitMQ版3.8.35版本才提供仲裁队列特性。
  • 设置仲裁队列的长度 通过配置Policy或者队列属性的方式可以限制仲裁队列的长度和在内存中保存的长度。 x-max-length:仲裁队列最大消息数。如果超过则丢弃消息,或者发送到死信交换器。 x-max-length-bytes:仲裁队列最大总消息大小(字节数)。如果超过则丢弃消息,或者发送到死信交换器。 x-max-in-memory-length:限制仲裁队列的内存中最大消息数量。 x-max-in-memory-bytes:限制仲裁队列的内存中的最大总消息大小(字节数)。
  • 云审计服务支持的DMS for Kafka操作列表 通过云审计服务,您可以记录与分布式消息服务Kafka版相关的操作事件,便于日后的查询、审计和回溯。 表1 云审计服务支持的DMS for Kafka操作列表 操作名称 资源类型 事件名称 创建DMS实例订单成功 kafka createDMSInstanceOrderSuccess 创建DMS实例任务执行成功 kafka createDMSInstanceTaskSuccess 创建DMS实例订单失败 kafka createDMSInstanceOrderFailure 创建DMS实例任务执行失败 kafka createDMSInstanceTaskFailure 删除创建失败的DMS实例成功 kafka deleteDMSCreateFailureInstancesSuccess 删除创建失败的DMS实例失败 kafka deleteDMSCreateFailureInstancesFailure 删除DMS实例任务执行成功 kafka deleteDMSInstanceTaskSuccess 删除DMS实例任务执行失败 kafka deleteDMSInstanceTaskFailure 批量删除DMS实例任务 kafka batchDeleteDMSInstanceTask 提交批量删除DMS实例请求成功 kafka batchDeleteDMSInstanceSuccess 批量删除DMS实例任务执行成功 kafka batchDeleteDMSInstanceTaskSuccess 提交批量删除DMS实例请求失败 kafka batchDeleteDMSInstanceFailure 批量删除DMS实例任务执行失败 kafka batchDeleteDMSInstanceTaskFailure 提交修改DMS实例订单请求成功 kafka modifyDMSInstanceOrderSuccess 提交修改DMS实例订单请求失败 kafka modifyDMSInstanceOrderFailure 提交扩容实例请求成功 kafka extendDMSInstanceSuccess 扩容DMS实例任务执行成功 kafka extendDMSInstanceTaskSuccess 提交扩容实例请求失败 kafka extendDMSInstanceFailure 扩容DMS实例任务执行失败 kafka extendDMSInstanceTaskFailure 提交重置DMS实例密码请求成功 kafka resetDMSInstancePasswordSuccess 提交重置DMS实例密码请求失败 kafka resetDMSInstancePasswordFailure 提交重启DMS实例请求成功 kafka restartDMSInstanceSuccess 重启DMS实例任务执行成功 kafka restartDMSInstanceTaskSuccess 提交重启DMS实例请求失败 kafka restartDMSInstanceFailure 重启DMS实例任务执行失败 kafka restartDMSInstanceTaskFailure 提交批量重启DMS实例请求成功 kafka batchRestartDMSInstanceSuccess 批量重启DMS实例任务执行成功 kafka batchRestartDMSInstanceTaskSuccess 提交批量重启DMS实例请求失败 kafka batchRestartDMSInstanceFailure 批量重启DMS实例任务执行失败 kafka batchRestartDMSInstanceTaskFailure 提交修改DMS实例信息请求成功 kafka modifyDMSInstanceInfoSuccess 修改DMS实例信息任务执行成功 kafka modifyDMSInstanceInfoTaskSuccess 提交修改DMS实例信息请求失败 kafka modifyDMSInstanceInfoFailure 修改DMS实例信息任务执行失败 kafka modifyDMSInstanceInfoTaskFailure 删除后台任务成功 kafka deleteDMSBackendJobSuccess 删除后台任务失败 kafka deleteDMSBackendJobFailure 开启Smart Connect任务执行成功 kafka createConnectorTaskSuccess 创建Smart Connect任务执行成功 kafka createConnectorSinkTaskSuccess 开启Smart Connect任务执行失败 kafka createConnectorTaskFailure 创建Smart Connect任务执行失败 kafka createConnectorSinkTaskFailure 冻结DMS实例任务执行成功 kafka freezeDMSInstanceTaskSuccess 冻结DMS实例任务执行失败 kafka freezeDMSInstanceTaskFailure 解冻DMS实例任务执行成功 kafka unfreezeDMSInstanceTaskSuccess 解冻DMS实例任务执行失败 kafka unfreezeDMSInstanceTaskFailure Kafka专享实例创建Topic成功 kafka Kafka_create_topicSuccess Kafka专享实例创建Topic失败 kafka Kafka_create_topicFailure Kafka专享实例删除Topic成功 kafka Kafka_delete_topicsSuccess Kafka专享实例删除Topic失败 kafka Kafka_delete_topicsFailure 开启自动创建Topic成功 kafka enable_auto_topicSuccess 开启自动创建Topic失败 kafka enable_auto_topicFailure 重置消费组偏移量成功 kafka Kafka_reset_consumer_offsetSuccess 重置消费组偏移量失败 kafka Kafka_reset_consumer_offsetFailure 创建用户成功 kafka createUserSuccess 创建用户失败 kafka createUserFailure 删除用户成功 kafka deleteUserSuccess 删除用户失败 kafka deleteUserFailure 更新用户策略成功 kafka updateUserPoliciesTaskSuccess 更新用户策略失败 kafka updateUserPoliciesTaskFailure 父主题: 云审计服务支持的关键操作
  • 步骤三:添加DNAT规则 在“公网NAT网关”页面,在新购买的公网NAT网关后,单击“设置规则”,进入公网NAT网关详情页。 在“DNAT规则”页签,单击“添加DNAT规则”,弹出“添加DNAT规则”对话框。 图3 公网NAT网关详情页 设置如下参数。 使用场景:选择“虚拟私有云” 端口类型:选择“具体端口” 支持协议:选择“TCP” 弹性公网IP:选择已购买的弹性公网IP 公网端口:输入“9011” 实例类型:选择“自定义” 私网IP:输入获取Kafka实例的信息中记录的Kafka实例的一个内网连接地址 私网端口:输入“9011” 如果想要了解更多的参数信息,请参考添加DNAT规则。 图4 添加DNAT规则 单击“确定”,完成DNAT规则的添加。 DNAT规则添加成功后,在DNAT规则列表中查看此规则的状态,若“状态”为“运行中”,表示创建成功。 为获取Kafka实例的信息中记录的其他内网连接地址创建DNAT规则,每个DNAT规则需要设置不同的弹性公网IP。 创建DNAT规则的具体步骤参考2~4。 DNAT规则全部创建成功后,在“DNAT规则”页签,查看已创建的DNAT规则,并记录私网IP对应的弹性公网IP。 图5 DNAT规则列表
  • 步骤五:验证接口连通性 参考连接未开启SASL的Kafka实例或者连接已开启SASL的Kafka实例,测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以图6为例,连接Kafka实例的地址为“124.xxx.xxx.167:9011,124.xxx.xxx.174:9011,124.xxx.xxx.57:9011”。 在Kafka实例安全组的入方向规则中放通9011端口。 连接Kafka实例的客户端已开启公网访问功能。
  • 步骤四:修改客户端配置文件 开启SSL双向认证后,需要在客户端的“consumer.properties”和“producer.properties”文件中,分别修改服务端证书配置,并增加客户端证书配置。 security.protocol=SSLssl.truststore.location=/opt/kafka_2.11-2.3.0/config/client.truststore.jksssl.truststore.password=dms@kafkassl.endpoint.identification.algorithm=#增加以下的客户端证书配置ssl.keystore.location=/var/private/ssl/kafka/client.keystore.jksssl.keystore.password=test123ssl.key.password=test123 security.protocol配置证书协议类型,开启SSL双向认证时,必须设置为SSL。 ssl.truststore.location配置为client.truststore.jks证书的存放路径。 ssl.truststore.password为client.truststore.jks的密码。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。 ssl.keystore.location配置为client.keystore.jks证书的存放路径。 ssl.keystore.password配置为client.keystore.jks的密码。 ssl.key.password配置为client.keystore.jks的密码。
  • 操作影响 对数据量大的Topic进行分区平衡,会占用大量的网络和存储带宽,业务可能会出现请求超时或者时延增大,建议在业务低峰期时操作。 带宽限制是指设定Topic进行副本同步的带宽上限,确保不会对该实例上的其他Topic造成流量冲击。但需要注意,带宽限制不会区分是正常的生产消息造成的副本同步还是分区平衡造成的副本同步,如果带宽限制设定过小,可能会影响正常的生产消息,且可能会造成分区平衡一直无法结束。 分区平衡任务启动后,不能删除正在进行分区平衡的Topic,否则会导致分区平衡任务无法结束。 分区平衡任务启动后,无法修改Topic的分区数。 分区平衡任务启动后,无法手动停止任务,需要等到任务完成。 分区平衡后Topic的metadata会改变,如果生产者不支持重试机制,会有少量的请求失败,导致部分消息生产失败。 数据量大的Topic进行分区平衡的时间会比较长,建议根据Topic的消费情况,适当调小Topic老化时间,使得Topic的部分历史数据被及时清理,加快迁移速度。
  • 带宽限制值估算方法 带宽限制值受分区平衡任务执行时间、分区副本Leader/Follower分布情况以及消息生产速率等因素影响,具体分析如下。 带宽限制值作用范围为整个Broker,对该Broker内所有副本同步的分区进行带宽限流。 带宽限制会将分区平衡后新增的副本视为Follower副本限流,分区平衡前的Leader副本视为Leader副本限流,Leader副本的限流和Follower副本限流分开计算。 带宽限制不会区分是正常的生产消息造成的副本同步还是分区平衡造成的副本同步,因此两者的流量都会被限流统计。 假设分区平衡任务需要在200s内完成,每个副本的数据量为100MB,在以下几种场景中,估算带宽限制值。 场景一:Topic1有2分区2副本,Topic2有1分区1副本,所有Leader副本在同一个Broker上,Topic1和Topic2分别需要新增1个副本 表1 分区平衡前Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,1 Topic1 1 0 0,2 Topic2 0 0 0 表2 分区平衡后Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,1,2 Topic1 1 0 0,1,2 Topic2 0 0 0,2 图3 场景一分区平衡图 如图3所示,有3个副本需要从Broker 0拉取数据,Broker 0中每个副本的数据量为100MB,Broker 0中只有Leader副本,Broker 1和Broker 2中只有Follower副本,由此得出以下数据: Broker 0在200s内完成分区平衡所需的带宽限制值=(100+100+100)/200=1.5MB/s Broker 1在200s内完成分区平衡所需的带宽限制值=100/200=0.5MB/s Broker 2在200s内完成分区平衡所需的带宽限制值=(100+100)/200=1MB/s 综上所述,若想要在200s内完成分区平衡任务,带宽限制值应设为大于等于1.5MB/s。 场景二:Topic1有2分区1副本,Topic2有2分区1副本,Leader副本分布在不同Broker上,Topic1和Topic2分别需要新增1个副本 表3 分区平衡前Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0 Topic1 1 1 1 Topic2 0 1 1 Topic2 1 2 2 表4 分区平衡后Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,2 Topic1 1 1 1,2 Topic2 0 1 1,2 Topic2 1 2 2,0 图4 场景二分区平衡图
  • 步骤三:修改客户端配置文件 替换证书后,需要在客户端的“consumer.properties”和“producer.properties”文件中,分别修改“ssl.truststore.location”和“ssl.truststore.password”参数。 security.protocol=SASL_SSLssl.truststore.location=/opt/kafka_2.11-2.3.0/config/client.truststore.jksssl.truststore.password=dms@kafkassl.endpoint.identification.algorithm= ssl.truststore.location配置为client.truststore.jks证书的存放路径。 ssl.truststore.password为客户端证书的Truststore密码。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
  • DMS自定义策略样例 示例1:授权用户删除实例和重启实例 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "dms:instance:modifyStatus", "dms:instance:delete" ] } ]} 示例2:拒绝用户删除实例 拒绝策略需要同时配合其他策略使用,否则没有实际作用。用户被授予的策略中,一个授权项的作用如果同时存在Allow和Deny,则遵循Deny优先。 如果您给用户授予DMS FullAccess的系统策略,但不希望用户拥有DMS FullAccess中定义的删除实例权限,您可以创建一条拒绝删除实例的自定义策略,然后同时将DMS FullAccess和拒绝策略授予用户,根据Deny优先原则,则用户可以对DMS for Kafka执行除了删除实例外的所有操作。拒绝策略示例如下: { "Version": "1.1", "Statement": [ { "Effect": "Deny", "Action": [ "dms:instance:delete" ] } ]}
  • 查看消费进度(Kafka客户端) 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费进度。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --offsets --describe --all-groups 已开启SASL的Kafka实例,通过以下步骤查询消费进度。 (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。 在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加SSL证书配置。 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费进度。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --offsets --describe --all-groups --command-config ./config/ssl-user-config.properties
  • 查询消费组列表(Kafka客户端) 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费组列表。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --list 已开启SASL的Kafka实例,通过以下步骤查询消费组列表。 (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。 在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加SSL证书配置。 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费组列表。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --list --command-config ./config/ssl-user-config.properties
  • 查看消费者列表(Kafka客户端) 未开启SASL的Kafka实例,在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费者列表。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --group {group_name} --members --describe 已开启SASL的Kafka实例,通过以下步骤查询消费者列表。 (可选)如果已经设置了SSL证书配置,请跳过此步骤。否则请执行以下操作。 在Kafka客户端的“/config”目录中创建“ssl-user-config.properties”文件,参考3增加SSL证书配置。 在“/{命令行工具所在目录}/kafka_{version}/bin/”目录下,通过以下命令查询消费者列表。 ./kafka-consumer-groups.sh --bootstrap-server {broker_ip}:{port} --group {group_name} --members --describe --command-config ./config/ssl-user-config.properties
  • DMS for Kafka请求条件 您可以在创建自定义策略时,通过添加“请求条件”(Condition元素)来控制策略何时生效。请求条件包括条件键和运算符,条件键表示策略语句的 Condition元素,分为全局级条件键和服务级条件键。全局级条件键(前缀为g:)适用于所有操作,服务级条件键(前缀为服务缩写,如dms:)仅适用于对应服务的操作。运算符与条件键一起使用,构成完整的条件判断语句。 DMS for Kafka通过IAM预置了一组条件键,例如,您可以先使用dms:ssl条件键检查Kafka实例是否开启SASL,然后再允许执行操作。下表显示了适用于DMS for Kafka服务特定的条件键。 表1 DMS for Kafka请求条件 DMS for Kafka条件键 运算符 描述 dms:connector Bool IsNullOrEmpty BoolIfExists 是否开启Smart Connect dms:publicIP Bool IsNullOrEmpty BoolIfExists 是否开启公网 dms:ssl Bool IsNullOrEmpty BoolIfExists 是否开启SASL 父主题: 权限管理
  • 操作影响 当流控值达到上限后,会导致生产/消费的时延增大。 设置的流控值较小且生产者速率较大时,可能会造成生产超时、消息丢失,导致部分消息生产失败。 初始生产/消费的流量较大,如果设置一个较小的流控值,会导致生产/消费的时延增大、部分消息生产失败。建议逐次减半设置流控值,待生产/消费稳定后继续减半设置,直到设置为目标流控值。例如初始生产流量100MB/s,可先设置生产流控为50MB/s,待稳定后再修改为25MB/s,直到目标流控值。
  • DMS for Kafka资源 资源是服务中存在的对象。在DMS for Kafka中,资源包括:kafka,您可以在创建自定义策略时,通过指定资源路径来选择特定资源。 表1 DMS for Kafka的指定资源与对应路径 指定资源 资源名称 资源路径 kafka 实例 【格式】 DMS:*:*:kafka:实例ID 【说明】 对于实例资源,IAM自动生成资源路径前缀DMS:*:*:kafka: 通过实例ID指定具体的资源路径,支持通配符*。例如: DMS:*:*:kafka:*表示任意Kafka实例。 父主题: 权限管理
共100000条