大数据项目之电商数仓、Maxwell使用、 Maxwell启停脚本、增量数据同步、历史数据全量同步、采集通道Maxwell配置、通道测试
但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-inser
·
7. 业务数据采集模块
7.2 采集工具
7.2.4 Maxwell使用
7.2.4.1 启动Kafka集群
若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。
7.2.4.2 Maxwell启停
7.2.4.2.1 启动Maxwell
[summer@hadoop102 maxwell-1.29.2]$ bin/maxwell --config config.properties --daemon
7.2.4.2.2 停止Maxwell
[summer@hadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9
7.2.4.2.3 Maxwell启停脚本
7.2.4.2.3.1 创建并编辑Maxwell启停脚本
[summer@hadoop102 bin]$ vim mxw.sh
7.2.4.2.3.2 脚本内容如下
#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell-1.29.2
status_maxwell(){
result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
return $result
}
start_maxwell(){
status_maxwell
if [[ $? -lt 1 ]]; then
echo "启动Maxwell"
$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
else
echo "Maxwell正在运行"
fi
}
stop_maxwell(){
status_maxwell
if [[ $? -gt 0 ]]; then
echo "停止Maxwell"
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
else
echo "Maxwell未在运行"
fi
}
case $1 in
start )
start_maxwell
;;
stop )
stop_maxwell
;;
restart )
stop_maxwell
start_maxwell
;;
esac
7.2.4.3 增量数据同步
7.2.4.3.1 启动Kafka消费者
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
7.2.4.3.2 模拟生成数据
[summer@hadoop102 bin]$ java -jar gmall2020-mock-db-2021-11-14.jar
7.2.4.3.3 观察Kafka消费者
7.2.4.4 历史数据全量同步
上一节,我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。
7.2.4.4.1 Maxwell-bootstrap
[summer@hadoop102 maxwell-1.29.2]$ /opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell-1.29.2/config.properties
7.2.4.4.2 boostrap数据格式
采用bootstrap方式同步的输出数据格式如下:
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-start",
"ts": 1667014630,
"data": {}
}
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-insert",
"ts": 1667014630,
"data": {
"id":194,
"login_name":"nwuckp5",
"nick_name":"环雪",
"passwd":null,
"name":"鲍环雪",
"phone_num":"13878128474",
"email":"nwuckp5@126.com",
"head_img":null,
"user_level":"2",
"birthday":"2002-12-14",
"gender":"F",
"create_time":"2020-06-14 10:31:20",
"operate_time":null,
"status":null
}
}
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-insert",
"ts": 1667014630,
"data": {
"id":195,
"login_name":"eu3kk9va08",
"nick_name":"文辉",
"passwd":null,
"name":"齐文辉",
"phone_num":"13771612693",
"email":"eu3kk9va08@sina.com",
"head_img":null,
"user_level":"1",
"birthday":"1984-06-14",
"gender":null,
"create_time":"2020-06-14 10:31:20",
"operate_time":null,
"status":null
}
}
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-complete",
"ts": 1667014630,
"data": {}
}
注意事项:
1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。
7.3 采集通道Maxwell配置
7.3.1 修改Maxwell配置文件config.properties
[summer@hadoop102 maxwell-1.29.2]$ vim config.properties
7.3.2 配置参数如下
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#kafka topic配置
kafka_topic=topic_db
# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
7.3.3 重新启动Maxwell
[summer@hadoop102 maxwell-1.29.2]$ mxw.sh restart
7.3.4 通道测试
7.3.4.1 启动Zookeeper以及Kafka集群
7.3.4.2 启动一个Kafka Console Consumer,消费topic_db数据
[summer@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
7.3.4.3 生成模拟数据
[summer@hadoop102 bin]$ cd /opt/module/db_log/
[summer@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
7.3.4.4 观察Kafka消费者是否能消费到数据
更多推荐
已为社区贡献1条内容
所有评论(0)