云服务器内容精选

  • 重视消息生产与消费的确认过程 消息生产 生产消息后,生产者需要根据ROMA Connect的返回信息确认消息是否发送成功,如果返回失败需要重新发送。 每次生产消息,生产者都需要等待消息发送API的应答信号,以确认消息是否成功发送。在消息传递过程中,如果发生异常,生产者没有接收到发送成功的信号,生产者自己决策是否需要重复发送消息。如果接收到发送成功的信号,则表明该消息已经被ROMA Connect可靠存储。 消息消费 消息消费时,消费者需要确认消息是否已被成功消费。 生产的消息被依次存储在ROMA Connect的存储介质中。消费时依次获取ROMA Connect中存储的消息。消费者获取消息后,进行消费并记录消费成功或失败的状态,并将消费状态提交到ROMA Connect,由ROMA Connect决定消费下一批消息或回滚重新消费消息。 在消费过程中,如果出现异常,没有提交消费确认,该批消息会在后续的消费请求中再次被获取。
  • 消息生产与消费的幂等传递 ROMA Connect设计了一系列可靠性保障措施,确保消息不丢失。例如使用消息同步存储机制防止系统与服务器层面的异常重启或者掉电,使用消息确认(ACK)机制解决消息传输过程中遇到的异常。 考虑到网络异常等极端情况,用户除了做好消息生产与消费的确认,还需要配合ROMA Connect完成消息发送与消费的重复传输设计。 当无法确认消息是否已发送成功,生产者需要将消息重复发送给ROMA Connect。 当重复收到已处理过的消息,消费者需要告诉ROMA Connect消费成功且保证不重复处理。
  • 消息可以批量生产和消费 为提高消息发送和消息消费效率,推荐使用批量消息发送和消费。通常,默认消息消费为批量消费,而消息发送尽可能采用批量发送,可以有效减少API调用次数。 如下面两张示意图对比所示,消息批量生产与消费,可以减少API调用次数,节约资源。 图1 消息批量生产与消费 批量发送消息时,单次不能超过10条消息,总大小不能超过512KB。 批量生产(发送)消息可以灵活使用,在消息并发多的时候,批量发送,并发少时,单条发送。这样能够在减少调用次数的同时保证消息发送的实时性。 图2 消息逐条生产与消费 此外,批量消费消息时,消费者应按照接收的顺序对消息进行处理、确认,当对某一条消息处理失败时,不再需要继续处理本批消息中的后续消息,直接对已正确处理的消息进行确认即可。
  • 修改配置信息 为了方便,下文分生产与消费两个配置文件介绍。如果ROMA Connect实例开启了SASL认证,在Java客户端的配置文件中必须配置涉及SASL认证的相关信息,否则无法连接。如果没有使用SASL认证,请注释掉相关配置。 生产消息配置文件(对应生产消息代码中的mqs.sdk.producer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #发送确认参数 acks=all #键的序列化方式 key.serializer=org.apache.kafka.common.serialization.StringSerializer #值的序列化方式 value.serializer=org.apache.kafka.common.serialization.StringSerializer #producer可以用来缓存数据的内存大小 buffer.memory=33554432 #重试次数 retries=0 ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置用户名和密码 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,固定,请勿修改。配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 username和password:开启SASL_SSL认证时所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。 消费消息配置文件(对应消费消息代码中的mqs.sdk.consumer.properties文件) 以下粗体部分为不同MQS特有的信息,必须修改。客户端其他参数,可以自主添加。 #Topic名称在具体的生产与消费代码中。 ####################### #举例:bootstrap.servers=192.168.0.196:9095,192.168.0.196:9096,192.168.0.196:9094 bootstrap.servers=ip1:port1,ip2:port2,ip3:port3 #用来唯一标识consumer进程所在组的字符串,请您自行设定。 #如果设置同样的group id,表示这些processes都是属于同一个consumer group group.id=1 #键的序列化方式 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer #值的序列化方式 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #偏移量的方式 auto.offset.reset=earliest ####################### #如果不使用SASL认证,以下参数请注释掉。 ####################### #设置jaas账号和密码,通过控制台设置 sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL鉴权方式 sasl.mechanism=PLAIN #加密协议,目前支持SASL_SSL协议 security.protocol=SASL_SSL #ssl truststore文件的位置 ssl.truststore.location=E:\\temp\\client.truststore.jks #ssl truststore文件的密码,配置此密码是为了访问Java生成的jks文件。 ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= 示例代码中的参数说明,可参考获取MQS连接信息获取参数值。 bootstrap.servers:MQS连接地址和端口。 group.id:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。 username和password:开启SASL_SSL认证时所使用的用户名和密码。 ssl.truststore.location:开启SASL_SSL认证时所使用的客户端证书。