概述

        DataSource 翻译过来为数据源,它是 jdk 提供的一个接口,然后只提供了两个 getConnection 方法,分别是无参数和需要传入用户名和密码。所以说它是跟数据库连接有关的东西,可以通过它来获取数据库连接。

public interface DataSource  extends CommonDataSource, Wrapper {

  /**
   * <p>Attempts to establish a connection with the data source that
   * this {@code DataSource} object represents.
   *
   * @return  a connection to the data source
   * @exception SQLException if a database access error occurs
   * @throws java.sql.SQLTimeoutException  when the driver has determined that the
   * timeout value specified by the {@code setLoginTimeout} method
   * has been exceeded and has at least tried to cancel the
   * current database connection attempt
   */
  Connection getConnection() throws SQLException;

  /**
   * <p>Attempts to establish a connection with the data source that
   * this {@code DataSource} object represents.
   *
   * @param username the database user on whose behalf the connection is
   *  being made
   * @param password the user's password
   * @return  a connection to the data source
   * @exception SQLException if a database access error occurs
   * @throws java.sql.SQLTimeoutException  when the driver has determined that the
   * timeout value specified by the {@code setLoginTimeout} method
   * has been exceeded and has at least tried to cancel the
   * current database connection attempt
   * @since 1.4
   */
  Connection getConnection(String username, String password)
    throws SQLException;
}

        但是这只是一个接口,具体怎么获取到数据库连接,还是由实现它的子类来决定。本文就是来讲一下 DruidDataSource, 为什么讲它呢?因为它是阿里写出来的,比较 diao。

DruidDataSource

        DruidDataSource 是阿里写出来的一个数据源, 它不仅可以获取数据库连接,还把这些数据库连接管理了起来,也就是所谓的数据库连接池。这样的话,当通过该数据源获取数据库连接的时候,如果数据库连接池里有可以使用的连接,那么就直接返回该连接,就省的每次获取数据库连接都要创建了。

        首先看一下怎么使用 DruidDataSource。

// 配置一个 DruidDataSource 实例 bean 交由 Spring 容器管理
@Configuration
public class DataSourceConfig {
    @Bean
    public DataSource dataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setUrl("jdbc:mysql://localhost:3306/transaction?useSSL=false");
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //连接池最小空闲的连接数
        dataSource.setMinIdle(5);
        //连接池最大活跃的连接数
        dataSource.setMaxActive(20);
        //初始化连接池时创建的连接数
        dataSource.setInitialSize(10);
        return dataSource;
    }
}

