mysql数据变更捕获的实现已经有很多开源工具,比如canal,debezium,maxwell等等。alibaba/canal实现了mysql连接协议,debezium和maxwell等则是利用mysql-binlog-connector-java开源工具连接mysql数据源,实现获取binlog日志。本篇文章介绍通过引入mysql-binlog-connector-java依赖,提供在线(即连接数据源,实时获取)和离线(读取离线文件)两种方式获取,解析binlog日志(利用mysql-binlog-connector-java的事件解析数据表增删改操作DML)。

话不多说,直接上代码。

BinLogConstants类主要提供目标数据库的连接信息,也包括解析模式选择(在线or离线)。

package com.binlog.parser;

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 监听配置信息
 *
 * @author lgq
 * @since 2022/7/27
 **/
@Data
@Component
public class BinLogConstants {
    @Value("${binlog.datasource.host}")
    private String host;

    @Value("${binlog.datasource.port}")
    private int port;

    @Value("${binlog.datasource.username}")
    private String username;

    @Value("${binlog.datasource.passwd}")
    private String passwd;

    @Value("${binlog.db}")
    private String db;

    @Value("${binlog.table}")
    private String table;

    @Value("${binlog.isOnline}")
    private boolean isOnline;

    public static final int consumerThreads = 4;

    public static final int fileScanThreads = 4;

    public static final long queueSleep = 1000;

}
BinLogListener类注册消费监听器,提供事件解析方法,并启动消费线程。
package com.binlog.parser;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import static com.github.shyiko.mysql.binlog.event.EventType.QUERY;
import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * Base class of the binlog listeners.
 *
 * <p>Subclasses feed raw binlog {@link Event}s into {@link #parseEvent(Event)}. Row events (insert / update /
 * delete) of registered tables are converted into {@link BinLogItem}s and put on a bounded queue. A consumer
 * thread then dispatches them to the {@link BinLogConsumerListener}s registered for that table.
 *
 * @author lgq
 * @since 2022/7/21
 **/
public abstract class BinLogListener {
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(BinLogListener.class.getName());

    private final int consumerThreads = BinLogConstants.consumerThreads;
    private final BlockingQueue<BinLogItem> binLogItemQueue;
    private final ExecutorService consumer;
    // Listeners registered for each "db-table" key (ArrayListMultimap is not thread-safe: register before starting)
    private final Multimap<String, BinLogConsumerListener> listeners;
    private final MysqlConnConf mysqlConnConf;
    // Column metadata for each registered "db-table" key
    private final Map<String, Map<String, ColumnInfo>> dbTableCols;
    // "db-table" key of the most recent TABLE_MAP event; the row events that follow it belong to this table
    private String dbTable;

    /**
     * Initializes the listener.
     *
     * @param conf connection settings, used to load the column metadata of registered tables
     */
    public BinLogListener(MysqlConnConf conf) {
        this.binLogItemQueue = new ArrayBlockingQueue<>(1024);
        this.mysqlConnConf = conf;
        this.listeners = ArrayListMultimap.create();
        this.dbTableCols = new ConcurrentHashMap<>();
        this.consumer = Executors.newFixedThreadPool(consumerThreads);
    }

    /**
     * Registers a consumer for changes of {@code db.table} and loads that table's column metadata.
     *
     * @param db       database name
     * @param table    table name
     * @param listener consumer invoked for every change of the table
     * @throws Exception if the column metadata cannot be loaded (e.g. the JDBC driver is missing, or the
     *                   metadata query failed and returned {@code null}, which the map rejects)
     */
    public void regConsumerListener(String db, String table, BinLogConsumerListener listener) throws Exception {
        String key = BinLogUtils.getDBTable(db, table);
        Map<String, ColumnInfo> cols = BinLogUtils.getColMap(mysqlConnConf, db, table);
        dbTableCols.put(key, cols);
        listeners.put(key, listener);
    }

