I want to build a counter with Storm. The plan: keep feeding in a string, then count how often each word occurs.

This post covers the following:

  1. Basic setup
  2. Flow analysis and choosing the classes
  3. The code (with comments)
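I. Basic setup
Be careful with imports here; a wrong import can lead to baffling casts or to errors saying that a method does not exist.
  • Three CentOS 7 Linux virtual machines with a Storm cluster already set up
  • One Windows 8.1 virtual machine for development
  • The three virtual machines use bridged networking, so they sit on the same network as the host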

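II. Flow analysis and choosing the classes
  • TopologyWordCount ------ builds the whole logical topology; the core of the Storm application
  • CreateSpout ------ continuously produces the raw string
  • SplitBolt ------ splits the raw string into words and emits each word
  • CountBolt ------ keeps a frequency count of the incoming words
  • PrintBolt ------ prints all the results

Note: because strings are produced without end, printing needs a time interval. Here CountBolt forwards its counts every 10 s, so PrintBolt prints once every 10 s.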
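Before bringing Storm in, the division of work among the four components can be tried out in plain Java. The following is only a sketch of the data flow, not Storm code: the class name WordCountFlowSketch and its helper methods are made up for illustration, and the endless spout is replaced by a loop of three rounds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountFlowSketch {

    // SplitBolt's job: break one sentence into words on single spaces.
    static String[] split(String sentence) {
        return sentence.split(" ");
    }

    // CountBolt's job: add each word to a running frequency table.
    static void count(Map<String, Integer> counts, String[] words) {
        for (String word : words) {
            Integer old = counts.get(word);
            counts.put(word, old == null ? 1 : old + 1);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        // CreateSpout's job: keep producing the same sentence (three rounds here).
        for (int round = 0; round < 3; round++) {
            count(counts, split("Hahahaha! I am coming ~~"));
        }
        // PrintBolt's job: print the table it receives.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }
}
```

In the real topology each of these steps runs in its own component and the data travels between them as tuples.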

III. The code

First, the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Keep the coordinates above as they are in your own project; only the part below needs changing -->
    <properties>
        <jstorm.version>2.1.1</jstorm.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>1.7.12</slf4j.version>
        <joda-time.version>2.9.4</joda-time.version>
        <storm-kafka.version>0.9.4</storm-kafka.version>
        <kafka.version>0.9.0.0</kafka.version>
        <esper.version>5.4.0</esper.version>
    </properties>

    <dependencies>


        <!-- https://mvnrepository.com/artifact/com.espertech/esper -->


        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda-time.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>${jstorm.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm-kafka.version}</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>${slf4j.version}</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>TopologyWordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-my-jar-with-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


  • TopologyWordCount builds the whole logical topology and is the core of the Storm application
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyWordCount {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();

        // Set the data source
        builder.setSpout("spout", new CreateSpout(), 1);
        // Read from the spout and split each string into words
        builder.setBolt("split", new SplitBolt(), 1).shuffleGrouping("spout");
        // Read the words from split and count them (tick period: 10 seconds)
        builder.setBolt("count", new CountBolt(), 1).fieldsGrouping("split", new Fields("word"));
        // Read the counts emitted by count and print them
        builder.setBolt("final", new PrintBolt(), 1).shuffleGrouping("count");


        /*--------------- boilerplate --------------------*/
        Config config = new Config();
        config.setDebug(false);
        // Cluster mode: the first argument is the topology name
        if (args != null && args.length > 0) {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Local mode
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", config, builder.createTopology());
            Thread.sleep(3000000);
            cluster.shutdown();
        }
    }
}
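This class defines the whole topology. Calls such as .shuffleGrouping() may be unfamiliar at first: they select one of Storm's stream grouping strategies. A detailed treatment would be out of place here, so the figure below only conveys the general idea:


The argument passed to .shuffleGrouping() names the Spout or Bolt whose output this bolt receives. shuffleGrouping spreads the tuples randomly over the tasks of the receiving bolt, while fieldsGrouping sends tuples with the same field value to the same task.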
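The difference between the two groupings used in this topology can be illustrated in plain Java. This is a conceptual sketch only: the class GroupingSketch and its methods are hypothetical, and Storm's real routing code is not reproduced here. The point is that a fields grouping derives the target task from the field value, whereas a shuffle grouping does not look at the value at all.

```java
import java.util.Random;

public class GroupingSketch {

    // Fields grouping idea: the same field value always maps to the same task.
    static int fieldsGroupingTask(String word, int numTasks) {
        // Mask the sign bit so the result is never negative.
        return (word.hashCode() & Integer.MAX_VALUE) % numTasks;
    }

    // Shuffle grouping idea: any task may receive the tuple.
    static int shuffleGroupingTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        Random random = new Random();
        String[] words = {"I", "am", "I", "coming", "am"};
        for (String word : words) {
            System.out.println(word
                    + " -> fields task " + fieldsGroupingTask(word, 3)
                    + ", shuffle task " + shuffleGroupingTask(random, 3));
        }
    }
}
```

This is why CountBolt is wired with fieldsGrouping on "word": with more than one CountBolt task, every occurrence of a given word still reaches the same task and therefore the same counter.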

  • CreateSpout continuously produces the raw string
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.joda.time.DateTime;

import java.util.Map;

/**
 * Produces the data
 */
public class CreateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = null; // holds the data to send


    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        sentences = new String[]{"Hahahaha! I am coming ~~"}; // the string that is sent over and over
    }

    @Override
    public void nextTuple() {
        /* Storm calls this method in a loop */
        /* Sleep so that data goes out once every 10 s, leaving that time for the rest of the topology to work */
        Utils.sleep(10000);
        System.out.println(new DateTime().toString("HH:mm:ss") + "--------------CreateSpout starts sending data----------");
        // Emit one tuple per sentence: passing the array itself to Values would spread
        // its elements over several tuple fields, but only "sentence" is declared
        for (String sentence : sentences) {
            this.collector.emit(new Values(sentence));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Name the key under which the receiver finds the value in the tuple
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

}


Why sleep for 10 s? So that the whole pipeline runs once every 10 s; otherwise it is hard to see how the flow proceeds.

  • SplitBolt splits the raw string into words and emits each word
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.joda.time.DateTime;

import java.util.Map;

/**
 * Splits the string on spaces and emits the words
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector outputCollector;
    private int countTime = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        /* From the official documentation of tuple.getString(int i):
          Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error.
                public String getString ( int i);
         */
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------SplitBolt running--------------------\n" + "> > > >  emit #" + count() + ", word: " + word);
            outputCollector.emit(new Values(word));
        }
    }
    // Tracks how many words have been emitted so far, purely as a learning aid
    private int count() {
        return ++countTime;
    }

    /* On emit, the value is stored in the receiver's tuple under the key "word" */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
  • CountBolt keeps a frequency count of the incoming words
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleHelpers;
import org.joda.time.DateTime;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();
    private OutputCollector outputCollector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        /* Ask Storm to send this bolt a tick tuple every 10 seconds; the tick drives the periodic emit */
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple tuple) {
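        /* A tick tuple arrives every 10 s: emit the counts gathered so far to the next bolt, then return */
        // The map is never cleared, so the counts keep accumulating across ticks
        if (TupleHelpers.isTickTuple(tuple)) { /* is it time to emit the current counts? */
            System.out.println((new DateTime().toString("HH:mm:ss")) + "--------------------CountBolt running--------------------\nemitting: " + counts);
            // Emit a copy so that later updates do not alter the tuple already sent
            outputCollector.emit(new Values(new HashMap<String, Integer>(counts)));
            return;
        }

        /* Not a tick tuple: keep counting words */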
        String word = tuple.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word_map"));
    }
}
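CountBolt keeps updating its map after it has sent counts downstream. Whether the receiver then holds a frozen snapshot or a live view depends on whether the emitted object is a copy. The plain-Java sketch below shows the difference; it does not involve Storm, and the class SnapshotSketch and its method are hypothetical names. Whether this matters in a given deployment depends on whether tuples are serialized on the way between the two bolts, which is not examined here.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotSketch {

    // Returns an independent copy; later changes to 'live' do not show up in it.
    static Map<String, Integer> snapshot(Map<String, Integer> live) {
        return new HashMap<String, Integer>(live);
    }

    public static void main(String[] args) {
        Map<String, Integer> live = new HashMap<String, Integer>();
        live.put("am", 1);

        Map<String, Integer> alias = live;           // same object as 'live'
        Map<String, Integer> copy = snapshot(live);  // independent copy

        live.put("am", 2);                           // counting continues

        System.out.println("alias sees " + alias.get("am")); // alias sees 2
        System.out.println("copy sees " + copy.get("am"));   // copy sees 1
    }
}
```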
  • PrintBolt prints all the results
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.joda.time.DateTime;

import java.util.Map;

public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
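        System.out.println(new DateTime().toString("HH:mm:ss") + "--------------------final bolt running--------------------");
        // CountBolt emits its map as the only field, so read field 0 and cast it back
        @SuppressWarnings("unchecked")
        Map<String, Integer> counts = (Map<String, Integer>) input.getValue(0);
        /* Last stage: print the final result */
        System.out.println(justForm(20 - 8) + "key" + justForm(20 - 8) + "      " + "value");
        for (Map.Entry<String, Integer> kv : counts.entrySet()) {
            /* justForm() pads each key to a fixed width so the columns line up */
            System.out.println(kv.getKey() + justForm(kv.getKey().length()) + " count : " + kv.getValue());
        }
    }

    // Returns the spaces needed to pad a string of the given length to 20 characters.
    // Returning the padding (rather than printing it) keeps it in the right place in the line.
    private String justForm(int length) {
        StringBuilder padding = new StringBuilder();
        for (int i = 0; i < 20 - length; i++) {
            padding.append(' ');
        }
        return padding.toString();
    }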

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
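As an aside, fixed-width columns like the ones PrintBolt aims for can also be produced with String.format. The sketch below is an alternative, not the code used in this post; the class name PadSketch and the method row are made up for illustration. The format specifier %-20s left-aligns a string in a 20-character column.

```java
public class PadSketch {

    // Left-align the key in a 20-character column, then append the count.
    static String row(String key, int count) {
        return String.format("%-20s count : %d", key, count);
    }

    public static void main(String[] args) {
        System.out.println(row("Hahahaha!", 3));
        System.out.println(row("I", 3));
    }
}
```

Keys longer than 20 characters are not truncated by this format; they simply push the rest of the line to the right.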



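If you run into a mysterious cast or an error on a method call here, the cause is probably a wrong import. All Storm classes used in this post come from the backtype.storm packages, so check the import statements carefully.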


The output of a run helps in understanding the whole Storm flow:

