MapReduce服务 MRS-Flink Job Pipeline样例程序开发思路:数据规划

时间:2025-01-26 10:41:38

数据规划

  1. 发布者Job使用自定义算子每秒钟产生10000条数据
  2. 数据包含两个属性:分别是Int和String类型
  3. 配置文件
    • nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
      nettyconnector.registerserver.topic.storage: /flink/nettyconnector
    • nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:
      nettyconnector.sinkserver.port.range: 28444-28943
    • nettyconnector.sinkserver.subnet:设置网络所属域,例如:
      nettyconnector.sinkserver.subnet: 10.162.0.0/16
  4. 接口说明
    • 注册服务器接口

      注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:

      public interface RegisterServerHandler { /**   * 启动注册服务器    * @param configuration Flink的Configuration类型   */void start(Configuration configuration) throws Exception;/**        *注册服务器上创建Topic节点(目录)        * @param topic topic节点名称        */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/**       *删除topic节点       * @param topic 待删除topic       */    void deleteTopicNode(String topic) throws Exception;/**   *注销注册信息   *@param topic 注册信息所在的topic   *@param recordId 待注销注册信息ID   */void unregister(String topic, int recordId) throws Exception;/**    * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/**    * 查询某个Topic是否存在    * @param topic    */Boolean isExist(String topic) throws Exception;/**    *关闭注册服务器句柄   */void shutdown() throws Exception;

      工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。

    • NettySink算子
      Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs)
      • name:为本NettySink的名称。
      • topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
      • registerServerHandler:为注册服务器的句柄。
      • numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
    • NettySource算子
      Class  NettySource(String name,String topic,RegisterServerHandler registerServerHandler)
      • name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
      • topic:订阅的NettySink的topic。
      • registerServerHandler:为注册服务器的句柄。

NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。

support.huaweicloud.com/devg-lts-mrs/mrs_07_260024.html