    /**
     * Starts the consumer task. It takes items off the queue and dispatches them to the listeners of their table.
     *
     * <p>A single task is submitted, so items are delivered in the order they were queued. The task blocks on
     * {@link BlockingQueue#take()} instead of polling with a sleep, and exits when its thread is interrupted.
     */
    public void consume() {
        consumer.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                BinLogItem item;
                try {
                    item = binLogItemQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                dispatch(item);
            }
        });
    }

    /** Hands one item to every listener of its table; a failing listener neither stops the others nor the loop. */
    private void dispatch(BinLogItem item) {
        for (BinLogConsumerListener listener : listeners.get(item.getDbTable())) {
            try {
                listener.onEvent(item);
            } catch (RuntimeException e) {
                LOGGER.log(java.util.logging.Level.SEVERE, "BinLog消费监听处理异常, dbTable=" + item.getDbTable(), e);
            }
        }
    }

    /**
     * Parses one binlog event.
     *
     * <p>How each event type is handled:
     * <ul>
     *   <li>QUERY events have their SQL printed.</li>
     *   <li>TABLE_MAP events record which table the following row events belong to.</li>
     *   <li>Write/update/delete row events of registered tables become {@link BinLogItem}s and are queued.</li>
     *   <li>All other events are ignored.</li>
     * </ul>
     *
     * <p>Queuing blocks while the queue is full, applying back-pressure to the reader instead of throwing. If the
     * calling thread is interrupted while waiting, the remaining rows of the event are dropped and the interrupt
     * flag is restored.
     *
     * @param event binlog event read from the server or from a binlog file
     */
    public void parseEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        if (eventType == QUERY) {
            QueryEventData data = event.getData();
            System.out.println(data.getSql());
            return;
        }
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            dbTable = BinLogUtils.getDBTable(tableData.getDatabase(), tableData.getTable());
            return;
        }
        // Only insert / update / delete row events are handled
        if (!isWrite(eventType) && !isUpdate(eventType) && !isDelete(eventType)) {
            return;
        }
        // dbTable is null until a TABLE_MAP has been seen, and ConcurrentHashMap rejects null keys
        Map<String, ColumnInfo> cols = dbTable == null ? null : dbTableCols.get(dbTable);
        if (cols == null) {
            // table not registered
            return;
        }
        long timestamp = event.getHeader().getTimestamp();
        if (isWrite(eventType)) {
            WriteRowsEventData data = event.getData();
            for (Serializable[] row : data.getRows()) {
                enqueue(BinLogItem.itemFromInsertOrDeleted(row, cols, eventType, timestamp));
            }
        } else if (isUpdate(eventType)) {
            UpdateRowsEventData data = event.getData();
            for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
                enqueue(BinLogItem.itemFromUpdate(row, cols, eventType, timestamp));
            }
        } else {
            DeleteRowsEventData data = event.getData();
            for (Serializable[] row : data.getRows()) {
                enqueue(BinLogItem.itemFromInsertOrDeleted(row, cols, eventType, timestamp));
            }
        }
    }

    /** Tags the item with the current table and queues it, blocking while the queue is full. */
    private void enqueue(BinLogItem item) {
        // the item factories return null for null input
        if (item == null) {
            return;
        }
        item.setDbTable(dbTable);
        try {
            binLogItemQueue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts retrieving and parsing the binlog; implemented by the online and offline listeners.
     */
    public abstract void startParseBinLog();

}

OnlineBinLogListener类通过BinaryLogClient开启连接远程实时拉取binlog日志。
package com.binlog.parser;

import java.io.IOException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * 数据库监听器
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
public class OnlineBinLogListener extends BinLogListener implements BinaryLogClient.EventListener {
    private final BinaryLogClient parseClient;

    /**
     * 监听器初始化
     *
     * @param conf
     */
    public OnlineBinLogListener(MysqlConnConf conf) {
        super(conf);
        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPasswd());
        EventDeserializer eventDeserializer = new EventDeserializer();
        /*
        eventDeserializer.setCompatibilityMode( //序列化
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
         */
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
    }

    /**
     * 监听处理
     *
     * @param event
     */
    @Override
    public void onEvent(Event event) {
        parseEvent(event);
    }

    @Override
    public void startParseBinLog() {
        // 先启动消费线程
        consume();
        parseClient.registerEventListener(this);
        // 连接数据库,开始拉取binlog日志(没有过滤库,表信息)
        try {
            parseClient.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

OfflineBinLogListener实时获取指定目录中新增的、已写入完成的binlog文件(单个binlog文件的大小上限由MySQL参数max_binlog_size控制,默认为1GB)进行离线解析。
package com.binlog.parser;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

import javax.annotation.Resource;

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.binlog.config.FileProperties;
import com.binlog.util.SpringUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * 数据库监听器
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
public class OfflineBinLogListener extends BinLogListener {
    private MysqlBinLogReader mysqlBinLogReader;

    private FileProperties fileProperties;

    /**
     * 监听器初始化
     *
     * @param conf
     */
    public OfflineBinLogListener(MysqlConnConf conf) {
       super(conf);
       this.mysqlBinLogReader = (MysqlBinLogReader)SpringUtil.getApplicationContext().getBean("mysqlBinLogReader");
       this.fileProperties = (FileProperties)SpringUtil.getApplicationContext().getBean("fileProperties");
    }

    public OfflineBinLogListener(MysqlConnConf conf, MysqlBinLogReader mysqlBinLogReader,
                                 FileProperties fileProperties) {
        super(conf);
        this.mysqlBinLogReader = mysqlBinLogReader;
        this.fileProperties = fileProperties;
    }

    @Override
    public void startParseBinLog() {
        // 先启动消费线程
        consume();
        BlockingQueue<String> fileNameQueue = mysqlBinLogReader.getFileNameQueue();
        ExecutorService offlineFileProcess = mysqlBinLogReader.getOfflineFileProcess();
        offlineFileProcess.submit(() -> {
            Thread.currentThread().setName("scanBinLogFileConsumer");
            while (true) {
                if (!fileNameQueue.isEmpty()) {
                    String fileName;
                    try {
                        fileName = fileNameQueue.take();
                        File binlogFile = new File(fileProperties.getLocalBackupDir(), fileName);
                        EventDeserializer eventDeserializer = new EventDeserializer();

                        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
                        for (Event event; (event = reader.readEvent()) != null; ) {
                            parseEvent(event);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                }
            }
        });
    }

}

BinLogListenerStarter监听启动器注册指定的库,表监听器,并启动日志解析和消费线程。
package com.binlog.parser;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;


/**
 * 测试监听器
 * SpringBoot启动成功后的执行业务线程操作
 * CommandLineRunner去实现此操作
 * 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。
 * 多个实现CommandLineRunner接口的类必须要设置启动顺序,不然程序启动会报错!
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Slf4j
@Component
@Order(value = 1)
public class BinLogListenerStarter implements CommandLineRunner {

    @Resource
    private BinLogConstants binLogConstants;

    @Override
    public void run(String... args) throws Exception {
        log.info("初始化配置信息:" + binLogConstants.toString());

        // 初始化配置信息
        MysqlConnConf conf = new MysqlConnConf(binLogConstants.getHost(), binLogConstants.getPort(),
                binLogConstants.getUsername(), binLogConstants.getPasswd());

        // 初始化监听器
        BinLogListener binLogListener;
        if (binLogConstants.isOnline()) {
            binLogListener = new OnlineBinLogListener(conf);
        } else {
            binLogListener = new OfflineBinLogListener(conf);
        }

        // 获取table集合
        List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable());
        if (Objects.isNull(tableList) || tableList.size() == 0) {
            return;
        }
        // 注册监听
        tableList.forEach(table -> {
            log.info("注册监听信息,注册DB:{}, 注册表:{}", binLogConstants.getDb(), table);
            try {
                binLogListener.regConsumerListener(binLogConstants.getDb(), table, item -> {
                    log.info("监听逻辑处理:打印一下数据变更信息: {}", item.toString());
                });
            } catch (Exception e) {
                log.error("BinLog监听异常:{}", e);
            }
        });
        // 开启日志获取和解析业务线程
        binLogListener.startParseBinLog();
    }
}

ColumnInfo存储表字段信息。
package com.binlog.parser;

import lombok.Data;

/**
 * Metadata of a single table column, used to map binlog row values to column names.
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Data
public class ColumnInfo {
    // Position of the column's value inside a binlog row array
    public int inx;
    // Column name
    public String colName;
    // Column data type as reported by MySQL
    public String dataType;
    // Database (schema) name
    public String schema;
    // Table name
    public String table;
    // Whether the column is part of the primary key
    public boolean isPKC;

    public ColumnInfo(String schema, String table, int idx, String colName, String dataType, boolean isPKC) {
        // position first, then descriptive attributes, then ownership
        inx = idx;
        this.colName = colName;
        this.dataType = dataType;
        this.isPKC = isPKC;
        this.schema = schema;
        this.table = table;
    }
}
BinLogItem存储解析日志获得的结构化数据。
package com.binlog.parser;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Maps;

import lombok.Data;

import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * One row change parsed from the binlog: an insert, update or delete of a registered table.
 *
 * <p>{@link #messageJson} carries the change as a JSON-like map with these keys:
 * <ul>
 *   <li>{@code pkc}: name of a primary key column, or {@code null}.</li>
 *   <li>{@code op}: the operation.</li>
 *   <li>{@code ts}: the event timestamp.</li>
 *   <li>{@code before} and {@code after}: column values before and after the change.</li>
 * </ul>
 *
 * @author lgq
 * @since 2021/7/26
 **/
@Data
public class BinLogItem implements Serializable {
    private static final long serialVersionUID = 1L;

    // "db-table" key of the table the row belongs to
    private String dbTable;
    private EventType eventType;
    private Long timestamp = null;
    // not populated by the factory methods of this class
    private Long serverId = null;
    // column name -> value before / after the change
    private Map<String, Serializable> before = null;
    private Map<String, Serializable> after = null;
    // column name -> column metadata
    private Map<String, ColumnInfo> columnInfoMap = null;
    // the change as a JSON-like map
    private Map<String, Object> messageJson = new HashMap<>();

    /**
     * Builds an item from an inserted or deleted row. Values go to {@code after} for inserts and to {@code before}
     * for deletes; the other map stays empty.
     *
     * @param row           row values, indexed by {@link ColumnInfo#inx}
     * @param columnInfoMap column name -> column metadata of the table
     * @param eventType     binlog event type of the row
     * @param timestamp     event timestamp
     * @return the item, or {@code null} if {@code row} or {@code columnInfoMap} is {@code null}
     */
    public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map<String, ColumnInfo> columnInfoMap,
                                                     EventType eventType, Long timestamp) {
        if (null == row || null == columnInfoMap) {
            return null;
        }
        BinLogItem item = newItem(columnInfoMap, eventType, timestamp);

        Map<String, Serializable> values = Maps.newHashMap();
        columnInfoMap.forEach((colName, columnInfo) -> {
            values.put(colName, valueAt(row, columnInfo.inx));
            item.markPrimaryKey(colName, columnInfo);
        });

        // inserts store the new values in "after", deletes the removed values in "before"
        if (isWrite(eventType)) {
            item.after = values;
        }
        if (isDelete(eventType)) {
            item.before = values;
        }

        item.fillMessageJson();
        return item;
    }

    /**
     * Builds an item from an updated row.
     *
     * @param mapEntry      key: row values before the update, value: row values after it (indexed by
     *                      {@link ColumnInfo#inx})
     * @param columnInfoMap column name -> column metadata of the table
     * @param eventType     binlog event type of the row
     * @param timestamp     event timestamp
     * @return the item, or {@code null} if {@code mapEntry} or {@code columnInfoMap} is {@code null}
     */
    public static BinLogItem itemFromUpdate(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, ColumnInfo> columnInfoMap,
                                            EventType eventType, Long timestamp) {
        if (null == mapEntry || null == columnInfoMap) {
            return null;
        }
        BinLogItem item = newItem(columnInfoMap, eventType, timestamp);

        Serializable[] beforeRow = mapEntry.getKey();
        Serializable[] afterRow = mapEntry.getValue();
        Map<String, Serializable> be = Maps.newHashMap();
        Map<String, Serializable> af = Maps.newHashMap();
        columnInfoMap.forEach((colName, columnInfo) -> {
            be.put(colName, valueAt(beforeRow, columnInfo.inx));
            af.put(colName, valueAt(afterRow, columnInfo.inx));
            item.markPrimaryKey(colName, columnInfo);
        });
        item.before = be;
        item.after = af;

        item.fillMessageJson();
        return item;
    }

    /** Creates an item with the common fields set, empty before/after maps and {@code pkc} preset to null. */
    private static BinLogItem newItem(Map<String, ColumnInfo> columnInfoMap, EventType eventType, Long timestamp) {
        BinLogItem item = new BinLogItem();
        item.eventType = eventType;
        item.columnInfoMap = columnInfoMap;
        item.before = Maps.newHashMap();
        item.after = Maps.newHashMap();
        item.timestamp = timestamp;
        item.messageJson.put("pkc", null);
        return item;
    }

    /**
     * Returns {@code row[index]}, or {@code null} when the index is outside the row. A short row means the
     * binlog row has fewer columns than the current table definition (e.g. the table was altered after the
     * event was written). Throwing here would abort parsing of the whole event or file.
     */
    private static Serializable valueAt(Serializable[] row, int index) {
        return index >= 0 && index < row.length ? row[index] : null;
    }

    /** Records the column as primary key; with a composite key only one key column ends up in "pkc". */
    private void markPrimaryKey(String colName, ColumnInfo columnInfo) {
        if (columnInfo.isPKC) {
            messageJson.put("pkc", colName);
        }
    }

    /** Copies operation, timestamp and before/after values into the JSON-like message map. */
    private void fillMessageJson() {
        messageJson.put("op", Operator.valueOf(eventType));
        messageJson.put("ts", timestamp);
        messageJson.put("before", before);
        messageJson.put("after", after);
    }

    @Override
    public String toString() {
        return messageJson.toString();
    }

}

BinLogUtils提供一些工具类方法供解析时使用。
package com.binlog.parser;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.io.Serializable;
import java.sql.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;

/**
 * Helper methods used while registering tables and parsing binlog events.
 *
 * @author lgq
 * @since 2021/7/27
 **/
@Slf4j
@Component
public class BinLogUtils {

    private static BinLogUtils binLogUtils;

    // Keeps a static handle to the Spring-managed instance.
    @PostConstruct
    public void init() {
        binLogUtils = this;
    }

    /**
     * Builds the "db-table" key used to index listeners and column metadata.
     */
    public static String getDBTable(String db, String table) {
        return db + "-" + table;
    }

    /**
     * Loads the column metadata of {@code db.table} from INFORMATION_SCHEMA.COLUMNS.
     *
     * @param conf  connection settings of the MySQL server
     * @param db    database name
     * @param table table name
     * @return column name -> column info (index = ORDINAL_POSITION - 1), or {@code null} if the query fails
     * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
     */
    public static Map<String, ColumnInfo> getColMap(MysqlConnConf conf, String db, String table) throws ClassNotFoundException {
        // NOTE(review): Connector/J 8.x names its driver com.mysql.cj.jdbc.Driver; confirm this legacy name
        // matches the driver version in use.
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://" + conf.getHost() + ":" + conf.getPort();
        String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                "DATA_TYPE, ORDINAL_POSITION, case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
        // try-with-resources so the connection, statement and result set are always closed
        try (Connection connection = DriverManager.getConnection(url, conf.getUsername(), conf.getPasswd());
             PreparedStatement ps = connection.prepareStatement(preSql)) {
            ps.setString(1, db);
            ps.setString(2, table);
            Map<String, ColumnInfo> map = new HashMap<>();
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEMA");
                    String tableName = rs.getString("TABLE_NAME");
                    String column = rs.getString("COLUMN_NAME");
                    int idx = rs.getInt("ORDINAL_POSITION");
                    String dataType = rs.getString("DATA_TYPE");
                    String isPKC = rs.getString("IS_PKC");
                    if (column != null && idx >= 1) {
                        // ORDINAL_POSITION starts at 1, row arrays at 0
                        map.put(column, new ColumnInfo(schema, tableName, idx - 1, column, dataType,
                                "Y".equals(isPKC)));
                    }
                }
            }
            return map;
        } catch (SQLException e) {
            log.error("load db conf error, db_table={}:{} ", db, table, e);
        }
        return null;
    }

    /**
     * Extracts the table name from a "db-table" key.
     *
     * @param dbTable key built by {@link #getDBTable(String, String)}
     * @return the table name, or "" if the key is {@code null} or does not split into exactly two parts
     */
    public static String getTable(String dbTable) {
        if (Objects.isNull(dbTable)) {
            return "";
        }
        String[] split = dbTable.split("-");
        if (split.length == 2) {
            return split[1];
        }
        return "";
    }

    /**
     * Splits a comma-separated string into a list. Entries are trimmed and empty entries are dropped.
     *
     * @param str comma-separated values, may be {@code null}
     * @return the entries; empty for {@code null} or blank input
     */
    public static List<String> getListByStr(String str) {
        List<String> result = Lists.newArrayList();
        if (Objects.isNull(str)) {
            return result;
        }
        for (String part : str.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    /**
     * Returns the value map relevant to the item's operation.
     *
     * @param binLogItem parsed change
     * @return {@code after} for inserts/updates, {@code before} for deletes, otherwise {@code null}
     */
    public static Map<String, Serializable> getOptMap(BinLogItem binLogItem) {
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType) || isUpdate(eventType)) {
            return binLogItem.getAfter();
        }
        if (isDelete(eventType)) {
            return binLogItem.getBefore();
        }
        return null;
    }

    /**
     * Maps the item's operation to a numeric code.
     *
     * @param binLogItem parsed change
     * @return 1 insert, 2 update, 3 delete, otherwise {@code null}
     */
    public static Integer getOptType(BinLogItem binLogItem) {
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType)) {
            return 1;
        }
        if (isUpdate(eventType)) {
            return 2;
        }
        if (isDelete(eventType)) {
            return 3;
        }
        return null;
    }

}

