Browse Source

建立socket增加重试机制

feature/big-screen
Maksim 5 years ago
parent
commit
ce9b3c71b0
  1. 145
      designer-realize/src/main/java/com/fr/design/mainframe/socketio/DesignerSocketIO.java

145
designer-realize/src/main/java/com/fr/design/mainframe/socketio/DesignerSocketIO.java

@ -13,7 +13,6 @@ import com.fr.report.RemoteDesignConstants;
import com.fr.serialization.SerializerHelper; import com.fr.serialization.SerializerHelper;
import com.fr.stable.ArrayUtils; import com.fr.stable.ArrayUtils;
import com.fr.third.apache.log4j.spi.LoggingEvent; import com.fr.third.apache.log4j.spi.LoggingEvent;
import com.fr.third.guava.base.Optional;
import com.fr.workspace.WorkContext; import com.fr.workspace.WorkContext;
import com.fr.workspace.Workspace; import com.fr.workspace.Workspace;
import com.fr.workspace.base.WorkspaceConstants; import com.fr.workspace.base.WorkspaceConstants;
@ -36,28 +35,19 @@ public class DesignerSocketIO {
Disconnecting Disconnecting
} }
private static Optional<Socket> socketIO = Optional.absent(); private static Socket socket = null;
private static Status status = Status.Disconnected; private static Status status = Status.Disconnected;
//维护一个当前工作环境的uri列表
private static String[] uri;
//维护一个关于uri列表的计数器
private static int count;
private static final Emitter.Listener printLog = new Emitter.Listener() {
@Override
public void call(Object... objects) {
if (ArrayUtils.isNotEmpty(objects)) {
try {
LoggingEvent event = SerializerHelper.deserialize((byte[]) objects[0]);
DesignerLogger.log(event);
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
}
};
public static void close() { public static void close() {
if (socketIO.isPresent()) { if (socket != null) {
status = Status.Disconnecting; status = Status.Disconnecting;
socketIO.get().close(); socket.close();
socketIO = Optional.absent(); socket = null;
} }
} }
@ -66,24 +56,90 @@ public class DesignerSocketIO {
if (current.isLocal()) { if (current.isLocal()) {
return; return;
} }
//每当更换工作环境,更新uri列表,同时更新计数器count
try {
uri = getSocketUri();
} catch (IOException e) {
e.printStackTrace();
}
count = 0;
//建立socket并注册监听
CreateSocket();
}
private static void CreateSocket(){
//根据uri和计数器建立连接,并注册监听
try { try {
String[] uri = getSocketUri(current); if(count<uri.length) {
socketIO = Optional.of(IO.socket(new URI(uri[0]))); socket = IO.socket(new URI(uri[count]));
socketIO.get().on(WorkspaceConstants.WS_LOGRECORD, printLog); socket.on(WorkspaceConstants.WS_LOGRECORD, printLog);
socketIO.get().on(WorkspaceConstants.CONFIG_MODIFY, new Emitter.Listener() { socket.on(WorkspaceConstants.CONFIG_MODIFY, modifyConfig);
socket.on(Socket.EVENT_CONNECT_ERROR, failRetry);
socket.on(Socket.EVENT_DISCONNECT, disConnectHint);
socket.connect();
status = Status.Connected;
}else {
//表示所有的uri都连接不成功
FineLoggerFactory.getLogger().warn("所有URI均连接失败");
}
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
private static String[] getSocketUri() throws IOException {
Workspace current = WorkContext.getCurrent();
URL url = new URL(current.getPath());
Integer[] ports = current.get(SocketInfoOperator.class).getPort();
WorkspaceConnection connection = current.getConnection();
String[] result = new String[ports.length];
for (int i = 0; i < ports.length; i++) {
result[i] = String.format("%s://%s:%s%s?%s=%s&%s=%s",
url.getProtocol(),
url.getHost(),
ports[i],
WorkspaceConstants.WS_NAMESPACE,
DecisionServiceConstants.WEB_SOCKET_TOKEN_NAME,
connection.getToken(),
RemoteDesignConstants.USER_LOCK_ID,
connection.getId());
}
return result;
}
//失败重试监听器:1、关闭失败的socket 2、计数器加1 3、调用创建socket方法
private static final Emitter.Listener failRetry = new Emitter.Listener() {
@Override
public void call(Object... args) {
status = Status.Disconnecting;
socket.close();
count++;
CreateSocket();
}
};
//日志输出监听器
private static final Emitter.Listener printLog = new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
assert objects != null && objects.length == 1; if (ArrayUtils.isNotEmpty(objects)) {
String param = (String) objects[0]; try {
EventDispatcher.fire(RemoteConfigEvent.EDIT, param); LoggingEvent event = SerializerHelper.deserialize((byte[]) objects[0]);
DesignerLogger.log(event);
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
} }
}); }
socketIO.get().on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { }
};
//断开连接提醒监听器
private static final Emitter.Listener disConnectHint = new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
/* /*
* todo 远程心跳断开不一定 socketio 断开 和远程紧密相关的业务都绑定在心跳上切换成心跳断开之后进行提醒 * todo 远程心跳断开不一定 socketio 断开 和远程紧密相关的业务都绑定在心跳上切换成心跳断开之后进行提醒
* socketio 只用推日志和通知配置变更 * socket 只用推日志和通知配置变更
*/ */
if (status != Status.Disconnecting) { if (status != Status.Disconnecting) {
try { try {
@ -105,31 +161,16 @@ public class DesignerSocketIO {
} }
status = Status.Disconnected; status = Status.Disconnected;
} }
}); };
socketIO.get().connect();
status = Status.Connected;
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
private static String[] getSocketUri(Workspace current) throws IOException { //配置变更监听器
URL url = new URL(current.getPath()); private static final Emitter.Listener modifyConfig = new Emitter.Listener() {
Integer[] ports = WorkContext.getCurrent().get(SocketInfoOperator.class).getPort(); @Override
WorkspaceConnection connection = WorkContext.getCurrent().getConnection(); public void call(Object... objects) {
String[] result = new String[ports.length]; assert objects != null && objects.length == 1;
for (int i = 0; i < ports.length; i++) { String param = (String) objects[0];
result[i] = String.format("%s://%s:%s%s?%s=%s&%s=%s", EventDispatcher.fire(RemoteConfigEvent.EDIT, param);
url.getProtocol(),
url.getHost(),
ports[i],
WorkspaceConstants.WS_NAMESPACE,
DecisionServiceConstants.WEB_SOCKET_TOKEN_NAME,
connection.getToken(),
RemoteDesignConstants.USER_LOCK_ID,
connection.getId());
} }
return result; };
} }
}

Loading…
Cancel
Save