//测试类
public class DataSourceMain {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(DataSourceConfig.class);
        //获取 dataSource 实例
        DataSource dataSource = (DataSource) applicationContext.getBean("dataSource");
        try {
            //通过数据源获取数据库连接
            Connection connection = dataSource.getConnection();
            Statement statement = connection.createStatement();
            statement.execute("update AccountInfo set balance = balance + 1 where id = 1");
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }
}

        从上可以看出,通过数据源访问跟 jdbc 方式相比,省略了实例化数据库连接驱动 Driver 驱动这一步,此外调用 getConnection() 获取连接的时候,并没有传用户名和密码,说明 dataSource 把这些配置信息都管理起来了。后面获取连接的逻辑会自给自足。

        总结来说,DruidDataSource 管理了数据库连接的一些配置信息,还帮助我们创建了连接驱动 Driver, 剩下的逻辑就跟 jdbc 一毛一样了。管理配置信息就不说了,无非就是定义一些变量,然后通过 set 方法给写进去。接下来我们开一下什么时候创建的 Driver, 以及怎么获取数据库连接的。

        首先进入 DruidDataSource 类的 getConnection() 方法

    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    //这个参数代表超时时间,过了这个时间还没有获取到连接就代表获取连接失败。
    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        //初始化方法
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

        这里主要有两个方法,第一个是初始化方法,第二个是 getConnectionDirect(maxWaitMillis),很明显第二个是获取连接的,那么第一个是干什么的?

    public void init() throws SQLException {
        // inited 控制该初始化方法只调用一次,在方法后面会设置为 true,再次调用该方法就会直接返回。
        if (inited) {
            return;
        }

        // bug fixed for dead lock, for issue #2980
        //这一行是为了解决死锁问题,https://github.com/alibaba/druid/issues/2980 详情可以去这个连接看一下
        DruidDriver.getInstance();

        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            if (inited) {
                return;
            }

            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

            this.id = DruidDriver.createDataSourceId();
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

            for (Filter filter : filters) {
                filter.init(this);
            }

            if (this.dbType == null || this.dbType.length() == 0) {
                this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
            }

            if (JdbcConstants.MYSQL.equals(this.dbType)
                    || JdbcConstants.MARIADB.equals(this.dbType)
                    || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
                boolean cacheServerConfigurationSet = false;
                if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                    cacheServerConfigurationSet = true;
                } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                    cacheServerConfigurationSet = true;
                }
                if (cacheServerConfigurationSet) {
                    this.connectProperties.put("cacheServerConfiguration", "true");
                }
            }

            if (maxActive <= 0) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (maxActive < minIdle) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (getInitialSize() > maxActive) {
                throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
            }

            if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
                throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
            }

            if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
                throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
            }

            if (this.driverClass != null) {
                this.driverClass = driverClass.trim();
            }

            initFromSPIServiceLoader();
            //此时 driver 还没有初始化
            if (this.driver == null) {
                if (this.driverClass == null || this.driverClass.isEmpty()) {
                    this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
                }

                if (MockDriver.class.getName().equals(driverClass)) {
                    driver = MockDriver.instance;
                } else {
                    if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                        throw new SQLException("url not set");
                    }
                    // 会调用这个方法获取 driver 实例
                    driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
                }
            } else {
                if (this.driverClass == null) {
                    this.driverClass = driver.getClass().getName();
                }
            }

            initCheck();

            initExceptionSorter();
            initValidConnectionChecker();
            validationQueryCheck();

            if (isUseGlobalDataSourceStat()) {
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbType);
                }
            } else {
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
            }
            dataSourceStat.setResetStatEnable(this.resetStatEnable);
            
            //初始化数据库连接池,其实就是个数组
            connections = new DruidConnectionHolder[maxActive];
            evictConnections = new DruidConnectionHolder[maxActive];
            keepAliveConnections = new DruidConnectionHolder[maxActive];

            SQLException connectError = null;

            if (createScheduler != null && asyncInit) {
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
                // init connections
                // 这个就是通过配置设置的初始化的连接的个数
                while (poolingCount < initialSize) {
                    try {
                        // 获取数据库连接
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        //将获取到的数据库连接放入到连接池中,也就是 connections 数组中
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

            createAndLogThread();
            //创建一个线程负责创建新的数据库连接
            createAndStartCreatorThread();
            createAndStartDestroyThread();

            initedLatch.await();
            init = true;

            initedTime = new Date();
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (RuntimeException e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (Error e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;

        } finally {
            // 设置 inited 为 true,当再次调用 init() 方法的时候就会直接返回。
            inited = true;
            lock.unlock();

            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }
    }

        上面的代码量很多,但是我们目前先把目光放到主线代码中,那么很明显有几个重要的点

  • 获取 driver 实例 driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
  • 获取数据库连接 PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
  • 此外我们还知道了所谓连接池就是一个数组,它保存着一个个连接对象实例。

        首先看下如何获取 driver

 public static Driver createDriver(ClassLoader classLoader, String driverClassName) throws SQLException {
        Class<?> clazz = null;
        if (classLoader != null) {
            try {
                clazz = classLoader.loadClass(driverClassName);
            } catch (ClassNotFoundException e) {
                // skip
            }
        }

        if (clazz == null) {
            try {
                ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
                if (contextLoader != null) {
                    clazz = contextLoader.loadClass(driverClassName);
                }
            } catch (ClassNotFoundException e) {
                // skip
            }
        }

        if (clazz == null) {
            try {
                clazz = Class.forName(driverClassName);
            } catch (ClassNotFoundException e) {
                throw new SQLException(e.getMessage(), e);
            }
        }

        try {
            //通过反射获取实例,其实也就是 jdbc 获取 driver
            return (Driver) clazz.newInstance();
        } catch (IllegalAccessException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (InstantiationException e) {
            throw new SQLException(e.getMessage(), e);
        }
    }

            可以看出,它跟使用 jdbc 时一样,都是通过反射获取到 driver 实例,然后我们再看一下如何获取一个连接

public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
        String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) {
            user = getUserCallback().getName();
        } else {
            user = getUsername();
        }

        String password = getPassword();
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) {
            if (passwordCallback instanceof DruidPasswordCallback) {
                DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            }

            char[] chars = passwordCallback.getPassword();
            if (chars != null) {
                password = new String(chars);
            }
        }

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) {
            physicalConnectProperties.putAll(connectProperties);
        }

        if (user != null && user.length() != 0) {
            physicalConnectProperties.put("user", user);
        }

        if (password != null && password.length() != 0) {
            physicalConnectProperties.put("password", password);
        }

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try {
            // 这时physicalConnectProperties已经拼装好了配置信息,也获取到了url
            conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) {
                throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
            }

            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
        } catch (SQLException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (RuntimeException ex) {
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } catch (Error ex) {
            createErrorCountUpdater.incrementAndGet(this);
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
        } finally {
            long nano = System.nanoTime() - connectStartNanos;
            createTimespan += nano;
            creatingCountUpdater.decrementAndGet(this);
        }

        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    }
    public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;
        if (getProxyFilters().size() == 0) {
            // 这里是获取到我们刚刚实例化的 driver, 然后再调用它的 connect 方法
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }

        createCountUpdater.incrementAndGet(this);

        return conn;
    }

        从上可以看出,获取数据库连接首先也是要获取到 driver 然后再调用它的 connect 方法。

        总的来说,上面代码的主线逻辑就是先实例化 driver, 然后通过 driver 获取一个数据库连接,这其实也就是 jdbc 的方式,只不过别让帮我们做了这件事情,我们不用再自己做了而已。并且DruidDataSource 在获取到连接之后,还会把这个连接存起来,以免每次请求连接都要重新从数据库获取一个连接。

        上一部分讲的是初始化方法 init(), 别忘了我们是讲 DruidDataSource 获取数据库连接方法的,如下

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        // 执行初始化方法后,这时连接池里已经有了数据库连接了
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

        我们需要看一下 getConnectionDirect(maxWaitMillis) 是怎么获取连接的

    public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (;;) {
            // handle notFullTimeoutRetry
            DruidPooledConnection poolableConnection;
            try {
                //很明显这个方法就是获取数据库连接的, 剩下的代码可以先不看,直接跳到getConnectionInternal
                poolableConnection = getConnectionInternal(maxWaitMillis);
            } catch (GetConnectionTimeoutException ex) {
                if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                    notFullTimeoutRetryCnt++;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                    }
                    continue;
                }
                throw ex;
            }

            。。。。。。。。。。。//省略下面的代码
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {

               。。。。。。。。。。//省略该方法前面的一些代码


                // 等待超时时间,默认是 -1
                if (maxWait > 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                }

                if (holder != null) {
                    activeCount++;
                    if (activeCount > activePeak) {
                        activePeak = activeCount;
                        activePeakTime = System.currentTimeMillis();
                    }
                }
            } catch (InterruptedException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw new SQLException(e.getMessage(), e);
            } catch (SQLException e) {
                connectErrorCountUpdater.incrementAndGet(this);
                throw e;
            } finally {
                lock.unlock();
            }

            break;
        }

        
        DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
        return poolalbeConnection;
    }

        我们在代码里看到 pollLast 和 takeLast 是获取数据库连接的,这两个方法差不多,pollLast()会加入时间的判断,但是核心逻辑都是一样的,所以我们跟一下默认的 takeLast 方法

DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            //如果没有设置连接池初始化大小,则会重新创建一个连接放入连接池, 如果设置了初始化大小,则在调用init()方法的时候会创建一些连接放在连接池中保存。
            while (poolingCount == 0) {
                emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        //将数据库连接池中的连接数减一
        decrementPoolingCount();
        //获取数据库连接池中的的一个连接
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

        通过上面的分析,我们应该已经知道,当调用 Connection connection = dataSource.getConnection() 的时候,如果是第一次调用,则会在 init() 方法的时候创建一些连接放入数据库连接池里,然后获取连接的时候,直接从连接池中获取。

总结

        DruidDataSource 博大精深,我只是分析了其中一些基本的功能,还有很多代码没有分析,但是本文的主要目的是为了让大家了解 DataSource 是个啥玩意,我们现在应该知道了它就是管理数据库连接的,如果需要一个连接的话问他要就行了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