MysqlBinLogReader初始化并启动扫描指定目录下binlog日志的线程。
package com.binlog.parser;

import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import com.binlog.config.FileProperties;
import com.binlog.util.FileInfoUtil;

import lombok.Data;
import org.springframework.stereotype.Component;

/**
 * Periodically scans the local binlog backup directory for new binlog files and publishes their names on
 * {@link #fileNameQueue} for the offline parser.
 *
 * @author lgq
 * @since 2022/7/21
 */
@Component
@Data
public class MysqlBinLogReader {
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(MysqlBinLogReader.class.getName());

    // Pause between two directory scans so the scanner does not spin on the CPU
    private static final long SCAN_INTERVAL_MILLIS = BinLogConstants.queueSleep;

    private final int fileScanThreads = BinLogConstants.fileScanThreads;

    // Runs the scanning task; OfflineBinLogListener also submits its file-parsing task here
    private final ExecutorService offlineFileProcess;

    // Names (relative to localBackupDir) of binlog files waiting to be parsed
    private final BlockingQueue<String> fileNameQueue;

    @Resource
    private FileProperties fileProperties;

    public MysqlBinLogReader() {
        this.offlineFileProcess = Executors.newFixedThreadPool(fileScanThreads);
        this.fileNameQueue = new ArrayBlockingQueue<>(1024);
    }

