Installing Flink

The installation steps for Flink are as follows:

  1. Install a virtual machine
  2. Install the operating system
  3. Install the JDK
  4. Install Scala (skip if not needed)
  5. Install Hadoop (skip if not needed)
  6. Install Flink
  7. Configure the environment variables

  If you are just doing an initial self-test, installation is simple: download the package, upload it to the server, untar it, configure the environment variables, source the profile, and it is ready to use. At this point you might as well run start-cluster.sh to start Flink (this is only for testing, so a single node is installed here). After it starts, check the web UI at http://node01:8081
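  (The web UI port defaults to 8081; as far as I know it can be changed with the rest.port setting in conf/flink-conf.yaml if needed.)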
  Next, let's open IDEA and write a Demo.

Demo

  Create a Maven project as usual, but you need to install the Scala plugin first. After creating the Maven project, set up the Scala SDK so that Scala classes can be created (skip this if you don't need Scala). The pom content is as follows (adjust to your own needs):


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>


    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Build a jar whose manifest references the dependency jars -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>

            <!-- Copy the dependency jars into the lib folder -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
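  Two notes on this pom: maven-jar-plugin writes a lib/-prefixed classpath into the jar's manifest while maven-dependency-plugin copies the dependencies into target/lib, so the application jar and the lib folder need to be uploaded to the server together. And since <mainClass> is left empty, the entry class has to be passed explicitly with --class when submitting the job, as we do below.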

  Next is a word-count Demo, in Java and Scala versions, that reads text from a socket:

Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        String ip;
        int port;
        try {
            ip = args[0];
            port = Integer.parseInt(args[1]);
        } catch (Exception e) {
            System.err.println("No ip/port specified.");
            return;
        }
        System.out.println(ip + ":" + port);

        // Get the Flink execution environment; here, the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bind to a socket address and read its contents
        DataStream<String> text = env.socketTextStream(ip, port, "\n");

        // Lambda version:
//        DataStream<WordWithCount> wordCounts = text.flatMap((String value, Collector<WordWithCount> out) -> {
//            for (String s : value.split("\\s")) {
//                out.collect(new WordWithCount(s, 1L));
//            }
//        }).keyBy("word")
//                .timeWindow(Time.seconds(5), Time.seconds(1))
//                .reduce((wc1, wc2) -> new WordWithCount(wc1.word, wc1.count + wc2.count));
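        // Note: with the lambda version, Java's type erasure hides the Collector's
        // type parameter, so Flink will most likely require an explicit type hint,
        // e.g. appending .returns(WordWithCount.class) to the flatMap call.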

        // Split, flatten, map each word to a count of 1, key by word, window, and count
        DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                for (String s : value.split("\\s")) {
                    out.collect(new WordWithCount(s, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount wc1, WordWithCount wc2) throws Exception {
                        return new WordWithCount(wc1.word, wc1.count + wc2.count);
                    }
                });

        // Print the results to the console
        wordCounts.print();
        // Submit the job and start the computation
        env.execute("Socket Window Word Count Java");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

Scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {
    val (ip, port) = try{
      (args(0), args(1).toInt)
    }catch {
      case ex: Exception =>  System.err.println("No ip/port specified.")
      return
    }

    // Get the Flink execution environment; here, the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bind to a socket address and read its contents
    val text: DataStream[String] = env.socketTextStream(ip, port, '\n')
    // Split, flatten, map each word to a count of 1, key by word, window, and count
    val wordCount = text.flatMap(_.split("\\s"))
      .map(WordWithCount(_, 1L))
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // Print the results to the console
    wordCount.print()
    // Submit the job and start the computation
    env.execute("Socket Window Word Count Scala")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
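  One Scala-specific note: the wildcard import org.apache.flink.streaming.api.scala._ added above is required, because it provides the implicit TypeInformation that operators like flatMap and map need; without it the code does not compile.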

  This Demo reads from a socket and counts words over a window, so we need to open a socket on the server with the nc command.

On Linux: nc -l 9000
Note: turn off the firewall, or open the port, so that clients can connect.
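  If you want to verify that the port is reachable without telnet (which can cause the problem described below), a throwaway client is enough. Here is a minimal sketch, with PortCheck being a hypothetical helper class name and node01/9000 the address used in this article:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical one-off client: verifies connectivity, then disconnects immediately.
public class PortCheck {
    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "node01";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9000;
        try (Socket socket = new Socket()) {
            // Fails fast if the firewall blocks the port or nothing is listening.
            socket.connect(new InetSocketAddress(host, port), 3000);
            System.out.println("Connected to " + host + ":" + port);
        }
    }
}

  Be aware that a plain nc -l may exit as soon as its first client disconnects, so you may need to restart it before launching the Flink job.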

  Then, in IDEA, put the server's ip and port in the Demo's program arguments.
  Now we can run the program, type some characters on the server, and watch the results appear in the IDEA console.
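  As a concrete example: typing hello hello flink into nc should produce hello : 2 and flink : 1 in the console (each count may appear several times, since the 5-second window slides every second).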
  With that, our first Demo has run successfully. One thing I should own up to: I made a silly mistake here. After starting nc on the server, I wanted to verify that the port was reachable, so I also connected with telnet from Windows while the program was listening on the same port. telnet received the input but the program did not, and I assumed I had written something wrong. It took me quite a while to figure out that telnet was the culprit: both telnet and the program were connected to port 9000 on node01, and anything typed into nc is delivered to only one of them, namely whichever connected first. That is why the program never printed anything. So once telnet has confirmed connectivity, close it; don't let telnet and the program consume the port at the same time, or just start the program first! Below is what it looks like when both telnet and the program are connected and the program receives nothing:
  (Screenshots: the nc session on the server; the telnet window receiving the input; the program console printing nothing.)

Packaging and Running on the Cluster

  Package the project. With the pom configuration above, the build produces two things: the lib folder of dependency jars and the application jar. Upload both to the server.
  Start Flink on the server.
  Submit the job on the server:

../../flink-1.7.1/bin/flink run --class com.thebigblue.flink.scala.test.SocketWindowWordCountScala ./flink-demo-1.0-SNAPSHOT.jar node01 9000
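Here --class names the entry class inside the jar, and the trailing node01 9000 are passed through to the program as args(0) and args(1).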

  Send some input through nc.
  Check the Flink logs: the program's output appears there.
  Check the Flink web UI to see the running job.
  At this point the Demo has run successfully end to end. But I want to mention one small point: the add-dependencies-for-IDEA profile at the end of the pom above really does need to be included.

  This profile loads Flink's core classes into the runtime classpath when running inside IntelliJ. Without it, because those dependencies are marked provided, the program fails at startup with an error that the Flink environment classes cannot be found (typically a NoClassDefFoundError).
  If you don't want to add that profile, you need to attach the relevant jars in the project settings yourself, as follows:
  1. Press Ctrl+Shift+Alt+S to open the Project Structure dialog
  2. Add the jars under Libraries
  3. The jars are in the lib and opt folders of the Flink installation directory, so attach the jars from both folders
  OK, that's one Demo done. Let's keep going.
