Flume+Kafka数据采集与清洗
项目说明实现功能模拟实时推荐系统中,数据实时采集与数据预处理,并用Kafka进行数据实时消费功能。实现场景用户对商品进行评分,后台实时对其进行获取与分析,并经过计算后,生成实时推荐结果。项目架构图主要工具说明FlumeFlume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数
项目说明
-
实现功能
模拟实时推荐系统中,数据实时采集与数据预处理,并用Kafka进行数据实时消费功能。
-
实现场景
用户对商品进行评分,后台实时对其进行获取与分析,并经过计算后,生成实时推荐结果。
-
项目架构图
- 流程说明
1、用户在浏览器点击商品对商品进行评分时,调用商品服务的接口。
2、评分接口将用户、商品、评分等信息通过logger输出到文件。
3、Flume监听log文件,将日志信息通过log主题发送到Kafka中。
4、清洗服务接收从log主题发送过来的消息通过关键字过滤出有效信息,并将有效信息通过recommender主题发送到Kafka。
5、推荐服务接收recommender主题的消息,并经过实时算法处理等一系列处理后推送给用户展现。
-
主要工具说明
Flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
Zookeeper
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
早期版本的kafka用zookeeper做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zookeeper本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。
-
版本说明
-
编译Kafka源代码的Scala编译器版本号为2.12,Kafka的版本为2.8.0
-
Flume1.8.0对kafka2.8.0有兼容性问题,因为在Flume1.8.0中,还是用的kafka0.9.0.1的版本包,会造成kafka2.8.0客户端消费消息时的时间戳问题。
-
工具安装
JDK安装
下载jdk-8u281-linux-x64.tar.gz压缩包,并解压缩到hadoop用户的家目录的jvm文件夹
cd ~
mkdir jvm
tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm
编辑环境变量:
vim ~/.bashrc
添加JAVA_HOME:
export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281
export PATH=$JAVA_HOME/bin
让环境变量生效:
source ~/.bashrc
查看java版本:
java -version
检验环境变量是否正确:
# 检验变量值
echo $JAVA_HOME
java -version
# 与直接执行 java -version 一样
$JAVA_HOME/bin/java -version
Zookeeper安装
Zookeeper下载地址:http://mirrors.cnnic.cn/apache/zookeeper/stable/ 或 http://mirror.bit.edu.cn/apache/zookeeper/stable/。下载apache-zookeeper-3.6.3-bin.tar.gz。
解压文件:
tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ./
重命名:
mv apache-zookeeper-3.6.3-bin zookeeper
进入zookeeper文件中创建文件夹:
cd zookeeper
mkdir tmp
复制模板配置文件并修改:
cp ./conf/zoo-sample.cfg ./conf/zoo.cfg
vim ./conf/zoo.cfg
将dataDir的路径修改为刚才创建的tmp目录路径:
启动zookeeper:
./bin/zkServer.sh start
显示Starting zookeeper … STARTED则表示启动成功。
如果需要停止zookeeper,可以通过stop命令停止:
./bin/zkServer.sh stop
Flume-ng安装
通过wget下载flume安装包:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
解压到家目录:
tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~
重命名:
mv apache-flume-1.9.0-bin flume
在flume的conf目录下新建log-kafka.properties,内容为:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail -f /home/hadoop/flume/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
上述配置文件功能描述:
使用tail -f /home/hadoop/flume/log/agent.log
命令,监听文件中改动的内容,通过正则表达式.+PRODUCT_RATING_PREFIX.+
匹配内容,将匹配的结果发送到localhost:9092
中Kafka的log
主题中。
对于以上配置参数,大致需要明白sources
,channels
,sinks
部分。对于这三部分的关系,官方给出了一张图:
Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成。每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:
Source
采集组件,用于跟数据源对接,以获取数据。
Sink
下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据。
Channel
传输通道组件,用于从source将数据传递到sink。
进入flume目录,执行启动命令:
cd ~/flume
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
Kafka安装
通过wget下载安装包:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
解压到家目录:
tar -zxf kafka_2.12-2.8.0.tgz -C ~
重命名:
mv kafka_2.12-2.8.0.tgz kafka
进入kafka目录:
cd kafka
修改kafka配置:
vim config/server.properties
listeners=PLAINTEXT://:9092
# 192.168.1.43为本机ip
advertised.listeners=PLAINTEXT://192.168.1.43:9092
zookeeper.connect=localhost:2181
启动kafka(在zookeeper启动之后):
bin/kafka-server-start.sh -daemon ./config/server.properties
如果需要关闭kafka,执行:
bin/kafka-server-stop.sh
创建主题topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic recommender
在控制台发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommender
输入命令后,控制台会显示需要输入,此时输入的信息在回车之后会发送到kafka中去
ctrl+c退出。
在控制台输出消费消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommender
可以通过在两个终端中,一个打开发送消息,另一个打开接收消息。
服务搭建
maven项目结构:
BigData
├── BusinessServer #商品服务
├── KafkaStreaming #清洗服务
└── StreamingRecommender #推荐服务
商品服务
BusinessServer(SpringBoot项目)
主要提供了一个restful接口,用于将关键信息打印至控制台,并配置日志输出至flume配置中指定的log文件。
评分接口:
package cn.javayuli.businessserver.web;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 评分controller
*
* @author 韩桂林
*/
@RestController
public class RatingController {
private static final Logger LOGGER = LoggerFactory.getLogger(RatingController.class);
private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX";
/**
* 用户对商品进行评分
*
* @param user 用户
* @param product 商品
* @param score 分数
* @return
*/
@GetMapping("/rate")
public String doRate(@RequestParam String user, @RequestParam String product, @RequestParam Double score) {
LOGGER.info(PRODUCT_RATING_PREFIX + ":" + user +"|"+ product +"|"+ score +"|"+ System.currentTimeMillis()/1000);
return "SUCCESS";
}
}
在application.properties中配置启动端口与log4j文件输出路径:
server.port=7001
logging.file.name=/home/hadoop/flume/log/agent.log
将项目打成jar包,上传至服务器,使用java -jar ****.jar
运行项目。
清洗服务
KafkaStreaming(非SpringBoot项目)
引入kafka-streams相关包:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
创建一个Processor
:
package cn.javayuli.kafkastream.processor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* 日志预处理
*
* @author hanguilin
*/
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX:";
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 根据前缀过滤日志信息,提取后面的内容
if(input.contains(PRODUCT_RATING_PREFIX)){
System.out.println("product rating coming!!!!" + input);
input = input.split(PRODUCT_RATING_PREFIX)[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void close() {
}
}
创建main函数:
package cn.javayuli.kafkastream;
import cn.javayuli.kafkastream.processor.LogProcessor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
/**
* @author hanguilin
*/
public class KafkaStreamApp {
public static void main(String[] args) {
// kafka地址
String brokers = "192.168.1.43:9092";
// 定义输入和输出的topic
String from = "log";
String to = "recommender";
// 定义kafka streaming的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
// 拓扑建构器
StreamsBuilder builder = new StreamsBuilder();
Topology build = builder.build();
// 定义流处理的拓扑结构
build.addSource("SOURCE", from)
.addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESS");
KafkaStreams streams = new KafkaStreams(build, settings);
streams.start();
}
}
将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.kafkastream.KafkaStreamApp
运行项目。
推荐服务
StreamingRecommender(非SpringBoot项目)
此处只对消息进行消费,不做推荐计算。
main函数:
package cn.javayuli.streamrecommender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @author hanguilin
*/
public class ConsumerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.43:9092");
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("recommender"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.streamrecommender.ConsumerApp
运行项目。
如果需要在非服务器上对程序进行远程测试,需要打开服务器的7001(BusinessServer)、9092(Kafka)端口,端口命令可以参考文章《CentOS7 中端口命令》。
数据模拟
下发一个评分请求:
首先商品服务会打印出日志:
查看/home/hadoop/flume/log/agent.log
可以看到,商品服务将日志追加到了/home/hadoop/flume/log/agent.log
文件中。
此时,Flume
监听到了文件内容发生改变,就会将追加的内容发送到Kafka
的log
主题中。
此时,清洗服务从log
主题中获取到包含PRODUCT_RATING_PREFIX
的日志信息,并将处理后的信息发送到recommender
主题。
(下图中打印的是从log中取出来的数据,非处理后的数据)
由于推荐服务订阅了recommender
主题,所以会对消息进行消费。
资源地址
文中只贴出了关键性代码,全部代码请查看git仓库Recommender。
更多推荐
所有评论(0)