MAPREDUCE服务 MRS-Flink Job Pipeline样例程序开发思路:数据规划
时间:2024-12-09 15:37:27
数据规划
- 发布者Job使用自定义算子每秒钟产生10000条数据
- 数据包含两个属性:分别是Int和String类型
- 配置文件
- nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
- nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如:
nettyconnector.sinkserver.port.range: 28444-28943
- nettyconnector.sinkserver.subnet:设置网络所属域,例如:
nettyconnector.sinkserver.subnet: 10.162.0.0/16
- nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如:
- 接口说明
- 注册服务器接口
注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口:
public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查询信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception;
工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。
- NettySink算子
Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
- name:为本NettySink的名称。
- topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。
- registerServerHandler:为注册服务器的句柄。
- numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。
- NettySource算子
Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
- name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。
- topic:订阅的NettySink的topic。
- registerServerHandler:为注册服务器的句柄。
- 注册服务器接口
NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。
support.huaweicloud.com/devg-lts-mrs/mrs_07_260024.html
看了此文的人还看了
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格
推荐文章
- 小程序免费开发_免费小程序开发平台_免费开发小程序_免费的小程序平台
- 免费店铺小程序_免费制作小程序_小程序免费开发平台_免费的小程序
- 免费小程序_免费下载小程序_小程序免费开发_零售管理_教育
- GaussDB开发_GaussDB数据库开发_高斯数据库开发_华为云
- MapReduce服务_什么是Flink_如何使用Flink
- 数据治理中心_数据开发_数据开发示例_使用教程-华为云
- GaussDB怎么样_华为云数据库_高斯数据库怎么样-华为云
- 什么是Spark_如何使用Spark_Spark的功能是什么
- GaussDB数据库搭建_GaussDB怎么样_高斯数据库搭建
- 数据治理中心_数据开发_数据开发能力_脚本和节点介绍-华为云