项目说明

  • 实现功能

    模拟实时推荐系统中,数据实时采集与数据预处理,并用Kafka进行数据实时消费功能。

  • 实现场景

    用户对商品进行评分,后台实时对其进行获取与分析,并经过计算后,生成实时推荐结果。

  • 项目架构图

  • 流程说明

1、用户在浏览器点击商品对商品进行评分时,调用商品服务的接口。

2、评分接口将用户、商品、评分等信息通过logger输出到文件。

3、Flume监听log文件,将日志信息通过log主题发送到Kafka中。

4、清洗服务接收从log主题发送过来的消息通过关键字过滤出有效信息,并将有效信息通过recommender主题发送到Kafka。

5、推荐服务接收recommender主题的消息,并经过实时算法处理等一系列处理后推送给用户展现。

  • 主要工具说明

    Flume

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

    当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

    Kafka

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由ScalaJava编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

    Zookeeper

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    早期版本的kafka用zookeeper做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zookeeper本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。

  • 版本说明

    • Kafka_2.12-2.8.0

      编译Kafka源代码的Scala编译器版本号为2.12,Kafka的版本为2.8.0

    • Flume1.9.0

      Flume1.8.0对kafka2.8.0有兼容性问题,因为在Flume1.8.0中,还是用的kafka0.9.0.1的版本包,会造成kafka2.8.0客户端消费消息时的时间戳问题。

工具安装

JDK安装

JDK下载链接

下载jdk-8u281-linux-x64.tar.gz压缩包,并解压缩到hadoop用户的家目录的jvm文件夹

cd ~
mkdir jvm
tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm

编辑环境变量:

vim ~/.bashrc

添加JAVA_HOME:

export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281
export PATH=$JAVA_HOME/bin

让环境变量生效:

source ~/.bashrc

查看java版本:

java -version

检验环境变量是否正确:

# 检验变量值
echo $JAVA_HOME
java -version
# 与直接执行 java -version 一样
$JAVA_HOME/bin/java -version

Zookeeper安装

Zookeeper下载地址:http://mirrors.cnnic.cn/apache/zookeeper/stable/ 或 http://mirror.bit.edu.cn/apache/zookeeper/stable/。下载apache-zookeeper-3.6.3-bin.tar.gz。

解压文件:

tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ./

重命名:

mv apache-zookeeper-3.6.3-bin zookeeper

进入zookeeper文件中创建文件夹:

cd zookeeper
mkdir tmp

复制模板配置文件并修改:

cp ./conf/zoo-sample.cfg ./conf/zoo.cfg
vim ./conf/zoo.cfg

将dataDir的路径修改为刚才创建的tmp目录路径:

启动zookeeper:

./bin/zkServer.sh start

显示Starting zookeeper … STARTED则表示启动成功。

如果需要停止zookeeper,可以通过stop命令停止:

./bin/zkServer.sh stop

Flume-ng安装

通过wget下载flume安装包:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

解压到家目录:

tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~

重命名:

mv apache-flume-1.9.0-bin flume

在flume的conf目录下新建log-kafka.properties,内容为:

agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink

# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail -f /home/hadoop/flume/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20

#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

上述配置文件功能描述:

使用tail -f /home/hadoop/flume/log/agent.log命令,监听文件中改动的内容,通过正则表达式.+PRODUCT_RATING_PREFIX.+匹配内容,将匹配的结果发送到localhost:9092中Kafka的log主题中。

对于以上配置参数,大致需要明白sourceschannelssinks部分。对于这三部分的关系,官方给出了一张图:

Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成。每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:

Source

采集组件,用于跟数据源对接,以获取数据。

Sink

下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据。

Channel

传输通道组件,用于从source将数据传递到sink。

进入flume目录,执行启动命令:

cd ~/flume
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console

Kafka安装

通过wget下载安装包:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

解压到家目录:

tar -zxf kafka_2.12-2.8.0.tgz -C ~

重命名:

mv kafka_2.12-2.8.0.tgz kafka

进入kafka目录:

cd kafka

修改kafka配置:

vim config/server.properties
listeners=PLAINTEXT://:9092
# 192.168.1.43为本机ip
advertised.listeners=PLAINTEXT://192.168.1.43:9092
zookeeper.connect=localhost:2181

启动kafka(在zookeeper启动之后):

bin/kafka-server-start.sh -daemon ./config/server.properties

如果需要关闭kafka,执行:

