1 应用场景

向MySQL数据库(InnoDB引擎):插入大量数据,如10万条数据,对于离线任务而言,
可以选择单线程也可以选择多线程,
本文通过对比单线程和多线程插入数据的速度,建议使用多线程。
单线程:直接使用MySQL原生客户端或者Workbench客户端直接使用SQL语句执行,或者,使用Java实现;
多线程:使用Java API实现,进行测试。

2 测试

2.1 单线程

一个线程插入10万条数据。

package com.monkey.java_study.thread;

import com.monkey.java_study.common.config.ThreadPoolConfig;
import com.monkey.java_study.database.DatabaseConnectionFactory;
import com.monkey.java_study.database.IDatabaseConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.monkey.java_study.common.constant.DatabaseConstant.MY_SQL;

/**
 * 线程池测试.
 *
 * @author xindaqi
 * @date 2021-11-26 10:40
 */
public class ThreadPoolTest {

    private static final Logger logger = LogManager.getLogger(ThreadPoolTest.class);

    private static final Integer NUMBER_OF_THREADS = 5;
    
    /**
     * 单线程插入数据.
     */
    public static void singleThreadInsertDataTest() {

        DatabaseConnectionFactory databaseConnectionFactory = new DatabaseConnectionFactory();
        IDatabaseConnection databaseConnection = databaseConnectionFactory.databaseConnection(MY_SQL);
        Connection conn = databaseConnection.databaseLink();
        Statement stmt = null;
        try {
            final Integer ROW_NUMBER_START = 0;
            final Integer ROW_NUMBER_END = 100000;
            Connection connUse = conn;
            stmt = connUse.createStatement();
            String emb = "'test_data'";
            long start = System.currentTimeMillis();
            for (int i = ROW_NUMBER_START; i < ROW_NUMBER_END; i++) {
                String sql = String.format("INSERT INTO `db_monkey_run`.`tb_test_insert` (emb) VALUES (%s)", emb);
                int flag = stmt.executeUpdate(sql);
            }
            long end = System.currentTimeMillis();
            long timeCost = end - start;

            BigDecimal timeCostBd = new BigDecimal(timeCost);
            long ONE_MINUTE_UNIT = 60000L;
            BigDecimal unitBd = new BigDecimal(ONE_MINUTE_UNIT);
            double timeCostMin = timeCostBd.divide(unitBd, 3, BigDecimal.ROUND_HALF_UP).doubleValue();
            logger.info(">>>>>>>>>Single thread insert database, data number:{}, time cost:{} min", ROW_NUMBER_END, timeCostMin);
        } catch (SQLException se) {
            throw new RuntimeException(se);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException se1) {
                throw new RuntimeException(se1);
            }
        }
    }

    public static void main(String[] args) {
        try {
            singleThreadInsertDataTest();
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        } finally {
            ThreadPoolConfig.threadPoolExecutorGenerate.shutdown();
        }
    }
}

测试结果如图2.1所示。由测试结果可是,单线程插入10万条数据耗时2.653分钟。
在这里插入图片描述

图2.1 单线程插入数据

2.2 多线程

多线程从线程池中获取线程,线程池配置如下。

2.2.1 线程池

package com.monkey.java_study.common.config;

import com.monkey.java_study.common.constant.ThreadPoolConstant;

import java.util.concurrent.*;

/**
 * 线程池配置.
 *
 * @author xindaqi
 * @date 2021-11-24 15:31
 */
public class ThreadPoolConfig {

    /**
     * 线程池
     */
    public static ExecutorService threadPoolExecutorGenerate = new ThreadPoolExecutor(
            ThreadPoolConstant.CORE_THREAD_NUM,
            ThreadPoolConstant.MAX_THREAD_NUM,
            ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(ThreadPoolConstant.QUEUE_LENGTH),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
}

线程池常量

package com.monkey.java_study.common.constant;

/**
 * 线程池常量.
 *
 * @author xindaqi
 * @date 2021-07-22 16:48
 */
public class ThreadPoolConstant {

    /**
     * 核心线程数量
     */
    public static final int CORE_THREAD_NUM = 10;

