Storm教程

Storm 工作示例

我们已经了解了 Apache Storm 的核心技术细节,现在是编写一些简单场景的时候了。

场景-移动通话记录分析器

移动呼叫及其持续时间将作为输入提供给 Apache Storm,Storm 将处理和分组同一呼叫者和接收者之间的呼叫及其总呼叫次数。

Spout 创建

Spout 是一个用于数据生成的组件。基本上,一个 spout 将实现一个 IRichSpout 接口。 "IRichSpout"接口有以下重要方法-
open-为 spout 提供执行环境。执行程序将运行此方法来初始化 spout。 nextTuple-通过收集器发出生成的数据。 close-当 spout 将要关闭时调用此方法。 declareOutputFields-声明元组的输出模式。 ack-确认处理了特定的元组 fail-指定不处理特定元组且不重新处理。

Spout Creation

open 方法的签名如下-
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf-为这个 spout 提供Storm配置。 context-提供有关拓扑中 spout 位置的完整信息、其任务 ID、输入和输出信息。 collector-使我们能够发出将由 bolt 处理的元组。

nextTuple

nextTuple 方法的签名如下-
nextTuple()
nextTuple() 从与 ack() 和 fail() 方法相同的循环中定期调用。它必须在没有工作要做时释放对线程的控制,以便其他方法有机会被调用。所以 nextTuple 的第一行检查是否 p加工完成。如果是这样,它应该在返回之前休眠至少一毫秒以减少处理器上的负载。

close

close 方法的签名如下-
close()

declareOutputFields

declareOutputFields 方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer-用于声明输出流 ID、输出字段等。
此方法用于指定元组的输出模式。

ack

ack 方法的签名如下-
ack(Object msgId)
此方法确认已处理特定元组。

fail

nextTuple 方法的签名如下-
ack(Object msgId)
此方法通知尚未完全处理特定元组。 Storm 会重新处理特定的元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。
来电号码 收货人号码 持续时间
由于我们没有通话记录的实时信息,我们将生成假通话记录。虚假信息将使用 Random 类创建。完整的程序代码如下。

Coding-FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
  
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
  
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
  
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }
   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");
         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
        
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
        
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }
   //Override all the interface methods
   @Override
   public void close() {}
   public boolean isDistributed() {
      return false;
   }
   @Override
   public void activate() {}
   @Override 
   public void deactivate() {}
   @Override
   public void ack(Object msgId) {}
   @Override
   public void fail(Object msgId) {}
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt Creation

Bolt 是一个组件,它以元组为输入,处理元组,并产生新的元组作为输出。 Bolts 将实现 IRichBolt 接口。在这个程序中,两个螺栓类 CallLogCreatorBoltCallLogCounterBolt 用于执行操作。
IRichBolt 接口有以下方法-
prepare-为 Bolt 提供一个执行环境。执行程序将运行此方法来初始化 spout。 execute-处理单个输入元组。 cleanup-当一个 bolt 将要关闭时调用。 declareOutputFields-声明元组的输出模式。

Prepare

prepare 方法的签名如下-
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf-为这个 bolt 提供 Storm 配置。 context-提供有关拓扑中螺栓位置、其任务 ID、输入和输出信息等的完整信息。 collector-使我们能够发出处理后的元组。

execute

execute 方法的签名如下-
execute(Tuple tuple)
这里 tuple是要处理的输入元组。
execute 方法一次处理一个元组。元组数据可以通过元组类的 getValue 方法访问。没有必要立即处理输入元组。可以处理多个元组并作为单个输出元组输出。可以使用 OutputCollector 类发出处理后的元组。

cleanup

cleanup 方法的签名如下-
cleanup()

declareOutputFields

declareOutputFields 方法的签名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
这里的参数 declarer用于声明输出流id、输出字段等。
此方法用于指定元组的输出模式

Call log Creator Bolt

呼叫日志创建者 bolt 接收呼叫日志元组。通话记录元组有来电号码、接听号码和通话时长。这个bolt 只是通过组合呼叫者号码和接收者号码来创建一个新值。新值的格式为"来电号码-接收号码",并命名为新字段"呼叫"。完整代码如下。

