1. Lab environment

The following environment must be set up in VMware virtual machines in advance:

1.1 Versions of Maxwell and surrounding components

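| Software | Version | Notes |
| --- | --- | --- |
| CentOS | 7.6 | - |
| CDH | Cloudera Express 5.16.2 (JDK 1.8) | - |
| Kafka | 0.11.0+kafka3.0.0 (bundled with CDH) | - |
| Maxwell | 1.34.1 (JDK 11, installed separately) | Install directory: /opt/module/maxwell. JAVA_HOME must be added at the top of ${MAXWELL_HOME}/bin/maxwell, e.g. JAVA_HOME=/opt/module/jdk-11.0.7 |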

Create the symbolic link:

ln -s /opt/module/maxwell-1.34.1 /opt/module/maxwell

Create the log directory:

mkdir -p /opt/module/maxwell/logs
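
The JAVA_HOME line mentioned in section 1.1 can be added by hand, or with a small helper like the sketch below. The function name add_java_home is hypothetical (it is not part of Maxwell), and the sketch assumes the first line of the launcher script is a shebang.

```shell
#!/bin/bash
# Sketch: insert a JAVA_HOME assignment right below the shebang of a launcher script.
# add_java_home is a hypothetical helper, not something shipped with Maxwell.
add_java_home() {
    local script="$1" java_home="$2" tmp

    # Do nothing if the script already sets JAVA_HOME at the start of a line.
    if grep -q '^JAVA_HOME=' "$script"; then
        return 0
    fi

    # Print line 1 (assumed to be the shebang), then the assignment, then the rest.
    tmp="$(mktemp)"
    awk -v jh="$java_home" 'NR==1 {print; print "JAVA_HOME=" jh; next} {print}' "$script" > "$tmp"

    # Overwrite the content in place so the file keeps its permissions.
    cat "$tmp" > "$script"
    rm -f "$tmp"
}
```

With the paths used in this article, the call would be: add_java_home /opt/module/maxwell/bin/maxwell /opt/module/jdk-11.0.7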

1.2 Database information (data source)

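| No. | Database name | IP | Port | Notes |
| --- | --- | --- | --- | --- |
| 1 | maxwell | 111.111.111.111 | 3306 | Maxwell metadata database, binlog: master.000001 |
| 2 | gmall | 111.111.111.112 | 3306 | Business database (online shop) |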

1.3 Kafka information (data sink)

| MQ No. | MQ broker list |
| --- | --- |
| 1 | dn3:9092,dn4:9092,dn5:9092 |

2. Goal

Goal:

From the online-shop business database gmall, select a specified set of tables and synchronize their changes to the topic topic_db on the Kafka cluster.

As shown in part-1 of the figure below:

[Figure]

3. Maxwell configuration

3.1 Directory layout

[hdfs@dn5 module]$ tree -L 2 /opt/module/maxwell
maxwell
├── bin
│   ├── maxwell
│   ├── maxwell-benchmark
│   ├── maxwell-bootstrap
│   └── maxwell-docker
├── config.md
├── config.properties.example
├── kinesis-producer-library.properties.example
├── lib
│   └── ****.jar
├── LICENSE
├── log4j2.xml
├── maxwelllog
│   └── gmall_new_kafka.log
├── quickstart.md
├── README_2.md
├── README.md
├── run-gmall_new_kafka.sh
└── my_custom_config
    └── gmall_rtdw_test.properties

3.2 Maxwell configuration file

log_level=info

# Maxwell id info
client_id=gmall_client_v2022u0101
replica_server_id=1234

# Maxwell database info
host=111.111.111.111
user=USER
password=PASSWORD
port=3306

# Kafka configuration
producer=kafka
kafka.bootstrap.servers=dn3:9092,dn4:9092,dn5:9092
kafka_topic=topic_db
kafka_partition_hash=murmur3
producer_partition_by=primary_key

# Biz database(s) info
replication_host=111.111.111.112
replication_user=ROOT
replication_password=PASSWORD
replication_port=3306
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
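# Note: exclude_dbs / include_dbs / include_tables are deprecated; Maxwell logs a warning at startup and recommends the filter option instead.
# Note: order_detal may be a typo for order_detail; check the actual table name in gmall.
exclude_dbs=*
include_dbs=gmall
include_tables=activity_info,base_category1,base_category2,base_category3,base_province,base_region,cart_info,order_info,order_detal,payment_info,sku_info,spu_info,user_info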
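
Maxwell reports at startup that the exclude/include options are deprecated in favor of a single filter option. The sketch below shows one way such a filter line could be generated from a database name and a comma-separated table list. The helper name build_filter is hypothetical, and the result is narrower than the original settings: it whitelists the tables only inside the given database, whereas include_tables above matches those table names in any database.

```shell
#!/bin/bash
# Sketch: build a Maxwell "filter=" line that excludes everything
# and then whitelists the listed tables of one database.
# build_filter is a hypothetical helper, not part of Maxwell.
build_filter() {
    local db="$1" tables="$2" filter="exclude: *.*" tbl list

    # Split the comma-separated table list into an array.
    IFS=',' read -ra list <<< "$tables"

    # Append one include rule per table.
    for tbl in "${list[@]}"; do
        filter+=", include: ${db}.${tbl}"
    done

    printf 'filter=%s\n' "$filter"
}
```

For example, build_filter gmall "spu_info,sku_info" prints filter=exclude: *.*, include: gmall.spu_info, include: gmall.sku_info, which could replace the three deprecated lines in the properties file.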

3.3 Create the Maxwell startup script

run-gmall_new_kafka.sh

#!/bin/bash

/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/my_custom_config/gmall_rtdw_test.properties >> /opt/module/maxwell/logs/gmall_new_kafka.log 2>&1

It can also be set up as a systemd service (managed with systemctl):

maxwell_gmall_rtdw_test_etl.service

[Unit]
Description=maxwell gmall_rtdw_test
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
ExecStart=/opt/module/maxwell/run-gmall_new_kafka.sh
WorkingDirectory=/opt/module/maxwell/
StandardOutput=inherit
StandardError=inherit
Restart=always
RestartSec=20
User=hdfs
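# On systemd 219 (CentOS 7) this option is spelled StartLimitInterval= and lives in [Service];
# newer systemd versions use StartLimitIntervalSec= in [Unit].
StartLimitInterval=0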

[Install]
WantedBy=multi-user.target

3.4 Service management

Use the following commands to manage the systemd service:

# Enable at boot
systemctl enable maxwell_gmall_rtdw_test_etl.service
systemctl is-enabled maxwell_gmall_rtdw_test_etl.service
systemctl disable maxwell_gmall_rtdw_test_etl.service

# Check status
systemctl status maxwell_gmall_rtdw_test_etl.service
journalctl -xef -u maxwell_gmall_rtdw_test_etl.service

# Start / stop / restart
systemctl start maxwell_gmall_rtdw_test_etl.service
systemctl stop maxwell_gmall_rtdw_test_etl.service
systemctl restart maxwell_gmall_rtdw_test_etl.service

3.5 Check the process

Check with jps:

[hdfs@dn5 maxwell-1.34.1]$ jps -ml | grep maxwell
11223 com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/my_custom_config/gmall_rtdw_test.properties

Check the service:

[root@dn5 system]# systemctl status maxwell_gmall_rtdw_test_etl.service
● maxwell_gmall_rtdw_test_etl.service - maxwell gmall_new_kafka
   Loaded: loaded (/etc/systemd/system/maxwell_gmall_rtdw_test_etl.service; enabled; vendor preset: disabled)
   Active: active (running) since Wed 2022-01-01 12:12:12 CST; 1 months 15 days ago
 Main PID: 11223 (run-gmall_new_kafka.sh)
   CGroup: /system.slice/maxwell_gmall_rtdw_test_etl.service
           ├─22569 /bin/bash /opt/module/maxwell/run-gmall_new_kafka.sh
           └─22570 /opt/module/jdk-11.0.7/bin/java -Xmx27g -Xms15g -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegist...

Jan 1 12:12:12 dn5 systemd[1]: Started maxwell gmall_new_kafka.

4. Inspect the Maxwell startup log

more /opt/module/maxwell/logs/gmall_new_kafka.log

[hdfs@dn5 maxwelllog]$ more gmall_new_kafka.log 

2022-07-12 10:28:25,091 [main] WARN  Filter - using exclude/include/includeColumns is deprecated.  Please update your configuration to use: 
2022-07-12 10:28:25,095 [main] WARN  Filter - filter = "exclude: *.*, include: gmall.*, exclude: *.*, include: *.activity_info, include: *.base_category1, include: *.base_category2, include: *.base_category3, include: *.base_province, include: *.base_region, include: *.cart_info, include: *.order_info, include: *.order_detal, include: *.payment_info, include: *.sku_info, include: *.spu_info, include: *.user_info"
2022-07-12 10:28:25,596 [main] INFO  Maxwell - Starting Maxwell. maxMemory: 8392802304 bufferMemoryUsage: 0.25
2022-07-12 10:28:26,098 [main] INFO  ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [dn3:9092, dn4:9092, dn5:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-07-12 10:28:26,146 [main] INFO  AppInfoParser - Kafka version : 1.0.0
2022-07-12 10:28:26,146 [main] INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
2022-07-12 10:28:26,179 [main] INFO  Maxwell - Maxwell v1.34.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[master.001138:256152843], lastHeartbeat=0]
2022-07-12 10:28:26,684 [main] INFO  AbstractSchemaStore - Maxwell is capturing initial schema
2022-07-12 10:28:36,057 [main] INFO  BinlogConnectorReplicator - Setting initial binlog pos to: master.001138:256152843
2022-07-12 10:28:36,078 [blc-172.20.105.58:3306] INFO  BinaryLogClient - Connected to 111.111.111.112:3306 at master.000001/256152843 (sid:1333, cid:63998820)
2022-07-12 10:28:36,078 [blc-172.20.105.58:3306] INFO  BinlogConnectorReplicator - Binlog connected.
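
When troubleshooting a restart, the binlog position Maxwell resumed from is often the first thing to check. The sketch below pulls it out of the "is booting" line shown in the log above. The helper name start_position is hypothetical, and the pattern assumes the log line format of this Maxwell version.

```shell
#!/bin/bash
# Sketch: print the binlog file:offset that Maxwell reported at boot.
# start_position is a hypothetical helper; it reads the given log files, or stdin if none are given.
start_position() {
    # Capture the text between "BinlogPosition[" and the next "]".
    sed -n 's/.*starting at Position\[BinlogPosition\[\([^]]*\)\].*/\1/p' "$@"
}
```

For the log in this section, start_position /opt/module/maxwell/logs/gmall_new_kafka.log would print one position per startup recorded in the file.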

5. Results

Trigger binlog generation:

5.1 Generating a few binlog events (1+)

Trigger binlog events according to the business logic:

update ${DB}.${TABLE_NAME} t set t.create_time=DATE_ADD(t.create_time,INTERVAL 1 SECOND)
where t.${BIZ_DATE}>='2022-01-15 00:00:00';

For example, change the description of a given SPU:

update gmall.spu_info t 
set t.description = '小米10-test-maxwell'
where t.id=1;
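
The placeholder template at the start of this section (${DB}, ${TABLE_NAME}, ${BIZ_DATE}) can be filled in from a script. The sketch below only renders the statement as text; the helper name render_touch_sql is hypothetical, and it does no SQL escaping, so it should only be fed trusted values.

```shell
#!/bin/bash
# Sketch: render the "touch create_time" update template for one table.
# render_touch_sql is a hypothetical helper; arguments are not escaped.
render_touch_sql() {
    local db="$1" table="$2" date_col="$3" since="$4"
    printf "update %s.%s t set t.create_time=DATE_ADD(t.create_time,INTERVAL 1 SECOND)\nwhere t.%s>='%s';\n" \
        "$db" "$table" "$date_col" "$since"
}
```

For example, render_touch_sql gmall order_info create_time '2022-01-15 00:00:00' prints a statement that can be reviewed and then run in a MySQL client against the business database.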

Use a Kafka console consumer to view the MySQL binlog data parsed by Maxwell:
[hdfs@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic topic_db

{
	"database": "gmall",
	"table": "spu_info",
	"type": "update",
	"ts": 1657598220,
	"xid": 1338044537,
	"commit": true,
	"data": {
		"id": 1,
		"spu_name": "小米10",
		"description": "小米10-test-maxwell",
		"category3_id": 61,
		"tm_id": 1
	},
	"old": {
		"description": "小米10"
	}
}

5.2 Generating events for a whole table (bootstrap)

[hdfs@dn5 maxwell]$ bin/maxwell-bootstrap --database gmall --table base_trademark --config ./my_custom_config/gmall_rtdw_test.properties
connecting to jdbc:mysql://111.111.111.111:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false

Bootstrap a batch of tables in one go:
batch_upsert_table_process.sh

#!/bin/bash

# Comma-separated list of tables to bootstrap
include_tables="activity_info,activity_rule,activity_sku,base_category1,base_category2,base_category3,base_province,base_region,base_trademark,coupon_info,coupon_range,coupon_use,financial_sku_cost,sku_info,spu_info,user_info"

# Split the list on commas and bootstrap each table in turn
function get_element_by_split_comma(){
    local tbl tables
    # Setting IFS only for this read leaves the global IFS untouched
    IFS=',' read -ra tables <<< "${include_tables}"

    for tbl in "${tables[@]}"; do
        echo "Now handling table >>>>>>>>>>>>>>>>>>>>> ${tbl}"
        /data/maxwell/bin/maxwell-bootstrap --database gmall --table "${tbl}" --config /data/maxwell/my_custom_config/gmall_rtdw_test.properties
    done
}

# Call the function
get_element_by_split_comma

Use a Kafka console consumer to view the MySQL binlog data parsed by Maxwell:
[hdfs@dn5 ~]# kafka-console-consumer --zookeeper dn3:2181 --topic topic_db

{"database":"gmall","table":"base_trademark","type":"bootstrap-start","ts":1657615954,"data":{}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":1,"tm_name":"三星","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":2,"tm_name":"苹果","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":3,"tm_name":"华为","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":4,"tm_name":"TCL","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":5,"tm_name":"小米","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":6,"tm_name":"长粒香","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":7,"tm_name":"金沙河","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":8,"tm_name":"索芙特","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":9,"tm_name":"CAREMiLLE","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":10,"tm_name":"欧莱雅","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-insert","ts":1657615954,"data":{"id":11,"tm_name":"香奈儿","logo_url":"/static/default.jpg"}}
{"database":"gmall","table":"base_trademark","type":"bootstrap-complete","ts":1657615954,"data":{}}

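As the output shows, type takes three values during a bootstrap (bootstrap-start, bootstrap-insert and bootstrap-complete), and the data attribute is empty for bootstrap-start and bootstrap-complete.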
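
A quick way to check that a bootstrap ran to completion is to count the three event types in the consumer output. The sketch below does this with plain text matching, assuming the compact one-message-per-line JSON printed above; the helper name summarize_bootstrap is hypothetical, and a real consumer would normally parse the JSON properly.

```shell
#!/bin/bash
# Sketch: count Maxwell bootstrap events read from stdin, one JSON message per line.
# summarize_bootstrap is a hypothetical helper; it relies on the compact
# "type":"..." formatting seen in the console consumer output.
summarize_bootstrap() {
    awk '
        /"type":"bootstrap-start"/    { started++ }
        /"type":"bootstrap-insert"/   { rows++ }
        /"type":"bootstrap-complete"/ { completed++ }
        END { printf "start=%d insert=%d complete=%d\n", started, rows, completed }
    '
}
```

Piping the saved consumer output into summarize_bootstrap gives one summary line; matching start and complete counts suggest every started bootstrap also finished, and the insert count can be compared with the row count of the source table.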
