Flink Basics -- 2. Installing Flink and the First Demo
Installing Flink
Flink installation goes roughly through the following steps:
- Set up a virtual machine
- Install the operating system
- Install the JDK
- Install Scala (optional, skip if you don't need it)
- Install Hadoop (optional, skip if you don't need it)
- Install Flink
- Configure environment variables
If you are just doing an initial test for yourself, installation is simple: download the package, upload it to the server, untar it, configure the environment variables, source them, and you are ready to go. At this point you might as well run start-cluster.sh to start Flink (this is only a test, so only a single node was installed). After starting, check the web UI: http://node01:8081
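As a reference, here is a minimal sketch of that single-node setup; the package name, install path, and profile file are assumptions, so adjust them to your own environment:
# Unpack the downloaded Flink package (version and path are only examples)
cd /opt
tar -xzf flink-1.7.1-bin-scala_2.11.tgz
# Configure the environment variables and reload them
echo 'export FLINK_HOME=/opt/flink-1.7.1' >> /etc/profile
echo 'export PATH=$PATH:$FLINK_HOME/bin' >> /etc/profile
source /etc/profile
# Start the single-node cluster, then open http://node01:8081 to check the web UI
start-cluster.sh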
Next, let's head over to IDEA and write the Demo.
Demo
Create a Maven project as usual. You do need the Scala plugin in IDEA, and after creating the Maven project you have to configure a Scala SDK before you can create Scala classes (skip this if you don't need Scala). The pom looks like this (adjust it to your own needs):
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Build a jar whose manifest references the dependency jars -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- Copy the dependency jars into the lib folder -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>
${project.build.directory}/lib
</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
Next is a word-count (WordCount) Demo that reads text from a socket, in a Java and a Scala version:
Java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        String ip;
        int port;
        try {
            ip = args[0];
            port = Integer.parseInt(args[1]);
        } catch (Exception e) {
            System.err.println("No ip/port specified.");
            return;
        }
        System.out.println(ip + ":" + port);

        // Get the Flink runtime environment; here it is the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket address and read its content as a stream of lines
        DataStream<String> text = env.socketTextStream(ip, port, "\n");

        // Lambda version:
        // DataStream<WordWithCount> wordCounts = text.flatMap((String value, Collector<WordWithCount> out) -> {
        //     for (String s : value.split("\\s")) {
        //         out.collect(new WordWithCount(s, 1L));
        //     }
        // }).keyBy("word")
        //   .timeWindow(Time.seconds(5), Time.seconds(1))
        //   .reduce((wc1, wc2) -> new WordWithCount(wc1.word, wc1.count + wc2.count));

        // Split, flatten, emit (word, 1), key by word, define the window, then aggregate
        DataStream<WordWithCount> wordCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                for (String s : value.split("\\s")) {
                    out.collect(new WordWithCount(s, 1L));
                }
            }
        }).keyBy("word")
          .timeWindow(Time.seconds(5), Time.seconds(1))
          .reduce(new ReduceFunction<WordWithCount>() {
              @Override
              public WordWithCount reduce(WordWithCount wc1, WordWithCount wc2) throws Exception {
                  return new WordWithCount(wc1.word, wc1.count + wc2.count);
              }
          });

        // Print the result to the console
        wordCounts.print();

        // Submit the job and start the computation
        env.execute("Socket Window Word Count Java");
    }

    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
Scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {
    val (ip, port) = try {
      (args(0), args(1).toInt)
    } catch {
      case ex: Exception =>
        System.err.println("No ip/port specified.")
        return
    }

    // Get the Flink runtime environment; here it is the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket address and read its content as a stream of lines
    val text: DataStream[String] = env.socketTextStream(ip, port, '\n')

    // Split, flatten, emit (word, 1), key by word, define the window, then aggregate
    val wordCount = text.flatMap(_.split("\\s"))
      .map(WordWithCount(_, 1L))
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // Print the result to the console
    wordCount.print()

    // Submit the job and start the computation
    env.execute("Socket Window Word Count Scala")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
This Demo reads from a socket and counts within a window, so we need to open a socket on the server with the nc command.
On Linux: nc -l 9000
Note: turn off the firewall, or open the port, so that the client can connect.
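For example, on CentOS 7 with firewalld (an assumption; use your distribution's own tool otherwise), opening just the port looks like this:
# Open port 9000 instead of disabling the firewall entirely
firewall-cmd --zone=public --add-port=9000/tcp --permanent
firewall-cmd --reload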
Then, in IDEA, set the Demo's program arguments to that server's ip and port.
Now we can run the program, start typing characters on the server, and watch the result appear in the IDEA console:
With that, our first Demo has run successfully. Let me point out a silly mistake I made here: after starting nc on the server, I opened telnet on my Windows machine to check that the connection worked, while the program was also listening on the same endpoint. The telnet window received the input, but the program did not, and I spent quite a while thinking I had written something wrong. It turned out the open telnet session was the cause: both telnet and the program were connected to port 9000 on node01, and whatever is typed into nc is only delivered to one side, the one that connected first, so the program never printed anything. So once telnet has confirmed the connection, close it; do not let telnet and the program connect at the same time, or simply start the program first!!! Below is what it looks like when telnet and the program are both connected and the program receives nothing and prints nothing:
On the server:
In the telnet window:
The program receives nothing:
Packaging and running on the cluster
Package the program. With the pom configuration above, the build produces both a lib folder of dependency jars and the application jar; upload both to the server:
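A minimal sketch of packaging and uploading; the server user and target directory here are only examples:
# Build: produces target/flink-demo-1.0-SNAPSHOT.jar plus target/lib/ with the dependency jars
mvn clean package
# Upload the application jar and the lib folder to the server
scp target/flink-demo-1.0-SNAPSHOT.jar root@node01:/opt/flink-jobs/
scp -r target/lib root@node01:/opt/flink-jobs/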
Start Flink on the server:
Submit the job on the server:
../../flink-1.7.1/bin/flink run --class com.thebigblue.flink.scala.test.SocketWindowWordCountScala ./flink-demo-1.0-SNAPSHOT.jar node01 9000
Send some input in nc:
Check the Flink logs: the program's output shows up there.
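Since the job's print() output goes to the TaskManager's stdout, in a standalone setup it ends up in the .out file under Flink's log directory; the exact file name (user, id, host) will differ on your machine:
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.out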
Check the Flink web UI:
That counts as one successful Demo run. One small thing worth mentioning, though: the last block of configuration must be added to the pom:
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
This configuration puts Flink's core classes on the runtime classpath. Without it, running in IDEA fails because the Flink execution environment classes cannot be found.
If you don't want to add this configuration, you need to attach the relevant jars in the project settings yourself, as follows:
1. Press Ctrl+Shift+Alt+S to open the Project Structure dialog
2. Add the jars under Libraries
3. The jars are in the lib and opt folders of the Flink installation directory, so both folders need to be added
Well, that's one Demo done; let's keep going.