云服务器内容精选

  • 代码样例 下面代码片段在com.huawei.storm.example.common包的RandomSentenceSpout类的nextTuple方法中,作用在于将收到的字符串拆分成单词。 /** * {@inheritDoc} */ @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[random.nextInt(sentences.length)]; collector.emit(new Values(sentence)); }
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 打包成Jar包,然后在客户端命令行上面进行提交 * 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPO LOG Y_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; } 如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。 /** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } } 下面代码片段在com.huawei.storm.example.wordcount.WordCountBolt类中,作用在于统计收到的每个单词的数量。 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 打包成Jar包,然后在客户端命令行上面进行提交 * 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; } 如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Strom Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Strom Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Strom Bolt 4 创建topology 请参见创建Strom Topology 部分代码请参考开发Storm应用,完整代码请参考Strom-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 代码样例 下面代码片段在com.huawei.storm.example.common.SplitSentenceBolt类中,作用在于拆分每条语句为单个单词并发送。 /** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } } 下面代码片段在com.huawei.storm.example.wordcount.WordCountBolt类中,作用在于统计收到的每个单词的数量。 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount.WordCountTopology类中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在Eclipse中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 1、打包成Jar包,然后在客户端命令行上面进行提交 * 2、远程提交的时候,要先将该应用程序和其他外部依赖(非example工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 3、再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: *./storm jar ../example/example.jar com.huawei.streaming.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; }