Browse Source

Pull request #5777: REPORT-71021 恢复创建线程启停机制

Merge in CORE/base-third from ~RINOUX/base-third:release/11.0 to release/11.0

* commit '1d18148c9765a8d3bd17addfb308708ba830e438':
  REPORT-71021 恢复创建线程启停机制
release/11.0
rinoux 3 years ago
parent
commit
ab35235c48
  1. 2
      fine-druid/readme.MD
  2. 173
      fine-druid/src/main/java/com/fr/third/alibaba/druid/pool/DruidDataSource.java

2
fine-druid/readme.MD

@ -1,4 +1,5 @@
# Alibaba Druid # Alibaba Druid
- FineReport更新时间 `2022-04-27` - FineReport更新时间 `2022-04-27`
- Druid版本 1.2.9 - Druid版本 1.2.9
- [github地址](https://github.com/alibaba/druid) - [github地址](https://github.com/alibaba/druid)
@ -14,3 +15,4 @@
| 1.2.9 | 2022-05-05 | MysqlUtils.getLastPacketReceivedTimeMs根据类加载器区分连接实现等,不在使用全局变量 | | 1.2.9 | 2022-05-05 | MysqlUtils.getLastPacketReceivedTimeMs根据类加载器区分连接实现等,不在使用全局变量 |
| 1.2.9 | 2022-05-05 | com.fr.third.alibaba.druid.util.Utils.loadClass改为优先从线程类加载器加载类 | | 1.2.9 | 2022-05-05 | com.fr.third.alibaba.druid.util.Utils.loadClass改为优先从线程类加载器加载类 |
| 1.2.9 | 2022-05-05 | 恢复com.fr.third.alibaba.druid.pool.DruidDataSourceFactory对hibernate配置属性的支持 | | 1.2.9 | 2022-05-05 | 恢复com.fr.third.alibaba.druid.pool.DruidDataSourceFactory对hibernate配置属性的支持 |
| 1.2.9 | 2022-05-10 | 恢复DruidDataSource中的创建线程启停机制 |

173
fine-druid/src/main/java/com/fr/third/alibaba/druid/pool/DruidDataSource.java

@ -134,6 +134,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
private volatile Future<?> createSchedulerFuture; private volatile Future<?> createSchedulerFuture;
private CreateConnectionThread createConnectionThread; private CreateConnectionThread createConnectionThread;
private PeriodDetectionThread periodDetectionThread;
private DestroyConnectionThread destroyConnectionThread; private DestroyConnectionThread destroyConnectionThread;
private LogStatsThread logStatsThread; private LogStatsThread logStatsThread;
private int createTaskCount; private int createTaskCount;
@ -141,7 +142,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
private volatile long createTaskIdSeed = 1L; private volatile long createTaskIdSeed = 1L;
private long[] createTasks; private long[] createTasks;
private final CountDownLatch initedLatch = new CountDownLatch(2); private CountDownLatch initedLatch = new CountDownLatch(2);
private volatile boolean enable = true; private volatile boolean enable = true;
@ -161,6 +162,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
public static ThreadLocal<Long> waitNanosLocal = new ThreadLocal<Long>(); public static ThreadLocal<Long> waitNanosLocal = new ThreadLocal<Long>();
private boolean logDifferentThread = true; private boolean logDifferentThread = true;
private volatile boolean keepAlive = false; private volatile boolean keepAlive = false;
private SQLException initException = null;
private boolean asyncInit = false; private boolean asyncInit = false;
protected boolean killWhenSocketReadTimeout = false; protected boolean killWhenSocketReadTimeout = false;
protected boolean checkExecuteTime = false; protected boolean checkExecuteTime = false;
@ -526,6 +528,45 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
} }
private synchronized void doSomethingBeforeCreationThreadBreak() {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this) + this.getUrl();
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.setStarted(false);
String destroyName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this) + this.getUrl();
if (destroyConnectionThread != null) {
if (!destroyConnectionThread.isInterrupted()) {
destroyConnectionThread.interrupt();
}
}
destroyConnectionThread = new DestroyConnectionThread(destroyName);
destroyConnectionThread.setStarted(false);
initedLatch = new CountDownLatch(2);
}
private void checkThread() throws SQLException {
if (createConnectionThread == null) {
throw new IllegalStateException("createConnectionThread not start!");
}
if (destroyConnectionThread == null) {
throw new IllegalStateException("destroyConnectionThread not start!");
}
if (!createConnectionThread.isStarted() && !destroyConnectionThread.isStarted()) {
synchronized (this) {//线程安全问题,加个双检锁
if (!createConnectionThread.isStarted() && !destroyConnectionThread.isStarted()) {
createConnectionThread.setStarted(true);
createConnectionThread.start();
destroyConnectionThread.setStarted(true);
destroyConnectionThread.start();
try {
initedLatch.await();
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
}
}
}
}
}
public boolean isKillWhenSocketReadTimeout() { public boolean isKillWhenSocketReadTimeout() {
return killWhenSocketReadTimeout; return killWhenSocketReadTimeout;
} }
@ -798,6 +839,11 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
public void init() throws SQLException { public void init() throws SQLException {
if (initException != null) {
LOG.error("{dataSource-" + this.getID() + "} init error", initException);
throw initException;
}
if (inited) { if (inited) {
return; return;
} }
@ -944,6 +990,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
createAndLogThread(); createAndLogThread();
createAndStartCreatorThread(); createAndStartCreatorThread();
createAndStartDestroyThread(); createAndStartDestroyThread();
createAndStartDetectThread();
initedLatch.await(); initedLatch.await();
init = true; init = true;
@ -968,16 +1015,13 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} catch (SQLException e) { } catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e); LOG.error("{dataSource-" + this.getID() + "} init error", e);
initException = e;
throw e; throw e;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e); throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){ } catch (Throwable e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e); initException = new SQLException(e.getMessage());
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e; throw e;
} finally { } finally {
inited = true; inited = true;
lock.unlock(); lock.unlock();
@ -1087,7 +1131,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
protected void createAndStartCreatorThread() { protected void createAndStartCreatorThread() {
if (createScheduler == null) { if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this) + this.getUrl();
createConnectionThread = new CreateConnectionThread(threadName); createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start(); createConnectionThread.start();
return; return;
@ -1096,6 +1140,15 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
initedLatch.countDown(); initedLatch.countDown();
} }
private void createAndStartDetectThread() {
if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Detection-" + System.identityHashCode(this) + this.getUrl();
periodDetectionThread = new PeriodDetectionThread(threadName);
periodDetectionThread.start();
}
}
/** /**
* load filters from SPI ServiceLoader * load filters from SPI ServiceLoader
* *
@ -1181,21 +1234,21 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
return; return;
} }
String errorMessage = ""; String infoMessage = "";
if (testOnBorrow) { if (isTestOnBorrow()) {
errorMessage += "testOnBorrow is true, "; infoMessage += "testOnBorrow is true, ";
} }
if (testOnReturn) { if (isTestOnReturn()) {
errorMessage += "testOnReturn is true, "; infoMessage += "testOnReturn is true, ";
} }
if (testWhileIdle) { if (isTestWhileIdle()) {
errorMessage += "testWhileIdle is true, "; infoMessage += "testWhileIdle is true, ";
} }
LOG.error(errorMessage + "validationQuery not set"); LOG.info(infoMessage + "validationQuery not set");
} }
protected void resolveDriver() throws SQLException { protected void resolveDriver() throws SQLException {
@ -1402,6 +1455,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init(); init();
checkThread();
if (filters.size() > 0) { if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this); FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis); return filterChain.dataSource_connect(this, maxWaitMillis);
@ -1756,7 +1810,9 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
buf.append(", createErrorCount ").append(createErrorCount); buf.append(", createErrorCount ").append(createErrorCount);
} }
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList(); JdbcDataSourceStat sourceStat = this.getDataSourceStat();
if (sourceStat != null) {
List<JdbcSqlStatValue> sqlList = sourceStat.getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) { for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) { if (i != 0) {
buf.append('\n'); buf.append('\n');
@ -1764,10 +1820,12 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
buf.append(", "); buf.append(", ");
} }
JdbcSqlStatValue sql = sqlList.get(i); JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount()); buf.append("runningSqlCount ");
buf.append(sql.getRunningCount());
buf.append(" : "); buf.append(" : ");
buf.append(sql.getSql()); buf.append(sql.getSql());
} }
}
String errorMessage = buf.toString(); String errorMessage = buf.toString();
@ -1863,8 +1921,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
dataSourceLock.lock(); dataSourceLock.lock();
try { try {
emptySignal(); emptySignal();
} } finally {
finally {
dataSourceLock.unlock(); dataSourceLock.unlock();
} }
} }
@ -2097,6 +2154,10 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
destroyConnectionThread.interrupt(); destroyConnectionThread.interrupt();
} }
if (periodDetectionThread != null) {
periodDetectionThread.interrupt();
}
if (createSchedulerFuture != null) { if (createSchedulerFuture != null) {
createSchedulerFuture.cancel(true); createSchedulerFuture.cancel(true);
} }
@ -2769,6 +2830,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
public class CreateConnectionThread extends Thread { public class CreateConnectionThread extends Thread {
private volatile boolean started = true;
public CreateConnectionThread(String name) { public CreateConnectionThread(String name) {
super(name); super(name);
@ -2829,6 +2891,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
if ((!closing) && (!closed)) { if ((!closing) && (!closed)) {
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e); LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
} }
DruidDataSource.this.doSomethingBeforeCreationThreadBreak();
break; break;
} finally { } finally {
lock.unlock(); lock.unlock();
@ -2838,9 +2901,13 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
try { try {
connection = createPhysicalConnection(); connection = createPhysicalConnection();
} catch (SQLException e) { } catch (SQLException | RuntimeException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() if (e instanceof SQLException) {
+ ", state " + e.getSQLState(), e); LOG.error("create connection error, url: " + jdbcUrl + ", errorCode " + ((SQLException) e).getErrorCode()
+ ", state " + ((SQLException) e).getSQLState(), e);
} else {
LOG.error("create connection error", e);
}
errorCount++; errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
@ -2861,17 +2928,16 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
try { try {
Thread.sleep(timeBetweenConnectErrorMillis); Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) { } catch (InterruptedException ignore) {
break;
} }
DruidDataSource.this.doSomethingBeforeCreationThreadBreak();
break;
} }
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) { } catch (Error e) {
LOG.error("create connection Error", e); LOG.error("create connection Error", e);
setFailContinuous(true); setFailContinuous(true);
DruidDataSource.this.doSomethingBeforeCreationThreadBreak();
break; break;
} }
@ -2892,10 +2958,48 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
} }
} }
public boolean isStarted() {
return started;
}
public void setStarted(boolean started) {
this.started = started;
}
}
//周期性检查生产线程状态,因为在终止生产线程的时候,为了不让生产线程疯狂重试数据库,只是生成了一个生产线程,但是并没有start,需要一个守护线程
//周期性检查线程状态,帮助其启动。
private class PeriodDetectionThread extends Thread {
public PeriodDetectionThread(String name) {
super(name);
this.setDaemon(true);
}
public void run() {
while (true) {
synchronized (DruidDataSource.this) {
//生产线程发生了切换,并且有线程在等待连接,需要主动唤醒生产线程,否则由getConnection方法来唤醒生产线程
if (!createConnectionThread.started && !destroyConnectionThread.started && notEmptyWaitThreadCount > 0) {
createConnectionThread.setStarted(true);
createConnectionThread.start();
destroyConnectionThread.setStarted(true);
destroyConnectionThread.start();
}
}
try {
Thread.sleep(30000);
} catch (InterruptedException ignore) {
break;
}
}
}
} }
public class DestroyConnectionThread extends Thread { public class DestroyConnectionThread extends Thread {
private volatile boolean started = true;
public DestroyConnectionThread(String name) { public DestroyConnectionThread(String name) {
super(name); super(name);
this.setDaemon(true); this.setDaemon(true);
@ -2928,6 +3032,13 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
} }
public boolean isStarted() {
return started;
}
public void setStarted(boolean started) {
this.started = started;
}
} }
public class DestroyTask implements Runnable { public class DestroyTask implements Runnable {
@ -3049,7 +3160,9 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
return removeCount; return removeCount;
} }
/** Instance key */ /**
* Instance key
*/
protected String instanceKey = null; protected String instanceKey = null;
public Reference getReference() throws NamingException { public Reference getReference() throws NamingException {

Loading…
Cancel
Save