7. 业务数据采集模块

7.2 采集工具

7.2.4 Maxwell使用

7.2.4.1 启动Kafka集群

  若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。

7.2.4.2 Maxwell启停
7.2.4.2.1 启动Maxwell
[summer@hadoop102 maxwell-1.29.2]$ bin/maxwell --config config.properties --daemon

在这里插入图片描述

7.2.4.2.2 停止Maxwell
[summer@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9
7.2.4.2.3 Maxwell启停脚本
7.2.4.2.3.1 创建并编辑Maxwell启停脚本
[summer@hadoop102 bin]$ vim mxw.sh

在这里插入图片描述

7.2.4.2.3.2 脚本内容如下
#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell-1.29.2

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

在这里插入图片描述

在这里插入图片描述

7.2.4.3 增量数据同步
7.2.4.3.1 启动Kafka消费者
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
7.2.4.3.2 模拟生成数据
[summer@hadoop102 bin]$ java -jar gmall2020-mock-db-2021-11-14.jar 
7.2.4.3.3 观察Kafka消费者

在这里插入图片描述

7.2.4.4 历史数据全量同步

  上一节,我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

7.2.4.4.1 Maxwell-bootstrap
[summer@hadoop102 maxwell-1.29.2]$ /opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell-1.29.2/config.properties

在这里插入图片描述

7.2.4.4.2 boostrap数据格式

采用bootstrap方式同步的输出数据格式如下:

{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-start",
    "ts": 1667014630,
    "data": {}
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-insert",
    "ts": 1667014630,
    "data": {
       "id":194,
       "login_name":"nwuckp5",
       "nick_name":"环雪",
       "passwd":null,
       "name":"鲍环雪",
       "phone_num":"13878128474",
       "email":"nwuckp5@126.com",
       "head_img":null,
       "user_level":"2",
       "birthday":"2002-12-14",
       "gender":"F",
       "create_time":"2020-06-14 10:31:20",
       "operate_time":null,
       "status":null
    }
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-insert",
    "ts": 1667014630,
    "data": {
        "id":195,
        "login_name":"eu3kk9va08",
        "nick_name":"文辉",
        "passwd":null,
        "name":"齐文辉",
        "phone_num":"13771612693",
        "email":"eu3kk9va08@sina.com",
        "head_img":null,
        "user_level":"1",
        "birthday":"1984-06-14",
        "gender":null,
        "create_time":"2020-06-14 10:31:20",
        "operate_time":null,
        "status":null
    }
}
{
    "database": "gmall",
    "table": "user_info",
    "type": "bootstrap-complete",
    "ts": 1667014630,
    "data": {}
}

在这里插入图片描述

注意事项:
1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。

7.3 采集通道Maxwell配置

7.3.1 修改Maxwell配置文件config.properties

[summer@hadoop102 maxwell-1.29.2]$ vim config.properties

在这里插入图片描述

7.3.2 配置参数如下

在这里插入图片描述

log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092

#kafka topic配置
kafka_topic=topic_db

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

7.3.3 重新启动Maxwell

[summer@hadoop102 maxwell-1.29.2]$ mxw.sh restart

在这里插入图片描述

7.3.4 通道测试

7.3.4.1 启动Zookeeper以及Kafka集群

在这里插入图片描述

7.3.4.2 启动一个Kafka Console Consumer,消费topic_db数据
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
7.3.4.3 生成模拟数据
[summer@hadoop102 bin]$ cd /opt/module/db_log/
[summer@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 
7.3.4.4 观察Kafka消费者是否能消费到数据

在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