    public MysqlBinLogReader(FileProperties fileProperties) {
        this();
        this.fileProperties = fileProperties;
    }

    /**
     * Starts the background task that scans the backup directory every {@link #SCAN_INTERVAL_MILLIS} ms.
     * Scan errors are logged and retried on the next round; the task stops when its thread is interrupted.
     */
    public void scanBinLogFile() {
        offlineFileProcess.submit(() -> {
            Thread.currentThread().setName("scanBinLogFileProducer");
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    enqueueNewFiles();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (java.io.IOException | RuntimeException e) {
                    // a transient problem (e.g. directory temporarily unreadable) must not end the scanner
                    LOGGER.log(java.util.logging.Level.SEVERE,
                            "扫描binlog目录失败: " + fileProperties.getLocalBackupDir(), e);
                }
                try {
                    Thread.sleep(SCAN_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Scans once and queues every new file in ascending creation-time order, blocking while the queue is full. */
    private void enqueueNewFiles() throws java.io.IOException, InterruptedException {
        TreeMap<Long, String> newBinLogFilesPath = FileInfoUtil.getNewBinLogFilesPath(
                fileProperties.getLocalBackupDir(), fileProperties.getBackupLogName(),
                fileProperties.getLastFileName());
        for (String fileName : newBinLogFilesPath.values()) {
            fileNameQueue.put(fileName);
        }
        if (!newBinLogFilesPath.isEmpty()) {
            // remember the newest queued file so the next scan only picks up files created after it
            fileProperties.setLastFileName(newBinLogFilesPath.lastEntry().getValue());
        }
    }
}
FileInfoUtil提供一些文件扫描的工具类,比如获取指定目录下新增的binlog日志文件(该文件确定已经写入完成,后面mysql的binlog日志是写入新的日志文件)。
package com.binlog.util;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.TreeMap;

public class FileInfoUtil {
    /**
     * Finds binlog files in {@code localBackupDir} created after {@code lastLogPath} that are complete.
     *
     * <p>The newest file in the directory is always left out. With {@code mysqlbinlog --stop-never} it is the file
     * still being written; it is returned by a later scan, once a newer file exists.
     *
     * @param localBackupDir directory holding the binlog copies
     * @param backupLogName  name of the backup script's own log file, which is skipped
     * @param lastLogPath    last file already handed out: a file name inside {@code localBackupDir} or an absolute
     *                       path; {@code null}/blank means "start from the beginning"
     * @return creation time (ms) -> file name, in ascending creation-time order; empty if the directory is missing
     * @throws IOException if file attributes cannot be read (e.g. {@code lastLogPath} no longer exists)
     */
    public static TreeMap<Long, String> getNewBinLogFilesPath(String localBackupDir,
                                                          String backupLogName,
                                                          String lastLogPath) throws IOException {
        // TreeMap keeps keys ascending, so older files come first
        TreeMap<Long, String> newBinLogFileMap = new TreeMap<>();
        File dir = new File(localBackupDir);
        String[] files = dir.list();
        if (files == null) {
            // directory missing or not readable
            return newBinLogFileMap;
        }

        long lastFileCreateTime = 0L;
        if (lastLogPath != null && !lastLogPath.trim().isEmpty()) {
            File lastFile = new File(lastLogPath);
            // file names recorded by the scanner are relative to the backup dir, not to the working directory
            if (!lastFile.isAbsolute()) {
                lastFile = new File(dir, lastLogPath);
            }
            lastFileCreateTime = creationTimeMillis(lastFile);
        }

        for (String fileName : files) {
            // skip the backup script's log file
            if (fileName.equals(backupLogName)) {
                continue;
            }
            File file = new File(dir, fileName);
            // skip directories
            if (file.isDirectory()) {
                continue;
            }
            long fileCreateTime = creationTimeMillis(file);
            if (fileCreateTime > lastFileCreateTime) {
                newBinLogFileMap.put(fileCreateTime, fileName);
            }
        }

        // The newest candidate is the newest file overall, i.e. the one still being written: hold it back.
        if (!newBinLogFileMap.isEmpty()) {
            newBinLogFileMap.pollLastEntry();
        }
        return newBinLogFileMap;
    }

    /** Creation time of the file in epoch milliseconds, as reported by the file system. */
    private static long creationTimeMillis(File file) throws IOException {
        return Files.readAttributes(file.toPath(), BasicFileAttributes.class).creationTime().toMillis();
    }
}
FilesScanStarter开启文件扫描线程,捕获新增的日志文件。
package com.binlog.parser;

import javax.annotation.Resource;

import com.binlog.config.FileProperties;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Starts the binlog-directory scanner on application start-up. It runs before the listener starter
 * ({@code @Order(0)}). The scanner is only started in offline mode, because only the offline listener consumes
 * the scanned file names.
 *
 * @author lgq
 * @since 2022/7/21
 */
@Slf4j
@Order(value = 0)
@Component
public class FilesScanStarter implements CommandLineRunner {
    @Resource
    private MysqlBinLogReader mysqlBinLogReader;

    @Resource
    private BinLogConstants binLogConstants;

    @Override
    public void run(String... args) {
        if (binLogConstants.isOnline()) {
            // online mode pulls from the server directly; scanned names would never be consumed
            log.info("在线模式, 不启动binlog文件扫描");
            return;
        }
        mysqlBinLogReader.scanBinLogFile();
    }
}

其他的配置文件如下。

package com.binlog.parser;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * 数据库配置
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Data
@AllArgsConstructor
public class MysqlConnConf {
    private String host;
    private int port;
    private String username;
    private String passwd;
}


//application.yml文件
/*
server:
  port: 8080

binlog:
  localBackupDir: E:\backup\binlog
  backupLog: E:\backup\binlog\backuplog.txt
  backupLogName: backuplog.txt
  lastFileName:
  table: abc
  db: db1
  isOnline: false

  datasource:
    host: 1.1.1.1
    port: 3306
    username: abc
    passwd: 123456
*/




离线方式中,指定目录下binlog文件的获取方式有很多,比如通过scp、ftp等方式拷贝,或者直接通过MySQL提供的mysqlbinlog命令远程实时拉取。下面给出使用mysqlbinlog远程实时拉取binlog日志的脚本。

#!/bin/sh
BACKUP_BIN=/usr/bin/mysqlbinlog
LOCAL_BACKUP_DIR=/backup/binlog/
# Must match binlog.backupLogName in application.yml so the Java scanner skips this file
BACKUP_LOG=/backup/binlog/backuplog.txt
REMOTE_HOST=1.1.1.1
REMOTE_PORT=3306
REMOTE_USER=abc
# NOTE(review): a password passed on the command line is visible in the process list
REMOTE_PASS=123456
FIRST_BINLOG=binlog.000001
# time to wait before reconnecting after failure
SLEEP_SECONDS=10
## create local_backup_dir if necessary
mkdir -p "${LOCAL_BACKUP_DIR}"
cd "${LOCAL_BACKUP_DIR}" || exit 1
## Loop forever: when the connection drops, wait and reconnect, resuming from the newest local binlog file
while :
do
    # newest local binlog file (names sort in binlog sequence order); the backup log is excluded
    LAST_FILE=`ls -1 | grep -v backuplog | tail -n 1`
    if [ -z "${LAST_FILE}" ]; then
        LAST_FILE=${FIRST_BINLOG}
    fi
    ${BACKUP_BIN} --raw --read-from-remote-server --stop-never --host=${REMOTE_HOST} --port=${REMOTE_PORT} --user=${REMOTE_USER} --password=${REMOTE_PASS} ${LAST_FILE}
    # capture mysqlbinlog's exit status before any other command overwrites $?
    RC=$?
    echo "`date +"%Y/%m/%d %H:%M:%S"` mysqlbinlog停止,返回代码:${RC}" | tee -a ${BACKUP_LOG}
    echo "${SLEEP_SECONDS}秒后再次连接并继续备份" | tee -a ${BACKUP_LOG}
    sleep ${SLEEP_SECONDS}
done

本文介绍的解析过程(尤其是离线方式)有个问题,当数据表结构变更频繁(DDL)时会出现解析错误问题。(离线数据滞后,日志中的数据表结构可能与当前实际的数据表结构不一致,导致解析失败或数据解析错误),解决这个问题可以参考开源软件源代码的处理逻辑,比如maxwell。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