Browse Source

Merge pull request #235 in CORE/base-third from release/10.0 to feature/10.0

* commit 'aa96c0380e1e574b176e93fcddc33b96cf71b761':
  DEC-8173 脏数据导致quartz启动失败
  JB注解
  REPORT-17530 【CRM10.0】20190527宕机风险问题:12w条clientnode 多线程下, clientbox 无法清除无效数据
research/11.0
Harrison 6 years ago
parent
commit
1a2b319c24
  1. 5
      fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
  2. 49
      fine-socketio/src/com/fr/third/socketio/handler/ClientHead.java

5
fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

@ -982,7 +982,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} }
for (TriggerKey triggerKey : misfiredTriggers) { for (TriggerKey triggerKey : misfiredTriggers) {
try {
OperableTrigger trig = OperableTrigger trig =
retrieveTrigger(conn, triggerKey); retrieveTrigger(conn, triggerKey);
@ -994,6 +994,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime) if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
earliestNewTime = trig.getNextFireTime().getTime(); earliestNewTime = trig.getNextFireTime().getTime();
} catch (JobPersistenceException e) {
getLog().error(e.getMessage());
}
} }
return new RecoverMisfiredJobsResult( return new RecoverMisfiredJobsResult(

49
fine-socketio/src/com/fr/third/socketio/handler/ClientHead.java

@ -86,13 +86,28 @@ public class ClientHead {
channels.put(Transport.WEBSOCKET, new TransportState()); channels.put(Transport.WEBSOCKET, new TransportState());
} }
public void bindChannel(Channel channel, Transport transport) {
/**
* 这里多线程下会有问题
* 因为 channel 不会变 但是 state 可能因为多线程下调用 bindChannel
* 从而导致 state.update 致使真正 add()
* channel state 得数据不一致
*
* 解决方法
* 1. 加锁
* 2. clientbox 加一个子线程 定时清除 disconnected true 的值
*
* @param channel 频道
* @param transport 协议
*/
public synchronized void bindChannel(Channel channel, Transport transport) {
log.debug("binding channel: {} to transport: {}", channel, transport); log.debug("binding channel: {} to transport: {}", channel, transport);
TransportState state = channels.get(transport); TransportState state = channels.get(transport);
Channel prevChannel = state.update(channel); Channel prevChannel = state.update(channel);
if (prevChannel != null) { if (prevChannel != null) {
clientsBox.remove(prevChannel); clientsBox.remove(prevChannel);
prevChannel.disconnect();
} }
clientsBox.add(channel, this); clientsBox.add(channel, this);
@ -122,16 +137,7 @@ public class ClientHead {
public void schedulePingTimeout() { public void schedulePingTimeout() {
SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId); SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId);
disconnectScheduler.schedule(key, new Runnable() { disconnectScheduler.schedule(key, new TimeOutTask(this), configuration.getPingTimeout() + configuration.getPingInterval(), TimeUnit.MILLISECONDS);
@Override
public void run() {
ClientHead client = clientsBox.get(sessionId);
if (client != null) {
client.disconnect();
log.debug("{} removed due to ping timeout", sessionId);
}
}
}, configuration.getPingTimeout() + configuration.getPingInterval(), TimeUnit.MILLISECONDS);
} }
public ChannelFuture send(Packet packet, Transport transport) { public ChannelFuture send(Packet packet, Transport transport) {
@ -152,6 +158,8 @@ public class ClientHead {
public void removeNamespaceClient(NamespaceClient client) { public void removeNamespaceClient(NamespaceClient client) {
namespaceClients.remove(client.getNamespace()); namespaceClients.remove(client.getNamespace());
// 清空到最后一个的时候, 断开连接。 调用 initializer.disconnect
if (namespaceClients.isEmpty()) { if (namespaceClients.isEmpty()) {
disconnectableHub.onDisconnect(this); disconnectableHub.onDisconnect(this);
} }
@ -268,4 +276,23 @@ public class ClientHead {
return lastBinaryPacket; return lastBinaryPacket;
} }
private class TimeOutTask implements Runnable{
ClientHead client;
public TimeOutTask(ClientHead client) {
assert client != null;
this.client = client;
}
@Override
public void run() {
if (client != null && !client.disconnected.get()) {
client.disconnect();
log.debug("{} removed due to ping timeout", client.getSessionId());
}
}
}
} }

Loading…
Cancel
Save