Coding-CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;
   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }
   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + "-" + to, duration));
   }
   @Override
   public void cleanup() {}
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
  
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log Counter Bolt

调用日志计数器 bolt 接收调用及其持续时间作为元组。这个bolt 在prepare 方法中初始化一个字典(Map)对象。在 execute 方法中,它检查元组并为元组中的每个新"调用"值在字典对象中创建一个新条目,并在字典对象中设置值 1、对于已经可用的条目字典,它只是增加它的值。简单来说,这个bolt将调用及其计数保存在字典对象中。我们也可以将其保存到数据源中,而不是将调用及其计数保存在字典中。完整的程序代码如下-

Coding-CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;
   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }
   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
    
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
    
      collector.ack(tuple);
   }
   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
  
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
  
}

Creating Topology

Storm 拓扑基本上是一个 Thrift 结构。 TopologyBuilder 类提供了创建复杂拓扑的简单方法。 TopologyBuilder 类具有设置 spout (setSpout) 和设置 bolt (setBolt) 的方法。最后,TopologyBuilder 有 createTopology 来创建拓扑。使用以下代码片段创建拓扑-
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGroupingfieldsGrouping 方法有助于为 spout 和 bolts 设置流分组。

Local Cluster

出于开发目的,我们可以使用"LocalCluster"对象创建本地集群,然后使用"LocalCluster"类的"submitTopology"方法提交拓扑。 "submitTopology"的参数之一是"Config"类的实例。 "Config"类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并通过准备方法发送到所有任务(spout 和 bolt)。一旦拓扑提交到集群,我们将等待 10 秒让集群计算提交的拓扑,然后使用"LocalCluster"的"shutdown"方法关闭集群。完整的程序代码如下-

Coding-LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
    
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");
      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
      
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
    
      //Stop the topology
    
      cluster.shutdown();
   }
}

构建和运行应用程序

完整的应用程序有四个 Java 代码。他们是-
FakeCallLogReaderSpout.java CallLogCreaterBolt.java CallLogCounterBolt.java LogAnalyerStorm.java
可以使用以下命令构建应用程序-
javac-cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行应用程序-
java-cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

输出

一旦应用程序启动,它会输出关于集群启动过程、spout和bolt处理,最后是集群关闭过程的完整细节。在"CallLogCounterBolt"中,我们打印了调用及其计数详细信息。此信息将显示在控制台上,如下所示-
1234123402-1234123401 : 78
1234123402-1234123404 : 88
1234123402-1234123403 : 105
1234123401-1234123404 : 74
1234123401-1234123403 : 81
1234123401-1234123402 : 81
1234123403-1234123404 : 86
1234123404-1234123401 : 63
1234123404-1234123402 : 82
1234123403-1234123402 : 83
1234123404-1234123403 : 86
1234123403-1234123401 : 93

非 JVM 语言

Storm 拓扑由 Thrift 接口实现,这使得以任何语言提交拓扑变得容易。 Storm 支持 Ruby、Python 和许多其他语言。我们来看看python绑定。

Python 绑定

Python 是一种通用的解释型、交互式、面向对象的高级编程语言。 Storm 支持 Python 来实现其拓扑。 Python 支持发射、锚定、确认和记录操作。
如您所知,螺栓可以用任何语言定义。用另一种语言编写的 Bolt 作为子进程执行,Storm 通过 stdin/stdout 使用 JSON 消息与这些子进程通信。先拿一个支持python绑定的bolt WordCount示例。
public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
  
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}
这里的 WordCount 类实现了 IRichBolt 接口并使用 python 实现指定的超级方法参数"splitword.py"运行。现在创建一个名为"splitword.py"的 Python 实现。
import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()
这是 Python 的示例实现,用于计算给定句子中的单词。同样,您也可以绑定其他支持语言。
昵称: 邮箱:
Copyright © 2022 立地货 All Rights Reserved.
备案号:京ICP备14037608号-4