Browse Source

Merge pull request #71 in CORE/base-third from ~ABEL.CHEN/base-third:release/9.0 to release/9.0

* commit 'c9b7d98df8823e4781955ab7aaeb140b1080e280':
  中断废弃的线程
  destory 线程也需要停止
  druidDatasource断线重连机制有点问题
release/9.0
abel.chen 6 years ago
parent
commit
ea9cecd0ff
  1. 154
      fine-druid/src/com/fr/third/alibaba/druid/pool/DruidDataSource.java

154
fine-druid/src/com/fr/third/alibaba/druid/pool/DruidDataSource.java

@ -15,6 +15,43 @@
*/ */
package com.fr.third.alibaba.druid.pool; package com.fr.third.alibaba.druid.pool;
import java.io.Closeable;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.JMException;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
import javax.sql.ConnectionPoolDataSource;
import javax.sql.PooledConnection;
import com.fr.third.alibaba.druid.Constants; import com.fr.third.alibaba.druid.Constants;
import com.fr.third.alibaba.druid.TransactionTimeoutException; import com.fr.third.alibaba.druid.TransactionTimeoutException;
import com.fr.third.alibaba.druid.VERSION; import com.fr.third.alibaba.druid.VERSION;
@ -58,41 +95,6 @@ import com.fr.third.alibaba.druid.util.Utils;
import com.fr.third.alibaba.druid.wall.WallFilter; import com.fr.third.alibaba.druid.wall.WallFilter;
import com.fr.third.alibaba.druid.wall.WallProviderStatValue; import com.fr.third.alibaba.druid.wall.WallProviderStatValue;
import javax.management.JMException;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
import javax.sql.ConnectionPoolDataSource;
import javax.sql.PooledConnection;
import java.io.Closeable;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* @author ljw [ljw2083@alibaba-inc.com] * @author ljw [ljw2083@alibaba-inc.com]
* @author wenshao [szujobs@hotmail.com] * @author wenshao [szujobs@hotmail.com]
@ -142,7 +144,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
private LogStatsThread logStatsThread; private LogStatsThread logStatsThread;
private int createTaskCount; private int createTaskCount;
private final CountDownLatch initedLatch = new CountDownLatch(2); private CountDownLatch initedLatch = new CountDownLatch(2);
private volatile boolean enable = true; private volatile boolean enable = true;
@ -166,8 +168,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
private volatile boolean keepAlive = false; private volatile boolean keepAlive = false;
private static final boolean logActiveCount = "true".equals(System.getProperty("frDruidLogActiveCount"));
public DruidDataSource() { public DruidDataSource() {
this(false); this(false);
} }
@ -603,6 +603,35 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
this.connectProperties = properties; this.connectProperties = properties;
} }
private synchronized void createThreadChange() {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this) + this.getUrl();
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.setStarted(false);
String destroyName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this) + this.getUrl();
if (destroyConnectionThread != null) {
if (!destroyConnectionThread.isInterrupted()) {
destroyConnectionThread.interrupt();
}
}
destroyConnectionThread = new DestroyConnectionThread(destroyName);
destroyConnectionThread.setStarted(false);
initedLatch = new CountDownLatch(2);
}
private void checkThread() throws SQLException {
if (!createConnectionThread.isStarted() && !destroyConnectionThread.isStarted()) {
createConnectionThread.setStarted(true);
createConnectionThread.start();
destroyConnectionThread.setStarted(true);
destroyConnectionThread.start();
try {
initedLatch.await();
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
}
}
}
public void init() throws SQLException { public void init() throws SQLException {
if (inited) { if (inited) {
return; return;
@ -818,7 +847,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
protected void createAndStartCreatorThread() { protected void createAndStartCreatorThread() {
if (createScheduler == null) { if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this) + this.getUrl();
createConnectionThread = new CreateConnectionThread(threadName); createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start(); createConnectionThread.start();
return; return;
@ -1056,7 +1085,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init(); init();
checkThread();
if (filters.size() > 0) { if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this); FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis); return filterChain.dataSource_connect(this, maxWaitMillis);
@ -1167,7 +1196,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
lock.lock(); lock.lock();
try { try {
activeCount--; activeCount--;
logActiveCount(false);
discardCount++; discardCount++;
if (activeCount <= minIdle) { if (activeCount <= minIdle) {
@ -1219,7 +1247,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
if (holder != null) { if (holder != null) {
activeCount++; activeCount++;
logActiveCount(true);
if (activeCount > activePeak) { if (activeCount > activePeak) {
activePeak = activeCount; activePeak = activeCount;
activePeakTime = System.currentTimeMillis(); activePeakTime = System.currentTimeMillis();
@ -1251,8 +1278,9 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
.append(", active " + activeCount)// .append(", active " + activeCount)//
.append(", maxActive " + maxActive)// .append(", maxActive " + maxActive)//
; ;
JdbcDataSourceStat sourceStat = this.getDataSourceStat();
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList(); if (sourceStat != null) {
List<JdbcSqlStatValue> sqlList = sourceStat.getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) { for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) { if (i != 0) {
buf.append('\n'); buf.append('\n');
@ -1265,6 +1293,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
buf.append(" : "); buf.append(" : ");
buf.append(sql.getSql()); buf.append(sql.getSql());
} }
}
String errorMessage = buf.toString(); String errorMessage = buf.toString();
throw new GetConnectionTimeoutException(errorMessage); throw new GetConnectionTimeoutException(errorMessage);
} }
@ -1409,7 +1438,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
lock.lock(); lock.lock();
try { try {
activeCount--; activeCount--;
logActiveCount(false);
closeCount++; closeCount++;
} finally { } finally {
lock.unlock(); lock.unlock();
@ -1428,7 +1456,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
lock.lockInterruptibly(); lock.lockInterruptibly();
try { try {
activeCount--; activeCount--;
logActiveCount(false);
closeCount++; closeCount++;
result = putLast(holder, lastActiveTimeMillis); result = putLast(holder, lastActiveTimeMillis);
@ -2017,9 +2044,6 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
LOG.error("create connection error", e); LOG.error("create connection error", e);
// unknow fatal exception // unknow fatal exception
setFailContinuous(true); setFailContinuous(true);
if (breakAfterAcquireFailure) {
break;
}
continue; continue;
} catch (Error e) { } catch (Error e) {
lock.lock(); lock.lock();
@ -2049,6 +2073,7 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
public class CreateConnectionThread extends Thread { public class CreateConnectionThread extends Thread {
private volatile boolean started = true;
public CreateConnectionThread(String name) { public CreateConnectionThread(String name) {
super(name); super(name);
@ -2132,13 +2157,12 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} catch (InterruptedException interruptEx) { } catch (InterruptedException interruptEx) {
break; break;
} }
DruidDataSource.this.createThreadChange();
break;
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.error("create connection error", e); LOG.error("create connection error", e);
setFailContinuous(true); setFailContinuous(true);
if (breakAfterAcquireFailure) {
break;
}
continue; continue;
} catch (Error e) { } catch (Error e) {
LOG.error("create connection error", e); LOG.error("create connection error", e);
@ -2159,9 +2183,18 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
errorCount = 0; // reset errorCount errorCount = 0; // reset errorCount
} }
} }
public boolean isStarted() {
return started;
}
public void setStarted(boolean started) {
this.started = started;
}
} }
public class DestroyConnectionThread extends Thread { public class DestroyConnectionThread extends Thread {
private volatile boolean started = true;
public DestroyConnectionThread(String name) { public DestroyConnectionThread(String name) {
super(name); super(name);
@ -2195,6 +2228,13 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
} }
} }
public boolean isStarted() {
return started;
}
public void setStarted(boolean started) {
this.started = started;
}
} }
public class DestroyTask implements Runnable { public class DestroyTask implements Runnable {
@ -2313,7 +2353,9 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
return removeCount; return removeCount;
} }
/** Instance key */ /**
* Instance key
*/
protected String instanceKey = null; protected String instanceKey = null;
public Reference getReference() throws NamingException { public Reference getReference() throws NamingException {
@ -3129,10 +3171,4 @@ public class DruidDataSource extends DruidAbstractDataSource implements DruidDat
public boolean isClosed() { public boolean isClosed() {
return this.closed; return this.closed;
} }
private void logActiveCount(boolean isIncrease){
if (logActiveCount){
LOG.debug("activeCount " + (isIncrease ? "increase" : "decrease") + "current count is : " + activeCount, new Throwable());
}
}
} }

Loading…
Cancel
Save