    /**
     * 最大线程数量
     */
    public static final int MAX_THREAD_NUM = 15;

    /**
     * 非核心线程存活时间
     */
    public static final long KEEP_ALIVE_TIME_SECONDS = 1L;

    /**
     * 任务队列长度
     */
    public static final int QUEUE_LENGTH = 8;

    /**
     * 线程超时时间
     */
    public static final long TIME_OUT = 70;

    private ThreadPoolConstant() {
        throw new AssertionError(ThreadPoolConstant.class.getName() + StringConstant.PRIVATE_INSTANCE_ERROR);
    }
}

2.2.2 多线程写入

五个线程,每个线程插入2万条数据。

package com.monkey.java_study.thread;

import com.monkey.java_study.common.config.ThreadPoolConfig;
import com.monkey.java_study.database.DatabaseConnectionFactory;
import com.monkey.java_study.database.IDatabaseConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.monkey.java_study.common.constant.DatabaseConstant.MY_SQL;

/**
 * 线程池测试.
 *
 * @author xindaqi
 * @date 2021-11-26 10:40
 */
public class ThreadPoolTest {

    private static final Logger logger = LogManager.getLogger(ThreadPoolTest.class);

    private static final Integer NUMBER_OF_THREADS = 5;

    /**
     * 多线程插入数据.
     *
     * @param threadNumber 线程号
     * @param batch        数据量
     */
    public static void multipleThreadInsertDataTest(int threadNumber, int batch) {

        DatabaseConnectionFactory databaseConnectionFactory = new DatabaseConnectionFactory();
        IDatabaseConnection databaseConnection = databaseConnectionFactory.databaseConnection(MY_SQL);
        Connection conn = databaseConnection.databaseLink();
        Statement stmt = null;
        try {
            final Integer ROW_NUMBER_START = threadNumber * batch;
            final Integer ROW_NUMBER_END = threadNumber * batch + batch;
            Connection connUse = conn;
            stmt = connUse.createStatement();
            String emb = "'test_data'";
            long start = System.currentTimeMillis();
            for (int i = ROW_NUMBER_START; i < ROW_NUMBER_END; i++) {
                String sql = String.format("INSERT INTO `db_monkey_run`.`tb_test_insert` (emb) VALUES (%s)", emb);
                int flag = stmt.executeUpdate(sql);
            }
            long end = System.currentTimeMillis();
            long timeCost = end - start;
            BigDecimal timeCostBd = new BigDecimal(timeCost);
            long ONE_MINUTE_UNIT = 60000L;
            BigDecimal unitBd = new BigDecimal(ONE_MINUTE_UNIT);
            double timeCostMin = timeCostBd.divide(unitBd, 3, BigDecimal.ROUND_HALF_UP).doubleValue();
            logger.info(">>>>>>>>>Multiple thread insert database, data number:{}, time cost:{} min", ROW_NUMBER_END, timeCostMin);
        } catch (SQLException se) {
            throw new RuntimeException(se);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException se1) {
                throw new RuntimeException(se1);
            }
        }
    }

    public static void main(String[] args) {
        try {
            // 批量数据:2万条
            int insertBatch = 20000;
            // 5个线程:每个线程插入20万条数据
            for (int i = 0; i < NUMBER_OF_THREADS; i++) {
                final int thread = i;
                ThreadPoolConfig.threadPoolExecutorGenerate.submit(() -> multipleThreadInsertDataTest(thread, insertBatch));
            }

        } catch (Exception ex) {
            throw new RuntimeException(ex);
        } finally {
            ThreadPoolConfig.threadPoolExecutorGenerate.shutdown();
        }
    }
}

测试结果如图2.2所示。由测试结果可是,单线程插入10万条数据耗时2.653分钟。
在这里插入图片描述

图2.2 多线程插入数据

3 小结

序号线程时间
1单线程2.635分钟/线程
2多线程0.861分钟/线程
  • 多线程处理数据比单线程处理速度快;
  • 对于批量插入数据任务,将数据拆分成批量数据,多线程执行;
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