Learning Storm: Implementing the Interfaces and Deploying Code to a Cluster

This article walks through implementing Apache Storm's interfaces, including writing a spout and bolts and choosing stream grouping strategies. It uses word count as the running example and covers how to package and deploy the code to a Storm cluster, noting that the storm-core dependency in pom.xml must be scoped as provided and that the jar should be built with maven-shade-plugin.



This post uses word count as its example; the sample code is adapted from the book 《Storm分布式实时计算模式》.

Code on GitHub: https://github.com/yuexianbing/storm_test

Basic concepts:
  1. Storm's core data structure is the tuple; an unbounded sequence of tuples forms a stream.
  2. A spout is the main data entry point of a Storm topology.
  3. A bolt receives tuples, computes on them, and can emit one or more output streams; a bolt may subscribe to multiple streams emitted by spouts or other bolts.
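To make the tuple concept concrete, here is a plain-Java sketch (a conceptual model only, not Storm's actual Tuple class) of a tuple as a set of named values accessed by field name, much like `tuple.getStringByField` in the bolts below:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model only: a Storm tuple behaves like an ordered list of values
// with named fields, looked up by field name.
public class TupleSketch {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    TupleSketch put(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    String getStringByField(String name) {
        return (String) fields.get(name);
    }

    public static void main(String[] args) {
        TupleSketch t = new TupleSketch().put("sentence", "I have a dog");
        System.out.println(t.getStringByField("sentence")); // I have a dog
    }
}
```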
Storm stream groupings:

     Storm ships with 7 built-in stream groupings:

  1. Shuffle grouping: tuples are distributed randomly across the bolt's tasks, so each task receives a roughly equal number of tuples.
  2. Fields grouping: all tuples with the same value in the grouping field are routed to the same bolt task.
  3. All grouping: every tuple is replicated and sent to all of the bolt's tasks; each task subscribed to the stream receives its own copy.
  4. Global grouping: all tuples are routed to a single task; Storm picks the task with the lowest task id. Because every tuple ends up in one JVM instance, this can overload that node in the cluster.
  5. None grouping: functionally equivalent to shuffle grouping.
  6. Direct grouping: the emitting component calls emitDirect() to decide which task receives each tuple.
  7. Local or shuffle grouping: if the receiving bolt has a task in the same worker, the tuple is delivered to that local task; otherwise it falls back to shuffle grouping. Avoiding network transfer this way can improve topology performance.
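As an illustration of why fields grouping lets a bolt keep per-key state locally, here is a hypothetical sketch of the routing idea (not Storm's actual internal code): the receiving task is derived from the hash of the grouping field modulo the number of tasks, so equal field values always land on the same task.

```java
import java.util.Objects;

// Hypothetical sketch of fields-grouping routing: same field value,
// same task index. This mirrors the idea, not Storm's implementation.
public class FieldsGroupingSketch {
    static int chooseTask(Object fieldValue, int numTasks) {
        // Math.floorMod keeps the index non-negative even for negative hashes
        return Math.floorMod(Objects.hashCode(fieldValue), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4;
        // The same word is always routed to the same task, which is what
        // lets a counting bolt keep its per-word totals locally.
        System.out.println(chooseTask("dog", tasks) == chooseTask("dog", tasks)); // true
    }
}
```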
Spout implementation:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author yuebing
 * @version 1.0 2017/9/4
 */
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private String[] sentence = {
            "I have a dog",
            "The dog is big dog",
            "Its name is huahua",
            "It is a brave dog"
    };
    private int index = 0;
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.outputCollector = spoutOutputCollector;
    }
    //emit the next tuple in the sequence
    @Override
    public void nextTuple() {
        if (index < sentence.length) {
            this.outputCollector.emit(new Values(sentence[index]));
            index++;
        }
    }
    //declare the output field names
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
Bolt implementations:
  1. Sentence-splitting bolt:
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * @author yuebing
     * @version 1.0 2017/9/4
     */
    public class SpiltSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }
        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }
  2. Word-counting bolt:
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author yuebing
     * @version 1.0 2017/9/4
     */
    public class WordCountBolt extends BaseRichBolt {
    
        private OutputCollector outputCollector;
    
        private Map<String, Long> counts = null;
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
            counts = new HashMap<String, Long>();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            Long count = counts.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            counts.put(word, count);
            outputCollector.emit(new Values(word, count));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "count"));
        }
    }
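The split-and-count logic that SpiltSentenceBolt and WordCountBolt implement together can be exercised outside Storm; a minimal plain-Java sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the split-and-count pipeline, runnable without a cluster.
public class WordCountSketch {
    static Map<String, Long> count(String[] sentences) {
        Map<String, Long> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                // merge() collapses the null-check-then-increment the bolt spells out
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(new String[] {
                "I have a dog",
                "The dog is big dog"
        });
        System.out.println(counts.get("dog")); // 3
    }
}
```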
  3. Result-reporting bolt:
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author yuebing
     * @version 1.0 2017/9/4
     */
    public class ResultBolt extends BaseRichBolt {
    
        private Map<String, Long> counts = null;
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            counts = new HashMap<String, Long>();
        }
    
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            Long count = tuple.getLongByField("count");
            this.counts.put(word, count);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
        }
        //called when the topology is killed; only guaranteed to run in local mode
        @Override
        public void cleanup() {
            System.out.println("start");
            for (Map.Entry<String, Long> count : counts.entrySet()) {
                System.out.println("" + count.getKey() + ": " + count.getValue());
            }
        }
    }
Topology implementation:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
 * @author yuebing
 * @version 1.0 2017/9/4
 */
public class WordCountTopology {

    private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(WordCountTopology.class);
    private static final String SENTENCE_SPOUT_ID = "sentence_spout";
    private static final String SPLIT_BOLT_ID = "split_bolt";
    private static final String COUNT_BOLT_ID = "count_bolt";
    private static final String REPORT_BOLT_ID = "report_bolt";
    private static final String WORD_COUNT_TOPOLOGY_NAME = "word_count_topology";

    private TopologyBuilder builder = new TopologyBuilder();

    public WordCountTopology () {
        //wire the spout and bolts into the topology
        builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout());
        builder.setBolt(SPLIT_BOLT_ID, new SpiltSentenceBolt()).shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ResultBolt()).globalGrouping(COUNT_BOLT_ID);
    }

    public void runLocal(Config config) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(WORD_COUNT_TOPOLOGY_NAME, config, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology(WORD_COUNT_TOPOLOGY_NAME);
        cluster.shutdown();
    }

    public void runCluster(String topologyName, Config config) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
    }

    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        Config config = new Config();
       // config.setNumWorkers(2);
        WordCountTopology topology = new WordCountTopology();
        if (args.length == 0) {
            LOG.info("Starting WordCountTopology in local mode!");
            topology.runLocal(config);
        } else {
            LOG.info("Starting WordCountTopology on the cluster!");
            topology.runCluster(args[0], config);
        }
    }
}

Packaging:

  1. The cluster already provides Storm's jars, so the topology jar must not bundle storm-core; otherwise the topology fails at runtime on the cluster (defaults.yaml is found on the classpath twice). Declare the storm-core dependency in pom.xml as follows:
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <scope>provided</scope>
    </dependency>
  2. The pom also needs to declare the main class and shade the remaining dependencies into the topology jar:
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ybin.storm.wordcount.WordCountTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
Deploying to the cluster:
  1. Upload the topology jar to the node where nimbus runs;
  2. Start the topology:
    ./storm jar /opt/project/storm/strom.test-1.0-SNAPSHOT.jar com.ybin.storm.wordcount.WordCountTopology WordCountTopology
