华为云用户手册

  • 构建数据的恢复和容灾能力 预先构建数据的容灾和恢复能力,可以有效避免异常数据处理场景下数据误删、破坏的问题。 建议使用RabbitMQ集群实例,获得异常场景数据快速恢复能力。 在生产环境中建议使用RabbitMQ集群实例,在实例某个broker故障的情况下,不影响RabbitMQ实例持续提供服务。 建议使用多个可用区构建数据容灾能力。 RabbitMQ集群实例支持跨可用区部署,支持跨可用区容灾。如果创建实例时选择了多个可用区,当一个可用区异常时,不影响RabbitMQ实例持续提供服务。
  • RabbitMQ节点重启后消费者自动重连示例代码 以下提供一个简单的Java代码示例,此示例能够解决上面的两种错误,实现消费者的持续消费。 package rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class RabbitConsumer { public static void main(String... args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 配置实例的连接地址和端口 factory.setHost("192.168.0.2"); factory.setPort(5672); // 配置实例连接的用户名和密码 factory.setUsername("name"); factory.setPassword("password"); Connection connection = factory.newConnection(); createNewConnection(connection); } // 重连处理 public static void createNewConnection(Connection connection) { try { Thread.sleep(1000); Channel channel = connection.createChannel(); channel.basicQos(64); channel.basicConsume("queue-01", false, new CustomConsumer(channel, connection)); } catch (Exception e) { // e.printStackTrace(); createNewConnection(connection); } } static class CustomConsumer implements Consumer { private final Channel _channel; private final Connection _connection; public CustomConsumer(Channel channel, Connection connection) { _channel = channel; _connection = connection; } @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { System.out.println("handleCancel"); System.out.println(consumerTag); createNewConnection(_connection); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { System.out.println("handleShutdownSignal"); System.out.println(consumerTag); System.out.println(sig.getReason()); createNewConnection(_connection); } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); _channel.basicAck(envelope.getDeliveryTag(), false); } } }
  • 方案概述 RabbitMQ的amqp-client虽然自带重连机制,但是自带的重连机制只会重试一次,重连失败后就不再执行。这时如果消费者没有做额外的重试机制,那么这个消费者就彻底断开与服务端的连接,无法消费消息。 amqp-client在节点断连后,根据与通道建立的节点不同,产生不同的错误。 如果通道连接的是队列所在的节点,消费者就会收到一个shutdown信号。这时amqp-client的重连机制就会生效,尝试重新连接服务端。如果连接成功,这个通道就会继续连接消费。如果连接失败,就会执行channel.close方法,关闭这个通道。 如果通道连接的不是队列所在的节点,消费者不会触发关闭动作,而是由服务端发送的一个取消动作。这个动作对amqp-client来说并不是异常行为,所以日志上不会有明显的报错,但是连接最终还是会关闭。 amqp-client出现上面两种错误时,会分别回调handleShutdownSignal以及handleCancel方法。您可以通过重写这两种方法,在回调时执行重写的重连逻辑,就能在通道关闭后重新为消费者创建新的通道继续消费。
  • 方案概述 在RabbitMQ的业务处理过程中,如果消息重发了多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,多次消费并没有对业务产生负面影响,那么这个消息处理过程是幂等的。消息幂等保证了无论消息被重复投递多少次,最终的处理结果都是一致的,避免了因消息重复而对业务产生影响。 例如在支付场景下,用户购买商品后进行支付,由于网络不稳定导致用户收到多次扣款请求,导致重复扣款。但实际上扣款业务只应进行一次,商家也只应产生一条订单流水。这时候使用消息幂等就可以避免这个问题。 在实际应用中,导致消息重复的原因有网络闪断、客户端故障等,且可能发生在消息生产阶段,也可能发生在消息消费阶段。因此,可以将消息重复的场景分为以下两类: 生产者发送消息时发生消息重复: 生产者发送消息时,消息成功发送至服务端。如果此时发生网络闪断,导致生产者未收到服务端的响应,此时生产者会认为消息发送失败,因此尝试重新发送消息至服务端。当消息重新发送成功后,在服务端中就会存在两条内容相同的消息,最终消费者会消费到两条内容一样的重复消息。 消费者消费消息时发生消息重复: 消费者消费消息时,服务端将消息投递至消费者并完成业务处理。如果此时发生网络闪断,导致服务端未收到消费者的响应,此时服务端会认为消息投递失败。为了保证消息至少被消费一次,服务端会尝试投递之前已被处理过的消息,最终消费者会消费到两条内容一样的重复消息。
  • 实施方法 对于消息重复的场景,一般可以使用全局唯一ID来判断该消息是否已消费过。如果已经消费过,则直接返回处理结果,否则进行消息处理,并将全局ID记录下来。 生产者为每一条消息设置唯一的messageID,示例代码如下: //持久化消息,并且生成随机的全局唯一messageID AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder(); builder.deliveryMode(2); builder.messageId(UUID.randomUUID().toString()); //自定义发送的消息 String message = "message content"; //生产消息,exchangeName和routingKey根据实际填写Queue所属的Exchange名称和Routing Key channel.basicPublish("exchangeName", "routingKey", false, builder.build(), message.getBytes(StandardCharsets.UTF_8)); String messageId = builder.build().getMessageId(); System.out.println("messageID: " + messageId); System.out.println("Send message success!"); //关闭信道 channel.close(); //关闭连接 connection.close(); 消费者根据messageID对消息进行幂等处理,示例代码如下: //创建一个以messageID为主键的数据库表,利用数据库主键去重的方式来处理RabbitMQ幂等。 //在消费者消费前先去数据库查询这条消息是否存在,如果存在表示消息已被消费,无需处理;如果不存在表示消息未被消费,执行消费操作 //queueName根据实际填写要消费的Queue名称 channel.basicConsume("queueName", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取messageID,并判断是否为空 String messageId = properties.getMessageId(); if (StringUtils.isBlank(messageId)){ logger.info("messageId is null"); return; } //查询数据库中是否存在主键为messageID的记录,如果存在,说明这条消息已经被消费,无需处理,否则消费消息,并且在消费完成后将消息记录入库 //数据库查询逻辑省略 //todo //如果数据库中没有messageID的记录,则执行消费,否则提示消息已消费 if (null == "{数据库查出来的结果记录}"){ //获取消息 String message = new String(body,StandardCharsets.UTF_8); //手动响应 channel.basicAck(envelope.getDeliveryTag(),false); logger.info("[x] received message: " + message + "," + "messageId:" + messageId); //存入数据库表中,标识该消息已消费 //数据库插入操作省略 //todo } else { //如果根据messageID查询到消息已消费,则不进行消费 logger.error("该消息已消费,无需重复消费"); } } });
  • 方案概述 Kafka业务过载,一般表现为CPU使用率高、磁盘写满的现象。 当CPU使用率过高时,系统的运行速度会降低,并有加速硬件损坏的风险。 当磁盘写满时,相应磁盘上的Kafka日志目录会出现offline问题。此时,该磁盘上的分区副本不可读写,降低了分区的可用性和容错能力。同时由于Leader分区迁移到其他节点,会增加其他节点的负载。 CPU使用率高的原因 数据操作相关线程数(num.io.threads、num.network.threads、num.replica.fetchers)过多,导致CPU繁忙。 分区设置不合理,所有的生产和消费都集中在某个节点上,导致CPU利用率高。 磁盘写满的原因 业务数据增长较快,已有的磁盘空间不能满足业务数据需要。 节点内磁盘使用率不均衡,生产的消息集中在某个分区,导致分区所在的磁盘写满。 Topic的数据老化时间设置过大,保存了过多的历史数据,容易导致磁盘写满。
  • 前提条件 执行实施步骤前,请确保已完成以下操作: 下载Logstash。 准备一台Windows系统的主机,在主机中安装Java Development Kit 1.8.111或以上版本和Git Bash。 创建Kafka实例和Topic,并获取Kafka实例信息。 Kafka实例未开启公网访问和SASL认证时,获取表2所示信息。 表2 Kafka实例信息(未开启公网访问和SASL认证) 参数名 获取途径 内网连接地址 在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。 Topic名称 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。 下文以topic-logstash为例介绍。 Kafka实例未开启公网访问、已开启SASL认证时,获取表3所示信息。 表3 Kafka实例信息(未开启公网访问、已开启SASL认证) 参数名 获取途径 内网连接地址 在Kafka实例详情页的“连接信息”区域,获取“内网连接地址”。 开启的SASL认证机制 在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。 启用的安全协议 在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。 证书 在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。 SASL用户名和密码 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。 Topic名称 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。 下文以topic-logstash为例介绍。 Kafka实例已开启公网访问、未开启SASL认证时,获取表4所示信息。 表4 Kafka实例信息(已开启公网访问、未开启SASL认证) 参数名 获取途径 公网连接地址 在Kafka实例详情页的“连接信息”区域,获取“公网连接地址”。 Topic名称 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。 下文以topic-logstash为例介绍。 Kafka实例已开启公网访问和SASL认证时,获取表5所示信息。 表5 Kafka实例信息(已开启公网访问和SASL认证) 参数名 获取途径 公网连接地址 在实例详情页的“连接信息”区域,获取“公网连接地址” 开启的SASL认证机制 在Kafka实例详情页的“连接信息”区域,获取“开启的SASL认证机制”。 启用的安全协议 在Kafka实例详情页的“连接信息”区域,获取“启用的安全协议”。 证书 在Kafka实例详情页的“连接信息”区域,在“SSL证书”所在行,单击“下载”。下载压缩包后解压,获取压缩包中的客户端证书文件:client.jks。 SASL用户名和密码 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“用户管理”,进入用户列表页面,获取用户名。如果忘记了密码,单击“重置密码”,重新设置密码。 Topic名称 在Kafka实例控制台,单击实例名称,进入实例详情页。在左侧导航栏单击“Topic管理”,进入Topic列表页面,获取Topic名称。 下文以topic-logstash为例介绍。
  • 方案概述 应用场景 Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到指定的存储中。Kafka是一种高吞吐量的分布式发布订阅消息系统,也是Logstash支持的众多输入输出源之一。本章节主要介绍Logstash如何对接Kafka实例。 方案架构 Kafka实例作为Logstash输入源的示意图如下。 图1 Kafka实例作为Logstash输入源 日志采集客户端将数据发送到Kafka实例中,Logstash根据自身性能从Kafka实例中拉取数据。Kafka实例作为Logstash输入源时,可以防止突发流量对于Logstash的影响,以及解耦日志采集客户端和Logstash,保证系统的稳定性。 Kafka实例作为Logstash输出源的示意图如下。 图2 Kafka实例作为Logstash输出源 Logstash从数据库采集数据,然后发送到Kafka实例中进行存储。Kafka实例作为Logstash输出源时,由于Kafka的高吞吐量,可以存储大量数据。
  • 约束与限制 Logstash从7.5版本开始支持Kafka Integration Plugin插件,Kafka Integration Plugin插件包含Kafka input Plugin和Kafka output Plugin。Kafka input Plugin用于从Kafka实例的Topic中读取数据,Kafka output Plugin把数据写入到Kafka实例的Topic。Logstash、Kafka Integration Plugin与Kafka客户端的版本对应关系如表1所示。请确保Kafka客户端版本大于或等于Kafka实例的版本。 表1 版本对应关系 Logstash版本 Kafka Integration Plugin版本 Kafka客户端版本 8.3~8.8 10.12.0 2.8.1 8.0~8.2 10.9.0~10.10.0 2.5.1 7.12~7.17 10.7.4~10.9.0 2.5.1 7.8~7.11 10.2.0~10.7.1 2.4 7.6~7.7 10.0.1 2.3.0 7.5 10.0.0 2.1.0
  • 方案概述 Kafka将Topic划分为多个分区,消息被分布式存储在分区中。同一个消费组内,一个消费者可同时消费多个分区,但一个分区在同一时刻只能被一个消费者消费。 在消息处理过程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息没有被及时消费就会产生消息堆积,从而会造成消息消费延迟。 消息堆积原因 导致消息堆积的常见原因如下: 生产者短时间内生产大量消息到Topic,消费者无法及时消费。 消费者的消费能力不足(消费者并发低、消息处理时间长),导致消费效率低于生产效率。 消费者异常(如消费者故障、消费者网络异常等)导致无法消费消息。 Topic分区设置不合理,或新增分区无消费者消费。 Topic频繁重平衡导致消费效率降低。
  • 通过访问控制,保护数据安全性 建议对不同角色的 IAM 用户仅设置最小权限,避免权限过大导致数据泄露或被误操作。 为了更好的进行权限隔离和管理,建议您配置独立的IAM管理员,授予IAM管理员IAM策略的管理权限。IAM管理员可以根据您业务的实际诉求创建不同的用户组,用户组对应不同的数据访问场景,通过将用户添加到用户组并将IAM策略绑定到对应用户组,IAM管理员可以为不同职能部门的员工按照最小权限原则授予不同的数据访问权限,详情请参见权限管理。 建议配置安全组访问控制,保护您的数据不被异常读取和操作。 租户配置安全组的入方向、出方向规则限制,可以控制连接实例的网络范围,避免DMS for Kafka暴露给不可信第三方,详情请参见配置安全组。安全组入方向规则的“源地址”应避免设置为0.0.0.0/0。 建议将访问Kafka实例方式设置为密码访问(即开启SASL),防止未经认证的客户端误操作实例。 开启敏感操作多因子认证保护您的数据不被误删。 DMS for Kafka支持敏感操作保护,开启后执行删除实例等敏感操作时,系统会进行身份验证,进一步对数据的高危操作进行控制,保证DMS for Kafka数据的安全性。详情请参见敏感操作。
  • 审计是否存在异常数据访问 开启 云审计 服务,记录Kafka的所有访问操作,便于事后审查。 云审计服务(Cloud Trace Service, CTS ),是华为 云安全 解决方案中专业的日志审计服务,提供对各种云资源操作记录的收集、存储和查询功能,可用于支撑安全分析、合规审计、资源跟踪和问题定位等常见应用场景。 您开通云审计服务并创建和配置追踪器后,CTS可记录Kafka的管理事件和数据事件用于审计。详情请参见查看Kafka审计日志。 使用 云监控服务 对Kafka进行实时监控和告警。 为使您更好地掌握Kafka实例状态,华为云提供了 云监控 服务(Cloud Eye)。您可使用该服务监控自己的Kafka实例,执行自动实时监控、告警和通知操作,帮助您实时掌握Kafka实例中所产生的请求、流量等信息。 云监控服务不需要开通,会在您创建Kafka实例后自动启动。相关文档请参见Kafka支持的监控指标。
  • 构建数据的恢复和容灾能力 预先构建数据的容灾和恢复能力,可以有效避免异常数据处理场景下数据误删、破坏的问题。 建议Topic配置多副本,获得异常场景数据快速恢复能力。 Kafka实例支持配置Topic副本数量,配置多副本后Kafka实例会主动建立和维护同步复制,在实例某个broker故障的情况下,实例会自动将该节点上分区leader切换到其它可用的broker上,从而达到高可用的目的。 建议使用多个可用区构建数据容灾能力。 Kafka集群实例支持跨可用区部署,支持跨可用区容灾。如果创建实例时选择了多个可用区,当一个可用区异常时,不影响Kafka实例持续提供服务。
  • 消息生产与消费的幂等传递 分布式消息服务Kafka版设计了一系列可靠性保障措施,确保消息不丢失。例如使用消息同步存储机制防止系统与服务器层面的异常重启或者掉电,使用消息确认(ACK)机制解决消息传输过程中遇到的异常。 考虑到网络异常等极端情况,用户除了做好消息生产与消费的确认,还需要配合分布式消息服务Kafka版完成消息发送与消费的重复传输设计。 当无法确认消息是否已发送成功,生产者需要将消息重复发送给分布式消息服务Kafka版。 当重复收到已处理过的消息,消费者需要告诉分布式消息服务Kafka版消费成功且保证不重复处理。
  • 重视消息生产与消费的确认过程 消息生产 发送消息后,生产者需要根据分布式消息服务Kafka版的返回信息确认消息是否发送成功,如果返回失败需要重新发送。 生产消息时,生产者通过同步等待发送结果或异步回调函数,判断消息是否发送成功。在消息传递过程中,如果发生异常,生产者没有接收到发送成功的信号,生产者自己决策是否需要重复发送消息。如果接收到发送成功的信号,则表明该消息已经被分布式消息服务Kafka版可靠存储。 消息消费 消息消费时,消费者需要确认消息是否已被成功消费。 生产的消息被依次存储在分布式消息服务Kafka版的存储介质中。消费时依次获取分布式消息服务Kafka版中存储的消息。消费者获取消息后,进行消费并记录消费成功或失败的状态,并将消费状态提交到分布式消息服务Kafka版。 在消费过程中,如果出现异常,没有提交消费确认,该批消息会在后续的消费请求中再次被获取。
  • 代码示例运行结果 [2018-01-25 22:40:51,841] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119) [2018-01-25 22:40:51,841] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:40:52,122] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69) [2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:40:52,169] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:40:52,216] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:40:52,325] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119) [2018-01-25 22:40:52,325] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:40:54,947] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87) [2018-01-25 22:40:54,979] INFO Thread3 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87) [2018-01-25 22:41:32,347] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:41:42,353] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:41:47,816] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69) [2018-01-25 22:41:47,847] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:47,925] INFO Thread 3 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119) [2018-01-25 22:41:47,925] INFO Thread1 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87) [2018-01-25 22:41:47,925] INFO Thread3 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:41:47,957] INFO Thread2 Have a nice sleep! (com.huawei.dms.kafka.DmsKafkaProduceDemo:87) [2018-01-25 22:41:48,472] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69) [2018-01-25 22:41:48,503] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:48,518] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:48,550] INFO Thread2 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:48,597] INFO Thread1 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:48,659] INFO Thread 2 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119) [2018-01-25 22:41:48,659] INFO Thread2 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128) [2018-01-25 22:41:48,675] INFO Thread3 recievedrecordshello, dms kafka. (com.huawei.dms.kafka.DmsKafkaProduceDemo:32) [2018-01-25 22:41:48,675] INFO Everyone WakeUp and Work! (com.huawei.dms.kafka.DmsKafkaProduceDemo:69) [2018-01-25 22:41:48,706] INFO Thread 1 Polling! (com.huawei.dms.kafka.DmsKafkaProduceDemo:119) [2018-01-25 22:41:48,706] INFO Thread1 KEEP Poll records! (com.huawei.dms.kafka.DmsKafkaProduceDemo:128)
  • 方案概述 应用场景 在分布式消息服务Kafka版提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图1所示,Topic含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当Topic中消息较少或者没有消息时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 图1 Kafka消费者多线程消费模式 解决方案 在开了多个线程同时访问的情况下,如果Topic里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现Topic中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。如图2所示。 这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。 图2 优化后的多线程消费方案 消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。 因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。
  • 实施步骤 业务数据不均衡的处理措施: 优化业务中Topic的设计,对于数据量特别大的Topic,可对业务数据做进一步的细分,并分配到不同的Topic上。 生产者生产消息时,尽量把消息均衡发送到不同的分区上,确保分区间的数据均衡。 创建Topic时,使分区的Leader副本分散到各个Broker节点中,以保障整体的数据均衡。 Kafka提供了分区重平衡的功能,可以把分区的副本重新分配到不同的Broker节点上,解决节点间负载不均衡的问题。具体分区重平衡的操作请参考修改分区平衡。
  • 方案概述 Kafka将Topic划分为多个分区,所有消息分布式存储在各个分区上。每个分区有一个或多个副本,分布在不同的Broker节点上,每个副本存储一份全量数据,副本之间的消息数据保持同步。Kafka的Topic、分区、副本和代理的关系如下图所示: 在实际业务过程中可能会遇到各节点间或分区之间业务数据不均衡的情况,业务数据不均衡会降低Kafka集群的性能,降低资源使用率。 业务数据不均衡原因 业务中部分Topic的流量远大于其他Topic,会导致节点间的数据不均衡。 生产者发送消息时指定了分区,未指定的分区没有消息,会导致分区间的数据不均衡。 生产者发送消息时指定了消息Key,按照对应的Key发送消息至对应的分区,会导致分区间的数据不均衡。 系统重新实现了分区分配策略,但策略逻辑有问题,会导致分区间的数据不均衡。 Kafka扩容了Broker节点,新增的节点没有分配分区,会导致节点间的数据不均衡。 业务使用过程中随着集群状态的变化,多少会发生一些Leader副本的切换或迁移,会导致个别Broker节点上的数据更多,从而导致节点间的数据不均衡。
  • 使用SDK设置视频封面 视频点播 提供的服务端SDK,对API接口进行了封装,您可以在 SDK中心 下载对应SDK,然后进行集成开发。 视频上传时设置封面:服务端SDK提供了本地上传、OBS转存、URL拉取三种媒资上传方法,可以在对应的方法中上传本地图片设置封面,或者截图设置封面。 视频更新时设置封面:服务端SDK支持在视频上传完成后,可以调用视频更新方法时上传本地图片,更新视频封面。 视频处理时设置封面:服务端SDK支持在视频上传完成后,可以调用音视频方法时设置截图参数,选择某张截图作为视频封面。
  • 场景说明 随着视频点播存储视频文件量的增加,设置视频封面不仅能提升展示的美观性,还能方便通过封面查找相关视频。同时,上传的视频封面也将会生成对应的封面地址,实现加速分发,可以直接将封面与视频文件一同引用到网页中。 上传视频时,点播服务会默认截取视频的第一帧作为封面图片。您也可以通过上传图片或截图封面来更新视频的封面。 上传封面:适用于需要通过封面表达视频的大概内容及重点的场景。在上传前,您需要提前线下设计一张JPG或PNG格式的封面图片。 截图封面:适用于希望将视频中的某个瞬间画面来设置成封面的场景。无需要额外准备工作,您可以直接使用视频点播的截图功能生成。 您可以通过以下方式自定义视频封面: 控制台设置视频封面 调用API设置视频封面 使用SDK设置视频封面
  • 调用API设置视频封面 视频点播API支持通过媒资上传、视频更新、视频处理三种方式设置视频的封面图片。 视频上传时设置封面 视频点播提供了上传、OBS转存、URL拉取三种创建媒资的方式,其中上传方式创建媒资支持上传或截图封面,其它二种方式仅支持截图封面。具体如下所示: 上传方式创建媒资 上传封面:调用创建媒资:上传方式接口,在请求参数中设置“cover_type”,即上传封面的图片类型,然后在请求的返回参数中获取“cover_upload_url”,通过“cover_upload_url”上传封面图片即可。 截图封面:调用创建媒资:上传方式接口,在请求参数中设置“thumbnail”,设置截图类型,指定某张截图作为封面。 OBS转存和URL拉取方式创建媒资 分别调用创建媒资:OBS转存方式口或创建媒资:URL拉取注入接口,在请求参数中设置“thumbnail”,设置截图类型,指定某张截图作为封面。 视频更新时设置封面 调用视频更新接口,在请求参数中设置“cover_type”,即上传封面的图片类型,然后在请求的返回参数中获取“cover_upload_url”,通过“cover_upload_url”上传封面图片即可。 视频处理时设置封面 调用视频处理接口生成截图,然后指定某张截图作为封面。若您需要在已生成的截图中更换封面,则可以先调用查询媒资详细信息接口,获取该视频的截图URL,然后调用设置封面接口修改截图封面。
  • 本地上传 支持批量上传音视频文件,便于快速将媒资上传到点播服务中,使用浏览器登录控制台即可进行上传。 华为云点播服务提供的本地上传功能有如下限制: 控制台由于安全策略,长时间上传可能会由于登录失效导致大文件上传失败,在上传大量文件时,需要操作控制台,从而保证控制台不自动退出登录。 支持上传的格式如下所示: 视频文件格式:MP4,TS,MOV,MXF,MPG,FLV,WMV,AVI,M4V,F4V,MPEG,3GP,ASF,MKV,WEBM,M3U8。其中,M3U8仅支持URL拉取方式上传。 音频文件格式:MP3,OGG,WAV,WMA,APE,FLAC,AAC,AC3,MMF,AMR,M4A,M4R,WV,MP2。
  • OBS转存 若在开通点播服务前,已在华为云OBS桶中存储了大量的音视频文件,希望使用视频点播的转码、截图等功能对这些音视频进行处理。您可以使用该功能将OBS桶中的音视频文件复制转存到点播服务中,然后使用点播服务的相关功能。 华为云点播服务提供的OBS转存功能有如下限制: 不支持跨区域转存,如“华北-北京四”OBS桶中的音视频只能转存到“华北-北京四”点播服务中。 转存是指将OBS桶中的音视频文件复制一份到点播服务中,因此,若OBS桶中的音视频不删除,则OBS和点播服务中都将会有相关的存储费用产生。 支持转存的格式如下所示: 视频文件格式:MP4,TS,MOV,MXF,MPG,FLV,WMV,AVI,M4V,F4V,MPEG,3GP,ASF,MKV,WEBM,M3U8。其中,M3U8仅支持URL拉取方式上传。 音频文件格式:MP3,OGG,WAV,WMA,APE,FLAC,AAC,AC3,MMF,AMR,M4A,M4R,WV,MP2。 OBS转存方式暂只支持调用API实现,您可以调用创建媒资:OBS转存接口实现该功能。
  • Windows下安装JDK 官网下载JDK文件。以JDK8为例,单击JDK下的下载按钮进行下载。 下载完成后按照提示安装,安装位置可自选,比如安装到本地“C:\Program Files\Java\jdk1.8.0_131”。 安装完成后,配置Java环境变量。 右击"计算机",单击"属性",选择"高级系统设置"; 选择"高级"选项卡,单击"环境变量"; 在"系统变量"中设置3个变量:JAVA_HOME、PATH、CLASSPATH(大小写均可),变量值如表1所示。 若此三项属性已存在则单击"编辑",不存在则单击"新建"。 表1 JAVA环境变量 变量名 变量值 变量说明 JAVA_HOME JDK安装的实际路径 例如:“C:\Program Files (x86)\Java\jdk1.8.0_1311” PATH %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin 在原PATH值后添加 CLASSPATH .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar; 注意前面有个"." 打开命令行窗口,输入“java -version”。显示Java版本信息即表示配置成功。 以JDK 8为例,成功示例图如下:
  • Linux下安装JDK 根据系统情况下载JDK安装包,建议下载JDK1.8。 您需要根据自己的Linux系统版本下载对应的JDK1.8版本,下载前请先勾选“Accept License Agreement”。 图1 下载Linux版JDK 解压安装包到JDK目录下。 tar -xvf jdk-8u191-linux-x64.tar.gz -C /home/vod/jdk/ 配置环境变量。 执行vi /etc/profile命令进入profile文件。 在profile文件末尾加入如下内容。 #set java environment export JAVA_HOME=/home/vod/jdk/jdk1.8.0_191 export JRE_HOME=/home/vod/jdk/jdk1.8.0_191/jre export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JRE_HOME/lib/tools.jar export PATH=$JAVA_HOME/bin:$PATH 执行:wq命令保存profile文件并退出。 执行java -version验证JDK是否安装成功。 回显以下JDK版本信息则表示安装成功。 [root@ecs-c525-web ~]# java -version java version "1.8.0_191" Java(TM) SE Runtime Environment (build 1.8.0_191-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
  • 功能介绍 开通视频点播服务后,您可以使用如下方式将音视频上传至点播服务,从而进行相关管理操作。 本地上传:支持将存储在本地磁盘的音视频文件上传到视频点播中。 URL拉取:支持基于音视频源文件URL,离线拉取上传到点播系统。 音视频托管:支持将OBS桶中的音视频文件托管给视频点播,从而通过点播处理音视频,并支持将处理后产生的媒资文件存储在OBS桶中。 您可以参考如下步骤进行视频上传,也可以参考视频指导来操作。
  • 使用限制 支持上传的音视频文件格式如下所示: 视频文件格式:MP4,TS,MOV,MXF,MPG,FLV,WMV,AVI,M4V,F4V,MPEG,3GP,ASF,MKV,WEBM,M3U8。其中,M3U8仅支持URL拉取方式上传。 音频文件格式:MP3,OGG,WAV,WMA,APE,FLAC,AAC,AC3,MMF,AMR,M4A,M4R,WV,MP2。 若上传的文件名有空格,上传后将会去除命名中的空格。
  • 流量命中率 选择需要查看的时间、 域名 及时间粒度,即可查看到指定时间跨度内的流量命中率统计详情。如图5所示。 流量命中率=命中缓存产生的流量/请求总流量,请求总流量为命中缓存产生的流量和请求回源产生的流量之和。 您可以单击“下载”,将统计详情导出到本地。 统计表中呈现的是所选域名在查询时间段内的流量命中率统计信息,将鼠标指针停留在趋势图上,滚动鼠标滚轮可针对某时间跨度范围内的趋势图时间横轴进行拉大或缩小整体占比。 图5 流量命中率统计信息
  • VOD FullAccess策略内容 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "vod:*:*" ] } ] }
共100000条