Tested environment: CentOS 7.3 + ZooKeeper + Kafka (install JDK 1.8 yourself). Without further ado, let's get started.

1. ZooKeeper installation and deployment (attached: zookeeper-3.4.11.tar.gz)

# Download ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.11.tar.gz
# Extract
tar -zxvf zookeeper-3.4.11.tar.gz
# Move to /usr/local/
mv zookeeper-3.4.11/ /usr/local/
# Rename the directory
mv /usr/local/zookeeper-3.4.11/ /usr/local/zookeeper/
# Create the data directory
mkdir /usr/local/zookeeper/data/
# Create the configuration file
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

        Edit the configuration file /usr/local/zookeeper/conf/zoo.cfg and add the line dataDir=/usr/local/zookeeper/data. The full configuration looks like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181

        Start ZooKeeper with the following commands and check its status.

# Start ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start
# Check status
/usr/local/zookeeper/bin/zkServer.sh status

        After a successful start, the status check output looks like the following:
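
        On a standalone single-node setup, the status command typically reports something along these lines (exact paths depend on where ZooKeeper was installed):

ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone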

2. Kafka installation and deployment (attached: kafka_2.10-0.9.0.0.tgz)

# Download Kafka
wget https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
# Extract Kafka
tar -zxvf kafka_2.10-0.9.0.0.tgz
# Copy to /usr/local
cp -R kafka_2.10-0.9.0.0 /usr/local
# Rename
mv /usr/local/kafka_2.10-0.9.0.0/ /usr/local/kafka/

        Edit the /usr/local/kafka/config/server.properties configuration file. Leave the other settings unchanged, but make sure the following parameters are set:

broker.id=0
host.name=<local IP>
listeners=PLAINTEXT://:9092
# make Kafka reachable from remote hosts
advertised.listeners=PLAINTEXT://<local IP>:9092
advertised.host.name=localhost
zookeeper.connect=localhost:2181

        Edit /usr/local/kafka/bin/kafka-server-start.sh and change the setting below (this is not necessary if the VM has enough memory).

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

        Start Kafka, create a topic, and run a quick test.

# Start Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Create a topic (named test)
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# Send data (producer)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Receive data (consumer)
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
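
        Run the producer and consumer in two separate terminals; a line typed into the producer should appear in the consumer. For example (the JSON message below is just an illustration, matching the Student format used later):

# In the producer terminal, type a message and press Enter:
{"id":1,"name":"test1","password":"password1","age":19}
# The consumer terminal prints the same message back:
{"id":1,"name":"test1","password":"password1","age":19}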

3. Java code (Flink reads Kafka data in real time, periodically aggregates it in batches, and writes it to MySQL; source code attached)

        (1) The Student entity class.

public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

        (2) The main class Main2 (replace the parameters as indicated in the comments).

public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "<Kafka broker IP>:9092");
        props.put("zookeeper.connect", "<ZooKeeper host IP>:2181");
        // existing consumer groups can be listed with:
        // ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list
        props.put("group.id", "console-consumer-91899");// replace with your own group id

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer09<>(
                "test",
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class));

        // receive data from Kafka and aggregate the records collected within 1 minute
        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                ArrayList<Student> students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    System.out.println("Number of student records collected within 1 minute: " + students.size());
                    out.collect(students);
                }
            }
        }).addSink(new SinkToMySQL());

        env.execute("Flink add sink");
    }
}
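
        Main2 depends on flink-streaming-java, the flink-connector-kafka-0.9 connector, fastjson, and Guava; its imports are omitted above. For reference, here is a sketch of the imports it needs (package paths assume a Flink 1.x build and may differ in your version):

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;  // the Flink distribution also ships a shaded Guava, if you prefer that
import org.apache.flink.api.common.serialization.SimpleStringSchema;  // older Flink versions: org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;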

        (3) The SinkToMySQL class (writes the batched data to MySQL; replace the parameters as indicated).

public class SinkToMySQL extends RichSinkFunction<List<Student>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * The connection is created in open(), so we don't have to open and release a connection on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the statement first, then the connection
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every element that reaches the sink; here each element is a batch of students.
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        // iterate over the batch and add each student to the prepared statement
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        int[] count = ps.executeBatch();// execute the whole batch at once
        System.out.println("Successfully inserted " + count.length + " rows");
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // NOTE: replace with your own MySQL URL, username and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/<database name>");
        dataSource.setUsername("<database user>");
        dataSource.setPassword("<database password>");
        // connection pool settings
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("Created pooled connection: " + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
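
        The sink assumes the target database already contains a Student table with columns matching the INSERT above (BasicDataSource and setMaxTotal come from commons-dbcp2, and the MySQL JDBC driver must be on the classpath). A minimal sketch for creating the table from the shell, with illustrative column types and placeholder credentials:

# Create the target table (adjust column types and lengths as needed)
mysql -u<database user> -p <database name> -e "CREATE TABLE Student (id INT PRIMARY KEY, name VARCHAR(64), password VARCHAR(64), age INT)"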

        (4) The KafkaDataMonitor class (simulates sending data to Kafka; replace the parameters as indicated).

public class KafkaDataMonitor {
    public static final String broker_list = "<Kafka broker IP>:9092";
    public static final String topic = "test";// replace with your Kafka topic name

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i <= 20; i++) {
            Student student = new Student(i, "test" + i, "password" + i, 18 + i);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("send data: " + JSON.toJSONString(student));
        }
        producer.flush();
        producer.close();// release the producer once all records have been sent
    }
}

        (5) Start the main method of Main2. Once it is up and running, start the main method of KafkaDataMonitor to simulate writing data into Kafka. The logs look like the following:
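
        Roughly, the producer prints one "send data" line per record, and once the one-minute window fires the Flink job reports the batch insert (illustrative output):

send data: {"id":1,"name":"test1","password":"password1","age":19}
send data: {"id":2,"name":"test2","password":"password2","age":20}
...
Number of student records collected within 1 minute: 20
Successfully inserted 20 rows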

       Connect to the MySQL database with Navicat (or any other client) and you can see that the 20 simulated records written to Kafka within the 1-minute window have all been written to MySQL.
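
        The same check can also be done from the command line (placeholders for the database name and credentials):

mysql -u<database user> -p <database name> -e "SELECT COUNT(*) FROM Student"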

        Attachment: source code + environment deployment package.
