Browse Source

回滚

feature/big-screen
Maksim 5 years ago
parent
commit
ae73305f84
  1. 147
      designer-realize/src/main/java/com/fr/design/mainframe/socketio/DesignerSocketIO.java

147
designer-realize/src/main/java/com/fr/design/mainframe/socketio/DesignerSocketIO.java

@ -13,6 +13,7 @@ import com.fr.report.RemoteDesignConstants;
import com.fr.serialization.SerializerHelper; import com.fr.serialization.SerializerHelper;
import com.fr.stable.ArrayUtils; import com.fr.stable.ArrayUtils;
import com.fr.third.apache.log4j.spi.LoggingEvent; import com.fr.third.apache.log4j.spi.LoggingEvent;
import com.fr.third.guava.base.Optional;
import com.fr.workspace.WorkContext; import com.fr.workspace.WorkContext;
import com.fr.workspace.Workspace; import com.fr.workspace.Workspace;
import com.fr.workspace.base.WorkspaceConstants; import com.fr.workspace.base.WorkspaceConstants;
@ -35,19 +36,28 @@ public class DesignerSocketIO {
Disconnecting Disconnecting
} }
private static Socket socket = null; private static Optional<Socket> socketIO = Optional.absent();
private static Status status = Status.Disconnected; private static Status status = Status.Disconnected;
//维护一个当前工作环境的uri列表
private static String[] uri;
//维护一个关于uri列表的计数器
private static int count;
private static final Emitter.Listener printLog = new Emitter.Listener() {
@Override
public void call(Object... objects) {
if (ArrayUtils.isNotEmpty(objects)) {
try {
LoggingEvent event = SerializerHelper.deserialize((byte[]) objects[0]);
DesignerLogger.log(event);
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
}
};
public static void close() { public static void close() {
if (socket != null) { if (socketIO.isPresent()) {
status = Status.Disconnecting; status = Status.Disconnecting;
socket.close(); socketIO.get().close();
socket = null; socketIO = Optional.absent();
} }
} }
@ -56,90 +66,24 @@ public class DesignerSocketIO {
if (current.isLocal()) { if (current.isLocal()) {
return; return;
} }
//每当更换工作环境,更新uri列表,同时更新计数器count
try {
uri = getSocketUri();
} catch (IOException e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
count = 0;
//建立socket并注册监听
createSocket();
}
private static void createSocket(){
//根据uri和计数器建立连接,并注册监听
try { try {
if(count<uri.length) { String[] uri = getSocketUri(current);
socket = IO.socket(new URI(uri[count])); socketIO = Optional.of(IO.socket(new URI(uri[0])));
socket.on(WorkspaceConstants.WS_LOGRECORD, printLog); socketIO.get().on(WorkspaceConstants.WS_LOGRECORD, printLog);
socket.on(WorkspaceConstants.CONFIG_MODIFY, modifyConfig); socketIO.get().on(WorkspaceConstants.CONFIG_MODIFY, new Emitter.Listener() {
socket.on(Socket.EVENT_CONNECT_ERROR, failRetry);
socket.on(Socket.EVENT_DISCONNECT, disConnectHint);
socket.connect();
status = Status.Connected;
}else {
//表示所有的uri都连接不成功
FineLoggerFactory.getLogger().warn("All uris failed to connect");
}
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
private static String[] getSocketUri() throws IOException {
Workspace current = WorkContext.getCurrent();
URL url = new URL(current.getPath());
Integer[] ports = current.get(SocketInfoOperator.class).getPort();
WorkspaceConnection connection = current.getConnection();
String[] result = new String[ports.length];
for (int i = 0; i < ports.length; i++) {
result[i] = String.format("%s://%s:%s%s?%s=%s&%s=%s",
url.getProtocol(),
url.getHost(),
ports[i],
WorkspaceConstants.WS_NAMESPACE,
DecisionServiceConstants.WEB_SOCKET_TOKEN_NAME,
connection.getToken(),
RemoteDesignConstants.USER_LOCK_ID,
connection.getId());
}
return result;
}
//失败重试监听器:1、关闭失败的socket 2、计数器加1 3、调用创建socket方法
private static final Emitter.Listener failRetry = new Emitter.Listener() {
@Override
public void call(Object... args) {
status = Status.Disconnecting;
socket.close();
count++;
createSocket();
}
};
//日志输出监听器
private static final Emitter.Listener printLog = new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
if (ArrayUtils.isNotEmpty(objects)) { assert objects != null && objects.length == 1;
try { String param = (String) objects[0];
LoggingEvent event = SerializerHelper.deserialize((byte[]) objects[0]); EventDispatcher.fire(RemoteConfigEvent.EDIT, param);
DesignerLogger.log(event);
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
} }
}; });
socketIO.get().on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
//断开连接提醒监听器
private static final Emitter.Listener disConnectHint = new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
/* /*
* todo 远程心跳断开不一定 socket 断开 和远程紧密相关的业务都绑定在心跳上切换成心跳断开之后进行提醒 * todo 远程心跳断开不一定 socketio 断开 和远程紧密相关的业务都绑定在心跳上切换成心跳断开之后进行提醒
* socket 只用推日志和通知配置变更 * socketio 只用推日志和通知配置变更
*/ */
if (status != Status.Disconnecting) { if (status != Status.Disconnecting) {
try { try {
@ -161,16 +105,31 @@ public class DesignerSocketIO {
} }
status = Status.Disconnected; status = Status.Disconnected;
} }
}; });
socketIO.get().connect();
status = Status.Connected;
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
//配置变更监听器 private static String[] getSocketUri(Workspace current) throws IOException {
private static final Emitter.Listener modifyConfig = new Emitter.Listener() { URL url = new URL(current.getPath());
@Override Integer[] ports = WorkContext.getCurrent().get(SocketInfoOperator.class).getPort();
public void call(Object... objects) { WorkspaceConnection connection = WorkContext.getCurrent().getConnection();
assert objects != null && objects.length == 1; String[] result = new String[ports.length];
String param = (String) objects[0]; for (int i = 0; i < ports.length; i++) {
EventDispatcher.fire(RemoteConfigEvent.EDIT, param); result[i] = String.format("%s://%s:%s%s?%s=%s&%s=%s",
url.getProtocol(),
url.getHost(),
ports[i],
WorkspaceConstants.WS_NAMESPACE,
DecisionServiceConstants.WEB_SOCKET_TOKEN_NAME,
connection.getToken(),
RemoteDesignConstants.USER_LOCK_ID,
connection.getId());
} }
}; return result;
}
} }

Loading…
Cancel
Save