This article walks through, step by step, how to connect Flink to Kafka and Redis, consuming data from Kafka and writing it into Redis. This is a scenario you are likely to meet in real projects.

1. Deploying Kafka (standalone)

1.1 Download and extract the Kafka archive

Download link: http://archive.apache.org/dist/kafka/

The version downloaded here is kafka_2.12-3.2.0. With this version, the --bootstrap-server localhost:9092 option must replace the old --zookeeper localhost:2181 option, otherwise the command fails with:

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

1.2 Start the bundled ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

1.3 Start Kafka

bin/kafka-server-start.sh config/server.properties

1.4 Common Kafka commands

  • Create a topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic newTopic --partitions 1 --replication-factor 1
  • Delete a topic
./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic newTopic
  • List all topics
./kafka-topics.sh --bootstrap-server localhost:9092 --list
  • Produce data to a topic (on Kafka 3.x prefer --bootstrap-server here as well; the older --broker-list option is deprecated; a programmatic alternative is sketched after this list)
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic newTopic
  • Consume data from a topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newTopic --from-beginning
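
Besides the console tools, test data can also be pushed programmatically. Below is a minimal sketch using the plain Kafka client (an assumption: the kafka-clients jar, which flink-connector-kafka pulls in transitively, is on the classpath); the topic name newTopic and the key:value payload format match what the Flink job in section 3.4 expects:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceSample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "name:summo" follows the key:value format the Flink job splits on ':'
            producer.send(new ProducerRecord<>("newTopic", "name:summo"));
        }
    }
}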

2. Deploying Redis (standalone)

Download link: http://download.redis.io/releases/redis-5.0.7.tar.gz

There are plenty of installation guides for this; a recommended reference: https://www.runoob.com/redis/redis-install.html

3. Project implementation

3.1 pom.xml configuration

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <!-- Kafka connector dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>

    <!-- Redis connector dependency (version 1.1.5 is only published with the _2.11 suffix) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>

    <!-- When developing the program as a Maven project, the flink-clients module dependency is required -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>

    <!-- Fixes: Failed to load class "org.slf4j.impl.StaticLoggerBinder". -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

3.2 Kafka Source

// Read data from Kafka
Properties properties = new Properties();
// Kafka broker address
properties.setProperty("bootstrap.servers", "localhost:9092");
// Consumer group ID
properties.setProperty("group.id", "summo-group");
// If no offset has been committed yet, start consuming from the beginning
properties.setProperty("auto.offset.reset", "earliest");
// Do not let the Kafka consumer auto-commit offsets
properties.setProperty("enable.auto.commit", "false");
// Kafka source
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("newTopic", new SimpleStringSchema(), properties);
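
Note that auto.offset.reset only takes effect when the consumer group has no committed offsets. The FlinkKafkaConsumer also exposes explicit start-position setters that override these properties; a brief sketch of the options available in Flink 1.12:

// Explicit start position on the Flink consumer (overrides auto.offset.reset)
kafkaSource.setStartFromGroupOffsets(); // default: resume from committed group offsets
// kafkaSource.setStartFromEarliest();  // always start from the earliest record
// kafkaSource.setStartFromLatest();    // always start from the latest record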

3.3 Redis Sink

Index.java

public class Index {
    /**
     * key
     */
    private String key;

    /**
     * value
     */
    private String value;

    public Index(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Index{" +
                "key='" + key + ''' +
                ", value=" + value +
                '}';
    }
}
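
Since Lombok is already declared in pom.xml, the getter/setter/toString boilerplate above could be generated instead; a minimal equivalent sketch:

import lombok.AllArgsConstructor;
import lombok.Data;

@Data               // generates getters, setters, equals/hashCode and toString
@AllArgsConstructor // generates the (key, value) constructor
public class Index {
    private String key;
    private String value;
}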

MyReadSink.java

import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class MyReadSink extends RedisSink<Index> {

    public MyReadSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<Index> redisSinkMapper) {
        super(flinkJedisConfigBase, redisSinkMapper);
    }
}
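
MyReadSink adds no behavior on top of the connector's RedisSink, so the subclass is strictly optional; the sink could equally be used directly in the job:

// Equivalent, without the subclass:
indexDataStream.addSink(new RedisSink<>(config, new MyReadMapper())).name("redis sink");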

MyReadMapper.java

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class MyReadMapper implements RedisMapper<Index> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET, "index_key");
    }

    @Override
    public String getKeyFromData(Index index) {
        return index.getKey();
    }

    @Override
    public String getValueFromData(Index index) {
        return index.getValue();
    }
}
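
A detail about RedisCommandDescription: the second argument (the additional key) is only used by hash and sorted-set commands; for RedisCommand.SET it is ignored, and each Index lands as a plain top-level Redis key. If you instead wanted all entries grouped under one hash, a sketch of the variant (index_hash is a hypothetical key name):

@Override
public RedisCommandDescription getCommandDescription() {
    // HSET index_hash <key> <value>: the additional key names the hash,
    // and getKeyFromData() supplies the field inside it
    return new RedisCommandDescription(RedisCommand.HSET, "index_hash");
}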

3.4 Application entry point

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Test {
    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from Kafka
        Properties properties = new Properties();
        // Kafka broker address
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // Consumer group ID
        properties.setProperty("group.id", "summo-group");
        // If no offset has been committed yet, start consuming from the beginning
        properties.setProperty("auto.offset.reset", "earliest");
        // Do not let the Kafka consumer auto-commit offsets
        properties.setProperty("enable.auto.commit", "false");
        // Kafka source
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("newTopic", new SimpleStringSchema(), properties);
        SingleOutputStreamOperator<String> lines = executionEnvironment
                .addSource(kafkaSource).name("kafka source").setParallelism(1);
        DataStream<Index> indexDataStream = lines.map(line -> {
            String[] fields = line.split(":");
            try {
                LOG.info("Input key: [{}]; input value: [{}]", fields[0], fields[1]);
                return new Index(fields[0], fields[1]);
            } catch (Exception e) {
                LOG.error("Malformed input data");
            }
            return null;
        }).filter(index -> null != index).name("transfer node").setParallelism(1);
        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        // Set the sink
        indexDataStream.addSink(new MyReadSink(config, new MyReadMapper())).name("redis sink");
        // Execute the job
        executionEnvironment.execute("store kafka data into redis");
    }
}
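
One caveat: with enable.auto.commit set to false and checkpointing disabled, the FlinkKafkaConsumer never commits offsets back to Kafka, so a restarted job re-reads the topic from the beginning (due to auto.offset.reset=earliest). Enabling checkpointing makes the consumer commit offsets whenever a checkpoint completes; a one-line sketch (the 5-second interval is an arbitrary choice):

// Commit Kafka offsets on each completed checkpoint (here: every 5 seconds)
executionEnvironment.enableCheckpointing(5000);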

3.5 Demo

Kafka producer window (screenshot)

Console output (screenshot)

Redis window (screenshot)
