云服务器内容精选

  • 消息清理机制 无论消息是否被消费,RocketMQ的消息默认保留时间为48小时,最长保留时间为720小时,修改保留时间的操作请参见修改RocketMQ消息保留时间。RocketMQ消息存储在CommitLog文件中,CommitLog文件大小为1GB,当一个CommitLog文件写满后,会生成一个新的CommitLog文件。RocketMQ删除消息是删除CommitLog文件,而不是删除一条消息。CommitLog文件为顺序写入,当最后写入的一条消息过期时,表示CommitLog文件过期。满足如下任意一个条件,CommitLog文件将会被清理: 每天凌晨4点会清理过期的文件,部分老实例由于未设置时区,清理时间为每天中午12点。 当磁盘使用率达到70%(针对4.8.0版本)或75%(针对5.x版本)时,会立刻清理过期的文件。 当磁盘使用率达到85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。
  • 前提条件 已创建RocketMQ实例和Topic。 如果通过按Message ID查询,需要提前获取消息所在的Topic名称和消息的Message ID。 Message ID为生产消息后返回的MsgId,如6中返回的内容,也可先通过Topic查询消息,记录Message ID。 如果通过按Message Key查询,需要提前获取消息所在的Topic名称和消息的Message Key。 Message Key为7中配置的消息Key,也可先通过Topic查询消息,记录Message Key。
  • 前提条件 已购买RocketMQ实例。 准备一台Linux系统的主机,在主机中安装Java Development Kit 1.8.111或以上版本,并完成环境变量配置,具体操作可参见快速连接RocketMQ并生产消费消息。 准备网络环境。 RocketMQ实例分内网地址以及公网地址两种网络连接方式。如果使用公网地址,则消息生产与消费客户端需要有公网访问权限,并配置如下安全组。 表1 安全组规则(RocketMQ实例4.8.0版本) 方向 协议 端口 源地址 说明 入方向 TCP 8200 RocketMQ客户端所在的IP地址或地址段。 使用TCP协议,通过公网访问元数据节点的端口。 入方向 TCP 10101-10199 使用TCP协议,通过公网访问业务节点的端口。 表2 安全组规则(RocketMQ实例5.x版本) 方向 协议 端口 源地址 说明 入方向 TCP 8200 RocketMQ客户端所在的IP地址或地址段。 使用TCP协议,通过公网访问实例的端口。 入方向 TCP 8081 使用gRPC协议,通过公网访问实例的端口。 入方向 TCP 10101 使用TCP协议,通过公网访问业务节点的端口。
  • 开启RocketMQ消息轨迹(Go) 在客户端开启消息轨迹的方法如下: 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 [root@ecs-test sarama]# go version go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.0 ) 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithTrace(traceCfg)) // 增加此行代码表示开启了消息轨迹。 err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } time.Sleep(10 * time.Second) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithTrace(traceCfg), // 增加此行代码表示开启了消息轨迹。 ) err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }
  • 开启RocketMQ消息轨迹(Java) 在客户端开启消息轨迹的方法如下: 生产者开启消息轨迹(除事务消息以外的消息类型) 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true); 生产者开启消息轨迹(事务消息) 构造函数的“enableMsgTrace”参数传入“true”,例如: TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null); 消费者开启消息轨迹 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);
  • DMS for RocketMQ资源 资源是服务中存在的对象。在DMS for RocketMQ中,资源包括:reliability,您可以在创建自定义策略时,通过指定资源路径来选择特定资源。 表1 DMS for RocketMQ的指定资源与对应路径 指定资源 资源名称 资源路径 rocketmq 实例 【格式】 DMS:*:*:rocketmq:实例ID 【说明】 对于实例资源, IAM 自动生成资源路径前缀DMS:*:*:rocketmq: 通过实例ID指定具体的资源路径,支持通配符*。例如: DMS:*:*:rocketmq:*表示任意RocketMQ实例。
  • DMS for RocketMQ请求条件 您可以在创建自定义策略时,通过添加“请求条件”(Condition元素)来控制策略何时生效。请求条件包括条件键和运算符,条件键表示策略语句的Condition元素,分为全局级条件键和服务级条件键。全局级条件键(前缀为g:)适用于所有操作,服务级条件键(前缀为服务缩写,如dms:)仅适用于对应服务的操作。运算符与条件键一起使用,构成完整的条件判断语句。 DMS for RocketMQ通过IAM预置了一组条件键,例如,您可以先使用dms:ssl条件键检查RocketMQ实例是否开启SSL,然后再允许执行操作。下表显示了适用于DMS for RocketMQ服务特定的条件键。 表2 DMS for RocketMQ请求条件 DMS for RocketMQ条件键 运算符 描述 dms:publicIP Bool Null 是否开启公网 dms:ssl Bool Null 是否开启SSL
  • DMS for RocketMQ自定义策略样例 如果系统预置的DMS for RocketMQ权限,不满足您的授权要求,可以创建自定义策略。自定义策略中可以添加的授权项(Action)请参考细粒度策略支持的授权项。 目前华为云支持以下两种方式创建自定义策略: 可视化视图创建自定义策略:无需了解策略语法,按可视化视图导航栏选择云服务、操作、资源、条件等策略内容,可自动生成策略。 JSON视图创建自定义策略:可以在选择策略模板后,根据具体需求编辑策略内容;也可以直接在编辑框内编写JSON格式的策略内容。 具体创建步骤请参见:创建自定义策略。本章为您介绍常用的DMS for RocketMQ自定义策略样例。 示例1:授权用户删除实例和重启实例 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "dms:instance:modifyStatus", "dms:instance:delete" ] } ] } 示例2:拒绝用户删除实例 拒绝策略需要同时配合其他策略使用,否则没有实际作用。用户被授予的策略中,一个授权项的作用如果同时存在Allow和Deny,则遵循Deny优先原则。 如果您给用户授予DMS FullAccess的系统策略,但不希望用户拥有DMS FullAccess中定义的删除实例权限,您可以创建一条拒绝删除实例的自定义策略,然后同时将DMS FullAccess和拒绝策略授予用户,根据Deny优先原则,则用户可以对DMS for RocketMQ执行除了删除实例外的所有操作。拒绝策略示例如下: { "Version": "1.1", "Statement": [ { "Effect": "Deny", "Action": [ "dms:instance:delete" ] } ] }
  • 前提条件 给用户组授权之前,请您了解用户组可以添加的DMS for RocketMQ权限,并结合实际需求进行选择,DMS for RocketMQ支持的系统权限,请参见:DMS for RocketMQ系统策略。若您需要对除DMS for RocketMQ之外的其它服务授权,IAM支持服务的所有策略请参见系统权限。 DMS for RocketMQ的权限与策略基于分布式消息服务DMS,因此在IAM服务中为DMS for RocketMQ分配用户与权限时,请选择并使用“DMS”的权限与策略。
  • 示例流程 图1 给用户授权DMS for RocketMQ权限流程 创建用户组并授权 在IAM控制台创建用户组,并授予DMS for RocketMQ的管理员权限“DMS ReadOnlyAccess”。 创建用户并加入用户组 在IAM控制台创建用户,并将其加入1中创建的用户组。 用户登录并验证权限 新创建的用户登录控制台,切换至授权区域,验证权限: 在“服务列表”中选择分布式消息服务RocketMQ版,进入RocketMQ实例主界面,单击右上角“购买RocketMQ实例”,尝试购买RocketMQ实例,如果无法购买RocketMQ实例(假设当前权限仅包含DMS ReadOnlyAccess),表示“DMS ReadOnlyAccess”已生效。 在“服务列表”中选择云硬盘(假设当前策略仅包含DMS ReadOnlyAccess),若提示权限不足,表示“DMS ReadOnlyAccess”已生效。
  • 前提条件 已创建RocketMQ实例和消费组。 如果通过按Message ID查询,需要提前获取消息所在的消费组名称和消息的Message ID。 Message ID为生产消息后返回的MsgId,如6中返回的内容,也可先通过Topic查询消息,记录Message ID。 如果通过按Message Key查询,需要提前获取消息所在的消费组名称和消息的Message Key。 Message Key为7中配置的消息Key,也可先通过Topic查询消息,记录Message Key。
  • 避免ClientId相同 客户端的ClientId默认为进程号、本机IP、instanceName进行组合。如果同一个进程内,一个消费组下启动两个消费者,则会导致两者ClientId相同,从而出现有的队列重复消费、有的队列无法消费的情况。 配置建议 由于ClientId生成时会拼接消费者的clientIP属性,同一 IP下不同消费者的clientIP相同会导致ClientId相同,所以建议添加如下代码手动设置instanceName。 producer.setInstanceName(String.valueOf(System.nanoTime()))
  • RocketMQ 4.8.0版本实例仅配置单组Broker有什么影响? 当使用RocketMQ 4.8.0版本实例,且只创建了一组Broker,或者Topic仅关联了一组Broker时,如果Broker故障则会触发主备倒换,且恢复时间较长,为30秒左右。如果Topic关联了多组Broker,Broker故障时,生产流量可自动切换到另一组Broker,可用性更高。 建议使用如下方法处理RocketMQ 4.8.0版本实例仅配置单组Broker的问题: 扩容到两组或更多组Broker。 迁移到5.x版本实例,5.x实例各种规格均有多组Broker。 配置多组Broker后,可添加如下代码开启生产者异常检测机制(仅Java客户端5.1.4及以上版本才支持该功能),缩短故障切换时间。 producer.setSendLatencyFaultEnable(true) 父主题: 实例问题
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 在“go.mod”中增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。