Storm教程

Storm Trident

Trident 是 Storm 的扩展。与 Storm 一样,Trident 也是由 Twitter 开发的。开发 Trident 的主要原因是在 Storm 之上提供高级抽象以及有状态的流处理和低延迟的分布式查询。
Trident 使用 spout 和 bolt,但这些低级组件在执行前由 Trident 自动生成。 Trident 具有函数、过滤器、连接、分组和聚合。
Trident 将流处理为一系列批处理,这些批处理称为事务。通常,这些小批量的大小将在数千或数百万个元组的数量级上,具体取决于输入流。这样,Trident 就和 Storm 不同,Storm 是逐个元组处理的。
批处理概念与数据库事务非常相似。每个事务都分配了一个事务 ID。一旦所有处理完成,交易就被认为是成功的。但是,处理事务元组之一的失败将导致整个事务被重新传输。对于每个批次,Trident 会在事务开始时调用 beginCommit,并在事务结束时提交。

Trident Topology

Trident API 公开了一个使用"TridentTopology"类创建 Trident 拓扑的简单选项。基本上,Trident 拓扑从 spout 接收输入流并对流执行有序的操作序列(过滤器、聚合、分组等)。Storm Tuple 被 Trident Tuple 替换,Bolts 被操作替换。可以创建一个简单的 Trident 拓扑如下-
TridentTopology topology = new TridentTopology();

Trident Tuples

三叉戟元组是一个命名的值列表。 TridentTuple 接口是 Trident 拓扑的数据模型。 TridentTuple 接口是 Trident 拓扑可以处理的数据的基本单元。

Trident Tuples

Trident spout 类似于 Storm spout,有额外的选项来使用 Trident 的功能。 Actually,我们仍然可以使用我们在 Storm 拓扑中使用的 IRichSpout,但它本质上是非事务性的,我们将无法使用 Trident 提供的优势。
具有使用 Trident 功能的所有功能的基本 spout 是"ITridentSpout"。它支持事务和不透明事务语义。其他 spout 是 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。
除了这些通用的 spouts 之外,Trident 还有许多 trident spout 的示例实现。其中之一是 FeederBatchSpout spout,我们可以使用它轻松发送三叉戟元组的命名列表,而无需担心批处理、并行性等。
FeederBatchSpout 创建和数据馈送可以如下所示进行-
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Spout

Trident 依赖"Trident 操作"来处理三叉戟元组的输入流。Trident API 有许多内置操作来处理从简单到复杂的流处理。这些操作范围从简单的验证到复杂的分组和三叉戟元组的聚合。让我们来看看最重要和最常用的操作。

Trident Operations

Filter 是一个用于执行输入验证任务的对象。三叉戟过滤器获取三叉戟元组字段的子集作为输入,并根据是否满足某些条件返回 true 或 false。如果返回 true,则元组保留在输出流中;否则,元组将从流中删除。 Filter 将基本上继承自 BaseFilter 类并实现 isKeep 方法。这是过滤器操作的示例实现-
public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
可以使用"each"方法在拓扑中调用过滤器函数。"Fields"类可用于指定输入(三叉戟元组的子集)。示例代码如下-
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

Function 是一个用于对单个三叉戟元组执行简单操作的对象。它需要一个三叉戟元组字段的子集,并发出零个或多个新的三叉戟元组字段。
Function 基本上继承自 BaseFunction 类并实现了 execute 方法。下面给出了一个示例实现-
public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
就像过滤器操作一样,可以使用 each 方法在拓扑中调用函数操作。示例代码如下-
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Aggregation

聚合是一个对象,用于对输入的批处理或分区或流执行聚合操作。 Trident 具有三种类型的聚合。它们如下-
aggregate-单独聚合每批三叉戟元组。在聚合过程中,元组最初使用全局分组进行重新分区,以将同一批次的所有分区合并为一个分区。 partitionAggregate-聚合每个分区而不是整批三叉戟元组。分区聚合的输出完全取代了输入元组。分区聚合的输出包含单个字段元组。 persistentaggregate-聚合所有批次的所有三叉戟元组,并将结果存储在内存或数据库中。
TridentTopology topology = new TridentTopology();
// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
  
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
  
// persistentAggregate-saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
可以使用CombinerAggregator、ReducerAggregator 或通用Aggregator 接口创建聚合操作。上面例子中使用的"count"聚合器是内置聚合器之一,它是使用"CombinerAggregator"实现的。实现如下-
public class Count implements CombinerAggregator<Long> {
   @Override
   public long init(TridentTuple tuple) {
      return 1L;
   }
  
