2. Storm Learning: Interface Implementation and Deploying Code to a Cluster
Using word count as the example; the code samples follow the book 《Storm分布式实时计算模式》 (Storm Blueprints: Patterns for Distributed Real-time Computation);
The code is on GitHub: https://github.com/yuexianbing/storm_test;
Basic concepts:
- Storm's core data structure is the tuple; an unbounded sequence of tuples forms a stream;
- A spout is the main data entry point of a Storm topology;
- A bolt receives data from spouts, and after processing it can emit one or more output streams; a bolt can subscribe to multiple streams emitted by spouts or other bolts;
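To make the tuple concept concrete, here is a minimal plain-Java illustration (not Storm's actual API): a tuple pairs the field names declared by the emitter with an ordered list of values, which is what lets a downstream bolt look values up by field name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: pairs declared field names with emitted values,
// the way a Storm tuple lets a bolt call getStringByField("sentence").
public class SimpleTuple {

    private final Map<String, Object> byField = new LinkedHashMap<>();

    public SimpleTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("field/value count mismatch");
        }
        for (int i = 0; i < fields.size(); i++) {
            byField.put(fields.get(i), values.get(i));
        }
    }

    public String getStringByField(String field) {
        return (String) byField.get(field);
    }

    public static void main(String[] args) {
        SimpleTuple t = new SimpleTuple(
                Arrays.asList("sentence"),
                Arrays.asList("I have a dog"));
        System.out.println(t.getStringByField("sentence"));
    }
}
```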
Storm stream groupings:
Storm ships with 7 built-in stream groupings:
- Shuffle grouping: tuples are distributed randomly across the bolt's tasks, so each task receives roughly the same number of tuples;
- Fields grouping: all tuples with the same value for the grouping field(s) are routed to the same bolt task;
- All grouping: every tuple is replicated to all bolt tasks; each task subscribed to the stream receives its own copy;
- Global grouping: all tuples are routed to a single task; Storm picks the task with the lowest task id. Because this funnels the entire stream into one JVM instance, it can overwhelm a single node in the cluster;
- None grouping: functionally the same as shuffle grouping;
- Direct grouping: the emitting component decides which task receives each tuple by calling emitDirect();
- Local or shuffle grouping: if the target bolt has one or more tasks in the same worker, tuples are shuffled among those in-worker tasks; otherwise it falls back to shuffle grouping. Avoiding network transfer this way can improve topology performance.
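The key property of fields grouping is determinism: equal field values always land on the same task. A simplified sketch of the idea (Storm's real implementation differs in detail; hashing modulo the task count here is an illustration, not Storm's code):

```java
// Simplified sketch of fields grouping: hash the grouping field value and
// map it onto one of the target tasks. Equal values always pick the same
// task, which is why WordCountBolt can keep per-word counters in memory.
public class FieldsGroupingSketch {

    public static int chooseTask(String fieldValue, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hashes
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // "dog" is routed to the same task on every call
        System.out.println(chooseTask("dog", 4) == chooseTask("dog", 4));
    }
}
```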
Spout implementation:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author yuebing
* @version 1.0 2017/9/4
*/
public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector outputCollector;

    private String[] sentence = {
            "I have a dog",
            "The dog is big dog",
            "Its name is huahua",
            "It is a brave dog"
    };

    private int index = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.outputCollector = spoutOutputCollector;
    }

    // emit the next tuple into the topology
    @Override
    public void nextTuple() {
        if (index < sentence.length) {
            this.outputCollector.emit(new Values(sentence[index]));
            index++;
        }
    }

    // declare the output field of this spout
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
Bolt implementations:
- Sentence-splitting bolt:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * @author yuebing
 * @version 1.0 2017/9/4
 */
public class SpiltSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    // split each sentence on spaces and emit one tuple per word
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
- Word-count bolt:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
 * @author yuebing
 * @version 1.0 2017/9/4
 */
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        counts = new HashMap<String, Long>();
    }

    // increment the running count for each word and emit the updated total
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        counts.put(word, count);
        outputCollector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
- Result-reporting bolt:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * @author yuebing
 * @version 1.0 2017/9/4
 */
public class ResultBolt extends BaseRichBolt {

    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        counts = new HashMap<String, Long>();
    }

    // keep only the latest total per word
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    // this bolt emits nothing downstream
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    // runs when the topology is shut down
    @Override
    public void cleanup() {
        System.out.println("start");
        for (Map.Entry<String, Long> count : counts.entrySet()) {
            System.out.println("" + count.getKey() + ": " + count.getValue());
        }
    }
}
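Stripped of the Storm machinery, the three bolts implement a simple split-and-count pipeline. The plain-Java sketch below (no Storm dependency, using the spout's four sentences) makes the final totals that ResultBolt.cleanup() should print easy to verify:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java walk-through of the topology's data flow: sentences are split
// into words and each word's occurrences are tallied. This mirrors what
// SpiltSentenceBolt and WordCountBolt do, without Storm.
public class WordCountSketch {

    public static Map<String, Long> count(String[] sentences) {
        Map<String, Long> counts = new HashMap<>();
        for (String sentence : sentences) {               // SentenceSpout emits sentences
            for (String word : sentence.split(" ")) {     // SpiltSentenceBolt splits them
                counts.merge(word, 1L, Long::sum);        // WordCountBolt tallies words
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sentences = {
                "I have a dog",
                "The dog is big dog",
                "Its name is huahua",
                "It is a brave dog"
        };
        // ResultBolt.cleanup() would print totals like these
        count(sentences).forEach((w, c) -> System.out.println(w + ": " + c));
    }
}
```

For these sentences the pipeline yields, for example, "dog": 4 and "is": 3.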
Topology implementation:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
/**
* @author yuebing
* @version 1.0 2017/9/4
*/
public class WordCountTopology {

    private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(WordCountTopology.class);

    private static final String SENTENCE_SPOUT_ID = "sentence_spout";
    private static final String SPLIT_BOLT_ID = "split_bolt";
    private static final String COUNT_BOLT_ID = "count_bolt";
    private static final String REPORT_BOLT_ID = "report_bolt";
    private static final String WORD_COUNT_TOPOLOGY_NAME = "word_count_topology";

    private TopologyBuilder builder = new TopologyBuilder();

    public WordCountTopology() {
        // wire up the spout and bolts with their stream groupings
        builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout());
        builder.setBolt(SPLIT_BOLT_ID, new SpiltSentenceBolt()).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ResultBolt()).globalGrouping(COUNT_BOLT_ID);
    }

    public void runLocal(Config config) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(WORD_COUNT_TOPOLOGY_NAME, config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology(WORD_COUNT_TOPOLOGY_NAME);
        cluster.shutdown();
    }

    public void runCluster(String name, Config config) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        StormSubmitter.submitTopology(name, config, builder.createTopology());
    }

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        Config config = new Config();
        // config.setNumWorkers(2);
        WordCountTopology topology = new WordCountTopology();
        if (args.length == 0) {
            LOG.info("Starting WordCountTopology in local mode!");
            topology.runLocal(config);
        } else {
            LOG.info("Starting WordCountTopology on the cluster!");
            topology.runCluster(args[0], config);
        }
    }
}
Packaging:
- The cluster already provides the Storm jars, so the topology jar must not bundle storm-core again; otherwise the topology fails at runtime on the cluster (defaults.yaml gets loaded more than once). Declare the Storm dependency in pom.xml with provided scope:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
    <scope>provided</scope>
</dependency>
- The pom also needs to declare the main-class entry point and shade the remaining dependencies into the topology jar:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.4</version>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.ybin.storm.wordcount.WordCountTopology</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
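With the provided scope and the shade plugin in place, a plain Maven build produces the deployable jar (the artifact name below follows this project's naming; adjust to your pom):

```shell
mvn clean package
# the shaded jar lands in target/, e.g. target/strom.test-1.0-SNAPSHOT.jar
ls target/
```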
Deploying to the cluster:
- Upload the jar to the node where nimbus runs;
- Submit the topology:
./storm jar /opt/project/storm/strom.test-1.0-SNAPSHOT.jar com.ybin.storm.wordcount.WordCountTopology WordCountTopology
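After submitting, the standard Storm CLI (run from the same bin directory) can confirm the topology is up and stop it later; the topology name below is the one passed as the argument above:

```shell
# list running topologies and their status
./storm list

# stop the topology, waiting 10 seconds before killing its workers
./storm kill WordCountTopology -w 10
```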