bin/kafka-server-stop.sh

创建主题topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic recommender

在控制台发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic recommender

输入命令后,控制台会显示需要输入,此时输入的信息在回车之后会发送到kafka中去

ctrl+c退出。

在控制台输出消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic recommender

可以通过在两个终端中,一个打开发送消息,另一个打开接收消息。

服务搭建

maven项目结构:

BigData
├── BusinessServer #商品服务
├── KafkaStreaming #清洗服务
└── StreamingRecommender #推荐服务

商品服务

BusinessServer(SpringBoot项目)

主要提供了一个restful接口,用于将关键信息打印至控制台,并配置日志输出至flume配置中指定的log文件。

评分接口:

package cn.javayuli.businessserver.web;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 评分controller
 *
 * @author 韩桂林
 */
@RestController
public class RatingController {

    private static final Logger LOGGER = LoggerFactory.getLogger(RatingController.class);

    private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX";

    /**
     * 用户对商品进行评分
     *
     * @param user 用户
     * @param product 商品
     * @param score 分数
     * @return
     */
    @GetMapping("/rate")
    public String doRate(@RequestParam String user, @RequestParam String product, @RequestParam Double score) {
        LOGGER.info(PRODUCT_RATING_PREFIX + ":" + user +"|"+ product +"|"+ score +"|"+ System.currentTimeMillis()/1000);
        return "SUCCESS";
    }
}

在application.properties中配置启动端口与log4j文件输出路径:

server.port=7001
logging.file.name=/home/hadoop/flume/log/agent.log

将项目打成jar包,上传至服务器,使用java -jar ****.jar运行项目。

清洗服务

KafkaStreaming(非SpringBoot项目)

引入kafka-streams相关包:

<dependencies>
     <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

创建一个Processor:

package cn.javayuli.kafkastream.processor;


import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 日志预处理
 *
 * @author hanguilin
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    private static final String PRODUCT_RATING_PREFIX = "PRODUCT_RATING_PREFIX:";

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        // 根据前缀过滤日志信息,提取后面的内容
        if(input.contains(PRODUCT_RATING_PREFIX)){
            System.out.println("product rating coming!!!!" + input);
            input = input.split(PRODUCT_RATING_PREFIX)[1].trim();
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void close() {
    }
}

创建main函数:

package cn.javayuli.kafkastream;

import cn.javayuli.kafkastream.processor.LogProcessor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

/**
 * @author hanguilin
 */
public class KafkaStreamApp {
    public static void main(String[] args) {
        // kafka地址
        String brokers = "192.168.1.43:9092";

        // 定义输入和输出的topic
        String from = "log";
        String to = "recommender";

        // 定义kafka streaming的配置
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        // 拓扑建构器
        StreamsBuilder builder = new StreamsBuilder();
        Topology build = builder.build();
        // 定义流处理的拓扑结构
        build.addSource("SOURCE", from)
                .addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
                .addSink("SINK", to, "PROCESS");

        KafkaStreams streams = new KafkaStreams(build, settings);
        streams.start();
    }
}

将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.kafkastream.KafkaStreamApp运行项目。

推荐服务

StreamingRecommender(非SpringBoot项目)

此处只对消息进行消费,不做推荐计算。

main函数:

package cn.javayuli.streamrecommender;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author hanguilin
 */
public class ConsumerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.43:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("recommender"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

    }
}


将项目打成jar包,上传至服务器,使用java -cp ****.jar cn.javayuli.streamrecommender.ConsumerApp运行项目。

如果需要在非服务器上对程序进行远程测试,需要打开服务器的7001(BusinessServer)、9092(Kafka)端口,端口命令可以参考文章《CentOS7 中端口命令》

数据模拟

下发一个评分请求:

首先商品服务会打印出日志:

查看/home/hadoop/flume/log/agent.log

可以看到,商品服务将日志追加到了/home/hadoop/flume/log/agent.log文件中。

此时,Flume监听到了文件内容发生改变,就会将追加的内容发送到Kafkalog主题中。

此时,清洗服务从log主题中获取到包含PRODUCT_RATING_PREFIX的日志信息,并将处理后的信息发送到recommender主题。

(下图中打印的是从log中取出来的数据,非处理后的数据)

由于推荐服务订阅了recommender主题,所以会对消息进行消费。

资源地址

文中只贴出了关键性代码,全部代码请查看git仓库Recommender

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