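PoolEntry is HikariCP's wrapper around a physical database connection. So the key question we need to explore now is:
Let's first look at the relationship between the data source, connections, and the connection pool in HikariCP.
While initializing the connection pool, HikariCP does a lot of work, such as validating the configuration. Here we only discuss how connections are created. During pool initialization there are 3 ways in which connections get created: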
/**
* Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from
* the maxLifetime time to ensure there is no massive die-off of Connections in the pool.
*/
private PoolEntry createPoolEntry()
{
try {
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
// Offset each connection's maxLifetime slightly so that many connections do not expire at the same time
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(new MaxLifetimeTask(poolEntry), lifetime, MILLISECONDS));
}
final long keepaliveTime = config.getKeepaliveTime();
if (keepaliveTime > 0) {
// variance up to 10% of the heartbeat time
final long variance = ThreadLocalRandom.current().nextLong(keepaliveTime / 10);
final long heartbeatTime = keepaliveTime - variance;
poolEntry.setKeepalive(houseKeepingExecutorService.scheduleWithFixedDelay(new KeepaliveTask(poolEntry), heartbeatTime, heartbeatTime, MILLISECONDS));
}
return poolEntry;
}
catch (ConnectionSetupException e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause());
lastConnectionFailure.set(e);
}
}
catch (Exception e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
logger.debug("{} - Cannot acquire connection from data source", poolName, e);
}
}
return null;
}
As we can see, when a PoolEntry is created, two asynchronous delayed tasks are registered:
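MaxLifetimeTask: once the connection reaches its maxLifetime, marks it with evict = true; if the connection is not currently in use, it is closed and addBagItem(final int waiting) is called to request a replacement;
KeepaliveTask: periodically validates the connection while it is idle; if the connection turns out to be dead, it is likewise marked with evict = true and closed.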
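Note also that when these two tasks are scheduled, their delays are randomized: a random variance is subtracted from each configured value (MaxLifetimeTask: up to 2.5% of maxLifetime; KeepaliveTask: up to 10% of keepaliveTime). This mainly prevents many connections from expiring at the same moment, or from running their heartbeat checks at the same moment.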
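The variance arithmetic can be tried out in isolation. The following is a minimal sketch, not HikariCP code: the class `LifetimeJitterSketch` and its method names are made up for illustration, and only the two formulas are taken from `createPoolEntry()`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration; only the two formulas mirror createPoolEntry().
final class LifetimeJitterSketch {

    // maxLifetime minus a random variance of up to 2.5% (maxLifetime / 40).
    // No variance is applied when maxLifetime is 10 seconds or less.
    static long lifetimeWithVariance(long maxLifetimeMs) {
        long variance = maxLifetimeMs > 10_000
                ? ThreadLocalRandom.current().nextLong(maxLifetimeMs / 40)
                : 0;
        return maxLifetimeMs - variance;
    }

    // keepaliveTime minus a random variance of up to 10% (keepaliveTime / 10).
    // Assumes keepaliveTimeMs >= 10, otherwise nextLong(0) would throw.
    static long heartbeatWithVariance(long keepaliveTimeMs) {
        long variance = ThreadLocalRandom.current().nextLong(keepaliveTimeMs / 10);
        return keepaliveTimeMs - variance;
    }

    public static void main(String[] args) {
        // 30 minutes: the result lies in (1_755_000, 1_800_000].
        System.out.println(lifetimeWithVariance(1_800_000L));
        // 30 seconds: the result lies in (27_000, 30_000].
        System.out.println(heartbeatWithVariance(30_000L));
    }
}
```

Because `nextLong(bound)` returns a value in `[0, bound)`, the scheduled delay is never longer than the configured value, only shorter.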
Every physical database connection is created by PoolBase#newConnection. The code is as follows:
/**
* Obtain connection from data source.
*
* @return a Connection connection
*/
private Connection newConnection() throws Exception
{
final long start = currentTime();
Connection connection = null;
try {
String username = config.getUsername();
String password = config.getPassword();
connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
if (connection == null) {
throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
}
setupConnection(connection);
lastConnectionFailure.set(null);
return connection;
}
catch (Exception e) {
if (connection != null) {
quietlyCloseConnection(connection, "(Failed to create/setup connection)");
}
else if (getLastConnectionFailure() == null) {
logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());
}
lastConnectionFailure.set(e);
throw e;
}
finally {
// tracker will be null during failFast check
if (metricsTracker != null) {
metricsTracker.recordConnectionCreated(elapsedMillis(start));
}
}
}
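When no dataSource is configured on HikariConfig and the pool is configured through jdbcUrl instead, DataSource#getConnection is handled by HikariCP's own implementation, DriverDataSource#getConnection, whose code is as follows: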
@Override
public Connection getConnection(final String username, final String password) throws SQLException
{
final Properties cloned = (Properties) driverProperties.clone();
if (username != null) {
cloned.put(USER, username);
if (cloned.containsKey("username")) {
cloned.put("username", username);
}
}
if (password != null) {
cloned.put(PASSWORD, password);
}
return driver.connect(jdbcUrl, cloned);
}
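The clone-then-override step can be seen without a real JDBC driver. This is a minimal sketch under assumptions: `CredentialOverrideSketch` is a hypothetical class, and the literal keys `"user"` and `"password"` (the standard JDBC property names) are assumed to be what the `USER` and `PASSWORD` constants hold.

```java
import java.util.Properties;

// Hypothetical helper; it only reproduces the property handling, not the driver call.
final class CredentialOverrideSketch {

    // Copy the shared driver properties, then override the credentials on the copy,
    // so the properties held by the data source are never mutated per call.
    static Properties withCredentials(Properties base, String username, String password) {
        Properties cloned = (Properties) base.clone();
        if (username != null) {
            cloned.put("user", username); // assumed value of USER
            if (cloned.containsKey("username")) {
                cloned.put("username", username);
            }
        }
        if (password != null) {
            cloned.put("password", password); // assumed value of PASSWORD
        }
        return cloned;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("cachePrepStmts", "true");
        Properties perCall = withCredentials(base, "app", "secret");
        System.out.println(perCall.getProperty("user"));   // prints app
        System.out.println(base.containsKey("user"));      // prints false
    }
}
```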
The newly created PoolEntry is added to the connection pool through ConcurrentBag#add:
/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
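To make the hand-off loop in `add` easier to follow, here is a heavily simplified sketch of the same idea. It is not the real ConcurrentBag: `HandoffBagSketch`, its `Entry` class and the `borrow` method are hypothetical, the thread-local list and the full entry state machine are left out, and an `AtomicBoolean` stands in for the entry state.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified bag; not the real ConcurrentBag.
final class HandoffBagSketch<T> {

    private static final class Entry<T> {
        final T value;
        final AtomicBoolean inUse = new AtomicBoolean(false);
        Entry(T value) { this.value = value; }
    }

    private final CopyOnWriteArrayList<Entry<T>> sharedList = new CopyOnWriteArrayList<>();
    private final SynchronousQueue<Entry<T>> handoffQueue = new SynchronousQueue<>(true);
    private final AtomicInteger waiters = new AtomicInteger();

    void add(T value) {
        Entry<T> entry = new Entry<>(value);
        sharedList.add(entry);
        // Spin until a waiting borrower accepts the entry, another borrower
        // claims it from the shared list, or nobody is waiting any more.
        while (waiters.get() > 0 && !entry.inUse.get() && !handoffQueue.offer(entry)) {
            Thread.yield();
        }
    }

    // Returns a free value, or null if none became available within the timeout.
    T borrow(long timeout, TimeUnit unit) {
        waiters.incrementAndGet();
        try {
            // First try to claim a free entry from the shared list.
            for (Entry<T> entry : sharedList) {
                if (entry.inUse.compareAndSet(false, true)) {
                    return entry.value;
                }
            }
            // Otherwise wait for add() to hand an entry over directly.
            long remaining = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + remaining;
            while (remaining > 0) {
                Entry<T> entry = handoffQueue.poll(remaining, TimeUnit.NANOSECONDS);
                if (entry == null) {
                    return null; // timed out
                }
                if (entry.inUse.compareAndSet(false, true)) {
                    return entry.value;
                }
                remaining = deadline - System.nanoTime();
            }
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            waiters.decrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HandoffBagSketch<String> bag = new HandoffBagSketch<>();
        Thread waiter = new Thread(() ->
                System.out.println("borrowed " + bag.borrow(2, TimeUnit.SECONDS)));
        waiter.start();
        bag.add("conn-1"); // handed to the waiter directly, or found in the shared list
        waiter.join();
    }
}
```

`SynchronousQueue.offer` only succeeds when another thread is already blocked in `poll`, which is why `add` has to retry while waiters exist.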
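To summarize, the sequence diagram of how HikariCP creates a connection is as follows:
Connection creation begins with the initialization of the connection pool. Whether we configure the pool through HikariConfig or through the no-args constructor, the pool is initialized the same way. In this phase connections are created in two places: in the fail-fast check and by starting the housekeeper task: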
/**
* Construct a HikariPool with the specified configuration.
*
* @param config a HikariConfig instance
*/
public HikariPool(final HikariConfig config)
{
super(config);
this.connectionBag = new ConcurrentBag<>(this);
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
// Initialize the housekeeping scheduled executor service
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
// Fail fast; this stage creates only a single database connection
checkFailFast();
if (config.getMetricsTrackerFactory() != null) {
setMetricsTrackerFactory(config.getMetricsTrackerFactory());
}
else {
setMetricRegistry(config.getMetricRegistry());
}
// Set up health checks
setHealthCheckRegistry(config.getHealthCheckRegistry());
// Register MBeans
handleMBeans(this, true);
ThreadFactory threadFactory = config.getThreadFactory();
final int maxPoolSize = config.getMaximumPoolSize();
LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);
this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
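// Start the housekeeper task, which manages the database connections from now on.
// During initialization its main job is to add connections;
// later it is also responsible for closing idle connections.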
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
final long startTime = currentTime();
while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {
quietlySleep(MILLISECONDS.toMillis(100));
}
addConnectionExecutor.setCorePoolSize(1);
addConnectionExecutor.setMaximumPoolSize(1);
}
}
The body of checkFailFast is as follows:
/**
* If initializationFailFast is configured, check that we have DB connectivity.
*
* @throws PoolInitializationException if fails to create or validate connection
* @see HikariConfig#setInitializationFailTimeout(long)
*/
private void checkFailFast()
{
final long initializationTimeout = config.getInitializationFailTimeout();
if (initializationTimeout < 0) {
return;
}
final long startTime = currentTime();
do {
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
if (config.getMinimumIdle() > 0) {
connectionBag.add(poolEntry);
logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
}
else {
quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
}
return;
}
if (getLastConnectionFailure() instanceof ConnectionSetupException) {
throwPoolInitializationException(getLastConnectionFailure().getCause());
}
quietlySleep(SECONDS.toMillis(1));
} while (elapsedMillis(startTime) < initializationTimeout);
if (initializationTimeout > 0) {
throwPoolInitializationException(getLastConnectionFailure());
}
}
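As the code shows, the fail-fast check creates one physical database connection while the pool is still being constructed. If that connection cannot be created (a ConnectionSetupException occurs, or no attempt succeeds before a positive initializationFailTimeout elapses), a pool initialization exception is thrown and construction stops before the pool itself is ever set up. A negative initializationFailTimeout skips the check entirely.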
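The control flow of the fail-fast loop can be reduced to a small generic helper. This is a sketch under assumptions, not HikariCP code: `FailFastSketch` and its parameters are hypothetical, the retry delay is a parameter instead of the fixed one second, and the immediate rethrow for ConnectionSetupException is omitted.

```java
import java.util.function.Supplier;

// Hypothetical reduction of the checkFailFast() loop to its timing logic.
final class FailFastSketch {

    // timeoutMs < 0  : check skipped, returns null.
    // timeoutMs == 0 : exactly one attempt, failure is tolerated (returns null).
    // timeoutMs > 0  : retry until the timeout elapses, then throw.
    static <T> T checkFailFast(Supplier<T> creator, long timeoutMs, long retryDelayMs) {
        if (timeoutMs < 0) {
            return null;
        }
        final long start = System.nanoTime();
        do {
            T created = creator.get();
            if (created != null) {
                return created;
            }
            sleepQuietly(retryDelayMs);
        } while (elapsedMs(start) < timeoutMs);

        if (timeoutMs > 0) {
            throw new IllegalStateException("could not create a connection within " + timeoutMs + " ms");
        }
        return null;
    }

    private static long elapsedMs(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The creator fails twice and succeeds on the third attempt.
        String conn = checkFailFast(() -> ++attempts[0] < 3 ? null : "conn", 1_000, 10);
        System.out.println(conn + " after " + attempts[0] + " attempts");
    }
}
```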
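In addition, we can see that several thread pools are created while the connection pool is constructed:
addConnectionExecutor: used to create physical connections.
closeConnectionExecutor: used to close physical connections.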
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
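addConnectionExecutor uses DiscardOldestPolicy: when the task queue is full, the oldest queued task is dropped to make room for the newly submitted one, and no exception is thrown. The capacity of the task queue is maxPoolSize.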
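The effect of `ThreadPoolExecutor.DiscardOldestPolicy` on a bounded queue is easy to observe with plain JDK classes. The sketch below is not HikariCP code: `DiscardOldestSketch` is a hypothetical class, the queue capacity of 2 is arbitrary, and the single worker thread is an assumption about how the adder executor is sized.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one worker, a queue of capacity 2, DiscardOldestPolicy.
final class DiscardOldestSketch {

    static List<Integer> run() {
        List<Integer> executed = new CopyOnWriteArrayList<>();
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            // Keep the only worker busy so the following tasks pile up in the queue.
            executor.execute(() -> {
                workerBusy.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(workerBusy);
            // Tasks 1 and 2 fill the queue; 3 evicts 1, and 4 evicts 2.
            for (int i = 1; i <= 4; i++) {
                final int id = i;
                executor.execute(() -> executed.add(id));
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [3, 4]
    }
}
```

Tasks 1 and 2 are dropped silently, which matches the "no exception" behaviour described for the adder executor.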
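The final stage of pool construction starts the housekeeper task, HouseKeeper. Its main job is to manage the connection pool: creating and closing connections.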
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
private final class HouseKeeper implements Runnable
{
private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run()
{
try {
// refresh values in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
}
else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
// Close idle connections while keeping at least minimumIdle of them
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
//
logPoolState(afterPrefix);
// Add connections to keep the pool at minimumIdle
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
logger.error("Unexpected exception in housekeeping task", e);
}
}
}
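The idle-cleanup part of `HouseKeeper#run` boils down to a selection rule: remove at most `notInUse.size() - minimumIdle` entries, and only those idle for longer than `idleTimeout`. A minimal sketch of just that rule follows. `IdleCleanupSketch` and its `Entry` class are hypothetical, and the `connectionBag.reserve(entry)` step that guards against concurrent borrowers is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reduction of the housekeeper's idle cleanup to its selection rule.
final class IdleCleanupSketch {

    static final class Entry {
        final String name;
        final long lastAccessedMs;
        Entry(String name, long lastAccessedMs) {
            this.name = name;
            this.lastAccessedMs = lastAccessedMs;
        }
    }

    // Returns the names of the idle entries that would be closed.
    static List<String> selectForEviction(List<Entry> notInUse, int minimumIdle,
                                          long idleTimeoutMs, long nowMs) {
        List<String> evicted = new ArrayList<>();
        int toRemove = notInUse.size() - minimumIdle;
        for (Entry entry : notInUse) {
            if (toRemove > 0 && nowMs - entry.lastAccessedMs > idleTimeoutMs) {
                evicted.add(entry.name);
                toRemove--;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<Entry> idle = Arrays.asList(
                new Entry("a", 0), new Entry("d", 900), new Entry("e", 950), new Entry("f", 0));
        // now = 1000 ms, idleTimeout = 500 ms, minimumIdle = 1: up to 3 may go,
        // but only "a" and "f" have been idle long enough.
        System.out.println(selectForEviction(idle, 1, 500, 1_000)); // prints [a, f]
    }
}
```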
Here fillPool() submits PoolEntryCreator tasks to addConnectionExecutor to create the connections. The code is as follows:
/**
* Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
*/
private synchronized void fillPool()
{
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueueReadOnlyView.size();
if (connectionsToAdd <= 0) logger.debug("{} - Fill pool skipped, pool is at sufficient level.", poolName);
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
The fillPool() code shows that its main responsibility is to keep the pool at its minimum number of connections.
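A few worked numbers make the `connectionsToAdd` expression concrete. The sketch below only restates that arithmetic. `FillPoolMathSketch` and its parameter names are hypothetical, and the sample values are made up.

```java
// Hypothetical helper that restates the connectionsToAdd arithmetic from fillPool().
final class FillPoolMathSketch {

    // Room left in the pool, capped by the idle shortfall, minus creations already queued.
    static int connectionsToAdd(int maxPoolSize, int totalConnections,
                                int minimumIdle, int idleConnections, int queuedAdds) {
        return Math.min(maxPoolSize - totalConnections, minimumIdle - idleConnections) - queuedAdds;
    }

    public static void main(String[] args) {
        // min(10 - 4, 5 - 1) - 1 = 3: three more creation tasks are submitted.
        System.out.println(connectionsToAdd(10, 4, 5, 1, 1));  // prints 3
        // min(10 - 8, 5 - 0) - 0 = 2: limited by the remaining room in the pool.
        System.out.println(connectionsToAdd(10, 8, 5, 0, 0));  // prints 2
        // min(10 - 10, 5 - 0) - 0 = 0: the pool is full, so filling is skipped.
        System.out.println(connectionsToAdd(10, 10, 5, 0, 0)); // prints 0
    }
}
```

A result of zero or less corresponds to the "Fill pool skipped" debug message in fillPool().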