   @Override
   public long combine(long val1, long val2) {
      return val1 + val2;
   }
  
   @Override
   public long zero() {
      return 0L;
   }
}

Grouping

分组操作是一个内置操作,可以通过 groupBy 方法调用。 groupBy 方法通过对指定字段执行 partitionBy 来重新分区流,然后在每个分区内,它将组字段相等的元组分组在一起。通常,我们使用"groupBy"和"persistentAggregate"来获得分组聚合。示例代码如下-
TridentTopology topology = new TridentTopology();
// persistentAggregate-saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

合并和加入可以分别使用"merge"和"join"方法来完成。合并合并一个或多个流。加入类似于合并,除了加入使用来自双方的三叉戟元组字段来检查和加入两个流的事实。此外,加入只能在批处理级别下工作。示例代码如下-
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident 提供了一种状态维护机制。状态信息可以存储在拓扑本身中,否则您也可以将其存储在单独的数据库中。原因是为了保持一种状态,如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态之前是否已更新。如果元组在更新状态之前失败,那么重试元组将使状态稳定。但是,如果元组在更新状态后失败,那么重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息只处理一次-
小批量处理元组。 为每个批次分配一个唯一 ID。如果批处理被重试,则会被赋予相同的唯一 ID。 状态更新按批次排序。例如,在第一批的状态更新完成之前,第二批的状态更新将无法进行。

分布式RPC

分布式 RPC 用于从 Trident 拓扑中查询和检索结果。 Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求并将其传递给拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将其重定向到客户端。 Trident 的分布式 RPC 查询的执行方式与普通 RPC 查询类似,不同之处在于这些查询是并行运行的。

何时使用三叉戟?

在许多用例中,如果要求只处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。另一方面,在 Storm 的情况下很难实现恰好一次处理。因此,Trident 对于需要处理一次的用例非常有用。 Trident 并不适用于所有用例,尤其是高性能用例,因为它增加了 Storm 并管理状态的复杂性。

Trident 的工作示例

我们将把我们在上一节中制定的通话记录分析器应用程序转换为 Trident 框架。由于其高级 API,Trident 应用程序与普通Storm相比将相对容易。 Storm 基本上需要在 Trident 中执行 Function、Filter、Aggregate、GroupBy、Join 和 Merge 操作中的任何一项。最后,我们将使用 LocalDRPC 类启动 DRPC 服务器,并使用 LocalDRPC 类的 execute 方法搜索一些关键字。

格式化调用信息

FormatCall 类的目的是格式化包含"来电者号码"和"接收者号码"的呼叫信息。完整的程序代码如下-

编码:FormatCall.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + "-" + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是将输入的字符串以"逗号(,)"为基础进行拆分,并输出字符串中的每个单词。该函数用于解析分布式查询的输入参数。完整代码如下-

编码:CSVSplit.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

这是主要的应用程序。最初,应用程序将使用 FeederBatchSpout 初始化 TridentTopology 并提供调用者信息。可以使用 TridentTopology 类的 newStream 方法创建 Trident 拓扑流。类似地,可以使用 TridentTopology 类的 newDRCPStream 方法创建 Trident 拓扑 DRPC 流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。 LocalDRPC 有执行方法来搜索某个关键字。完整代码如下。

编码:LogAnalyserTrident.java

import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
    
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));
      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));
      LocalDRPC drpc = new LocalDRPC();
      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
    
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));
         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));
         idx = idx + 1;
      }
      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401-1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401-
         1234123402,1234123401-1234123403"));
      System.out.println("DRPC : Query ends");
      cluster.shutdown();
      drpc.shutdown();
      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

构建和运行应用程序

完整的应用程序包含三个 Java 代码。它们如下-
FormatCall.java CSVSplit.java LogAnalyerTrident.java
可以使用以下命令构建应用程序-
javac-cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令运行应用程序-
java-cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

一旦应用程序启动,应用程序会输出集群启动过程、操作处理、DRPC Server和客户端信息,最后是集群关闭过程的完整细节。此输出将显示在控制台上,如下所示。
DRPC : Query starts
[["1234123401-1234123402",10]]
DEBUG: [1234123401-1234123402, 10]
DEBUG: [1234123401-1234123403, 10]
[[20]]
DRPC : Query ends
昵称: 邮箱:
Copyright © 2022 立地货 All Rights Reserved.
备案号:京ICP备14037608号-4