Notes - bigdata - Using Scala with Flink to consume Kafka data and store the aggregated results in Redis
The code here was written locally, then packaged and submitted to the cluster to run!!!
Feel free to message me if you have questions.
1. Create the project in IDEA
The pom configuration is below; note that it also includes Scala and Spark support that is not strictly needed for this job.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tledu</groupId>
<artifactId>llll</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>My wonderful Scala app</description>
<inceptionYear>2018</inceptionYear>
<licenses>
<license>
<name>My License</name>
<url>http://....</url>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.11</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spec2.version>4.2.0</spec2.version>
<flink.version>1.10.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink dependencies - end -->
<!-- Logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>
<!-- Redis connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.compat.version}</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-core_${scala.compat.version}</artifactId>
<version>${spec2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.compat.version}</artifactId>
<version>${spec2.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<!-- Tests will be run with scalatest-maven-plugin instead -->
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>2.0.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>TestSuiteReport.txt</filereports>
<!-- Comma separated list of JUnit test class names to execute -->
<jUnitClasses>samples.AppTest</jUnitClasses>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The code is as follows; here I compute the running total of sales in real time.
package com.com.tledu
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import java.util.Properties
object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment (StreamExecutionEnvironment) for consuming stream data
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka consumer properties
    val properties = new Properties()
    // bootstrap.servers: the Kafka broker's IP:port
    properties.setProperty("bootstrap.servers", "127.0.0.1:26001")
    // consumer group id
    properties.setProperty("group.id", "wzxgroup")
    val consumer = new FlinkKafkaConsumer[String](
      "wzx", new SimpleStringSchema(), properties
    )
    // Register the Kafka consumer as the data source
    val dataStream = env.addSource(consumer)
    // Once we have the stream, we can operate on it with Flink operators
    dataStream.print()
    // Keep the records starting with "O", extract their 4th field, and accumulate the sum
    val result = dataStream
      .filter(_.startsWith("O")) // keep only records starting with "O"
      .map(_.substring(2) // strip the leading "O:" prefix from each record
        .split(",")) // split the record into its fields
    // Compute the total sales amount
    val priceResult = result
      .map(item => ("price", item(3).toInt)) // we only care about the amount, so map to ("price", amount)
      .keyBy(0) // group by the first element of the tuple
      .sum(1) // running sum over the second element
    // Redis connection config; make sure to use your actual Redis IP and password
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.3.89")
      .setPort(6379)
      .setPassword("123456")
      .build()
    val redisSink = new RedisSink[(String, Int)](config, new MyRedisMapper)
    priceResult.addSink(redisSink)
    env.execute()
  }
}
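To make the transformation above concrete, here is a small pure-Scala sketch (no Flink required) of the same parsing and summing logic. The sample records and the field layout "O:&lt;id&gt;,&lt;date&gt;,&lt;item&gt;,&lt;amount&gt;" are my assumptions for illustration; only the "O:" prefix and the 4th field holding the amount are implied by the original code:

```scala
object ParseDemo {
  // Hypothetical sample records; the exact field layout is assumed, not taken from real data.
  val records = List(
    "O:1001,2023-05-01,itemA,30",
    "O:1002,2023-05-01,itemB,70",
    "U:other,records,are,ignored"
  )

  // Mirrors the Flink pipeline: keep "O"-records, strip the "O:" prefix,
  // split into fields, and sum the 4th field (the sale amount).
  def totalSales(lines: List[String]): Int =
    lines
      .filter(_.startsWith("O"))
      .map(_.substring(2).split(","))
      .map(fields => fields(3).toInt)
      .sum

  def main(args: Array[String]): Unit =
    println(totalSales(records)) // prints 100
}
```

In the streaming job, sum(1) maintains a running total per key, so the sink receives a new cumulative value after each matching record rather than a single final result.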
package com.com.tledu
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
// Note: this class must live in the same package as FlinkKafka (or be imported there),
// since FlinkKafka references MyRedisMapper without an import.
class MyRedisMapper extends RedisMapper[(String, Int)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)
  override def getKeyFromData(t: (String, Int)): String = t._1
  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
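Because the mapper uses RedisCommand.SET, each new running total simply overwrites the previous value stored under the same key. A minimal sketch of that behavior, with a plain immutable Map standing in for Redis (the update values here are made up for illustration):

```scala
object RedisSetSimulation {
  // Simulate SET semantics: for each (key, value) update, the latest write wins.
  def applyUpdates(updates: List[(String, Int)]): Map[String, String] =
    updates.foldLeft(Map.empty[String, String]) {
      case (store, (key, value)) => store.updated(key, value.toString)
    }

  def main(args: Array[String]): Unit = {
    // As the running sum emits 30, then 100, the store ends up holding only the latest total.
    val finalStore = applyUpdates(List(("price", 30), ("price", 100)))
    println(finalStore("price")) // prints 100
  }
}
```

This is why querying Redis at any moment returns the current cumulative total rather than a history of updates.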
Package the jar and upload it to the cluster.
I put it in the bin directory of the flink folder.
You can check the Redis IP with ip addr; the password is set in redis.conf.
2. Run the job on the cluster
Run the following command from Flink's bin directory:
flink run -m yarn-cluster -c com.com.tledu.FlinkKafka ./llll.jar (-c takes the class you want to run; the last argument is the path to your jar, which I placed in the bin directory)
3. Check the data in Redis
Go into the Redis folder.
Start the Redis server and connect a client with the following commands:
./src/redis-server redis.conf
src/redis-cli -a 123456
Once connected, you can use get key to fetch your data.
My key name is price,
so running get price shows my data.