multiThreadFilterMap = new ConcurrentHashMap<>();
@Autowired
@@ -83,7 +82,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
- logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
+ log.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
}
TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
@@ -111,10 +110,10 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onFailure(Throwable ex) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
- logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
+ log.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
- logger.info(
+ log.info(
"[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
@@ -124,10 +123,10 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
- logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
+ log.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
- logger.info(
+ log.info(
"[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index b223c7e8c0..aa82178813 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+@Slf4j
public class MasterConnectionStateListener implements ConnectionListener {
- private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
-
private final MasterConfig masterConfig;
private final RegistryClient registryClient;
private final MasterConnectStrategy masterConnectStrategy;
@@ -46,7 +43,7 @@ public class MasterConnectionStateListener implements ConnectionListener {
@Override
public void onUpdate(ConnectionState state) {
- logger.info("Master received a {} event from registry, the current server state is {}", state,
+ log.info("Master received a {} event from registry, the current server state is {}", state,
ServerLifeCycleManager.getServerStatus());
switch (state) {
case CONNECTED:
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index c319d393ed..14e4fa24e9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -33,8 +33,8 @@ import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -43,10 +43,9 @@ import org.springframework.stereotype.Component;
* When the Master node startup, it will register in registry center. And start a {@link MasterHeartBeatTask} to update its metadata in registry.
*/
@Component
+@Slf4j
public class MasterRegistryClient implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
-
@Autowired
private FailoverService failoverService;
@@ -92,29 +91,29 @@ public class MasterRegistryClient implements AutoCloseable {
* @param failover is failover
*/
public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
- logger.info("{} node deleted : {}", nodeType, path);
+ log.info("{} node deleted : {}", nodeType, path);
if (StringUtils.isEmpty(path)) {
- logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
+ log.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
return;
}
String serverHost = registryClient.getHostByEventDataPath(path);
if (StringUtils.isEmpty(serverHost)) {
- logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
+ log.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
return;
}
try {
if (!registryClient.exists(path)) {
- logger.info("path: {} not exists", path);
+ log.info("path: {} not exists", path);
}
// failover server
if (failover) {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
- logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
+ log.error("{} server failover failed, host:{}", nodeType, serverHost, e);
}
}
@@ -126,17 +125,17 @@ public class MasterRegistryClient implements AutoCloseable {
* @param failover is failover
*/
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
- logger.info("{} node deleted : {}", nodeType, path);
+ log.info("{} node deleted : {}", nodeType, path);
try {
String serverHost = null;
if (!StringUtils.isEmpty(path)) {
serverHost = registryClient.getHostByEventDataPath(path);
if (StringUtils.isEmpty(serverHost)) {
- logger.error("server down error: unknown path: {}", path);
+ log.error("server down error: unknown path: {}", path);
return;
}
if (!registryClient.exists(path)) {
- logger.info("path: {} not exists", path);
+ log.info("path: {} not exists", path);
}
}
// failover server
@@ -144,7 +143,7 @@ public class MasterRegistryClient implements AutoCloseable {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
- logger.error("{} server failover failed", nodeType, e);
+ log.error("{} server failover failed", nodeType, e);
}
}
@@ -152,7 +151,7 @@ public class MasterRegistryClient implements AutoCloseable {
* Registry the current master server itself to registry.
*/
void registry() {
- logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
+ log.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String masterRegistryPath = masterConfig.getMasterRegistryPath();
// remove before persist
@@ -160,7 +159,7 @@ public class MasterRegistryClient implements AutoCloseable {
registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
- logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
+ log.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@@ -168,20 +167,20 @@ public class MasterRegistryClient implements AutoCloseable {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
masterHeartBeatTask.start();
- logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
+ log.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}
public void deregister() {
try {
registryClient.remove(masterConfig.getMasterRegistryPath());
- logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
+ log.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
if (masterHeartBeatTask != null) {
masterHeartBeatTask.shutdown();
}
registryClient.close();
} catch (Exception e) {
- logger.error("MasterServer remove registry path exception ", e);
+ log.error("MasterServer remove registry path exception ", e);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 5a1c101e4f..513db20059 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -26,15 +26,13 @@ import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Strings;
+@Slf4j
public class MasterRegistryDataListener implements SubscribeListener {
- private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class);
-
private final MasterRegistryClient masterRegistryClient;
public MasterRegistryDataListener() {
@@ -60,7 +58,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
final String path = event.path();
switch (event.type()) {
case ADD:
- logger.info("master node added : {}", path);
+ log.info("master node added : {}", path);
break;
case REMOVE:
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
@@ -75,10 +73,10 @@ public class MasterRegistryDataListener implements SubscribeListener {
final String path = event.path();
switch (event.type()) {
case ADD:
- logger.info("worker node added : {}", path);
+ log.info("worker node added : {}", path);
break;
case REMOVE:
- logger.info("worker node deleted : {}", path);
+ log.info("worker node deleted : {}", path);
masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
break;
default:
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
index 5ac361ddca..bbc38d35fe 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@@ -32,10 +32,9 @@ import org.springframework.stereotype.Service;
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
+@Slf4j
public class MasterStopStrategy implements MasterConnectStrategy {
- private final Logger logger = LoggerFactory.getLogger(MasterStopStrategy.class);
-
@Autowired
private RegistryClient registryClient;
@Autowired
@@ -49,7 +48,7 @@ public class MasterStopStrategy implements MasterConnectStrategy {
@Override
public void reconnect() {
- logger.warn("The current connect strategy is stop, so the master will not reconnect to registry");
+ log.warn("The current connect strategy is stop, so the master will not reconnect to registry");
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
index 4556500db6..6929317ebd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
@@ -32,8 +32,8 @@ import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import java.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@@ -43,10 +43,9 @@ import org.springframework.stereotype.Service;
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
+@Slf4j
public class MasterWaitingStrategy implements MasterConnectStrategy {
- private final Logger logger = LoggerFactory.getLogger(MasterWaitingStrategy.class);
-
@Autowired
private MasterConfig masterConfig;
@Autowired
@@ -67,7 +66,7 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
clearMasterResource();
Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
try {
- logger.info("Master disconnect from registry will try to reconnect in {} s",
+ log.info("Master disconnect from registry will try to reconnect in {} s",
maxWaitingTime.getSeconds());
registryClient.connectUntilTimeout(maxWaitingTime);
} catch (RegistryException ex) {
@@ -78,15 +77,15 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
ServerLifeCycleManager.getServerStatus());
- logger.error(errorMessage, e);
+ log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
} catch (RegistryException ex) {
String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
- logger.error(errorMessage, ex);
+ log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
} catch (Exception ex) {
String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
- logger.error(errorMessage, ex);
+ log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
}
}
@@ -94,19 +93,19 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
@Override
public void reconnect() {
if (ServerLifeCycleManager.isRunning()) {
- logger.info("no need to reconnect, as the current server status is running");
+ log.info("no need to reconnect, as the current server status is running");
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartMasterResource();
- logger.info("Recover from waiting success, the current server status is {}",
+ log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
String errorMessage =
String.format(
"Recover from waiting failed, the current server status is %s, will stop the server",
ServerLifeCycleManager.getServerStatus());
- logger.error(errorMessage, e);
+ log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
}
}
@@ -120,19 +119,19 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
private void clearMasterResource() {
// close the worker resource, if close failed should stop the worker server
masterRPCServer.close();
- logger.warn("Master closed RPC server due to lost registry connection");
+ log.warn("Master closed RPC server due to lost registry connection");
workflowEventQueue.clearWorkflowEventQueue();
- logger.warn("Master clear workflow event queue due to lost registry connection");
+ log.warn("Master clear workflow event queue due to lost registry connection");
processInstanceExecCacheManager.clearCache();
- logger.warn("Master clear process instance cache due to lost registry connection");
+ log.warn("Master clear process instance cache due to lost registry connection");
stateWheelExecuteThread.clearAllTasks();
- logger.warn("Master clear all state wheel task due to lost registry connection");
+ log.warn("Master clear all state wheel task due to lost registry connection");
}
private void reStartMasterResource() {
// reopen the resource, if reopen failed should stop the worker server
masterRPCServer.start();
- logger.warn("Master restarted RPC server due to reconnect to registry");
+ log.warn("Master restarted RPC server due to reconnect to registry");
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index c6151866c0..d04b7d0510 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -62,17 +62,16 @@ import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
+@Slf4j
public class ServerNodeManager implements InitializingBean {
- private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);
-
private final Lock masterLock = new ReentrantLock();
private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
@@ -156,7 +155,7 @@ public class ServerNodeManager implements InitializingBean {
// sync worker node info
refreshWorkerNodesAndGroupMappings();
} catch (Exception e) {
- logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+ log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
}
}
}
@@ -186,17 +185,17 @@ public class ServerNodeManager implements InitializingBean {
final String workerAddress = parts[parts.length - 1];
// todo: update workerNodeInfo
- logger.debug("received subscribe event : {}", event);
+ log.debug("received subscribe event : {}", event);
if (type == Type.ADD) {
- logger.info("Worker: {} added, currentNode : {}", path, workerAddress);
+ log.info("Worker: {} added, currentNode : {}", path, workerAddress);
} else if (type == Type.REMOVE) {
- logger.info("Worker node : {} down.", path);
+ log.info("Worker node : {} down.", path);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
} catch (Exception ex) {
- logger.error("WorkerGroupListener capture data change and get data failed", ex);
+ log.error("WorkerGroupListener capture data change and get data failed", ex);
}
}
}
@@ -220,16 +219,16 @@ public class ServerNodeManager implements InitializingBean {
if (registryClient.isMasterPath(path)) {
try {
if (type.equals(Type.ADD)) {
- logger.info("master node : {} added.", path);
+ log.info("master node : {} added.", path);
updateMasterNodes();
}
if (type.equals(Type.REMOVE)) {
- logger.info("master node : {} down.", path);
+ log.info("master node : {} down.", path);
updateMasterNodes();
alertDao.sendServerStoppedAlert(1, path, "MASTER");
}
} catch (Exception ex) {
- logger.error("MasterNodeListener capture data change and get data failed.", ex);
+ log.error("MasterNodeListener capture data change and get data failed.", ex);
}
}
}
@@ -246,7 +245,7 @@ public class ServerNodeManager implements InitializingBean {
List masterNodeList = registryClient.getServerList(NodeType.MASTER);
syncMasterNodes(currentNodes, masterNodeList);
} catch (Exception e) {
- logger.error("update master nodes error", e);
+ log.error("update master nodes error", e);
} finally {
registryClient.releaseLock(nodeLock);
}
@@ -312,9 +311,9 @@ public class ServerNodeManager implements InitializingBean {
totalSlot = nodes.size();
currentSlot = index;
} else {
- logger.warn("Current master is not in active master list");
+ log.warn("Current master is not in active master list");
}
- logger.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
+ log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
} finally {
masterLock.unlock();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 8255e9a3b0..1a070c10af 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -32,8 +32,8 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -41,10 +41,9 @@ import org.springframework.stereotype.Service;
* Master RPC Server, used to send/receive request to other system.
*/
@Service
+@Slf4j
public class MasterRPCServer implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(MasterRPCServer.class);
-
private NettyRemotingServer nettyRemotingServer;
@Autowired
@@ -81,7 +80,7 @@ public class MasterRPCServer implements AutoCloseable {
private TaskExecuteStartProcessor taskExecuteStartProcessor;
public void start() {
- logger.info("Starting Master RPC Server...");
+ log.info("Starting Master RPC Server...");
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
@@ -98,21 +97,21 @@ public class MasterRPCServer implements AutoCloseable {
workflowExecutingDataRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor);
- // logger server
+ // log server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
- logger.info("Started Master RPC Server...");
+ log.info("Started Master RPC Server...");
}
@Override
public void close() {
- logger.info("Closing Master RPC Server...");
+ log.info("Closing Master RPC Server...");
this.nettyRemotingServer.close();
- logger.info("Closed Master RPC Server...");
+ log.info("Closed Master RPC Server...");
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index a36f01c534..305d0ee680 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -26,16 +26,15 @@ import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
+@Slf4j
public class EventExecuteService extends BaseDaemonThread {
- private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
-
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -54,9 +53,9 @@ public class EventExecuteService extends BaseDaemonThread {
@Override
public synchronized void start() {
- logger.info("Master Event execute service starting");
+ log.info("Master Event execute service starting");
super.start();
- logger.info("Master Event execute service started");
+ log.info("Master Event execute service started");
}
@Override
@@ -67,11 +66,11 @@ public class EventExecuteService extends BaseDaemonThread {
streamTaskEventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (InterruptedException interruptedException) {
- logger.warn("Master event service interrupted, will exit this loop", interruptedException);
+ log.warn("Master event service interrupted, will exit this loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
- logger.error("Master event execute service error", e);
+ log.error("Master event execute service error", e);
}
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 2ae65c7cb8..068a8c4c3a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -24,16 +24,15 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
+@Slf4j
public class FailoverExecuteThread extends BaseDaemonThread {
- private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
-
@Autowired
private MasterConfig masterConfig;
@@ -49,9 +48,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
@Override
public synchronized void start() {
- logger.info("Master failover thread staring");
+ log.info("Master failover thread staring");
super.start();
- logger.info("Master failover thread stared");
+ log.info("Master failover thread stared");
}
@Override
@@ -68,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();
} catch (Exception e) {
- logger.error("Master failover thread execute error", e);
+ log.error("Master failover thread execute error", e);
} finally {
ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 42bbe075ab..e688e758d1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -53,8 +53,8 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -62,10 +62,9 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
+@Slf4j
public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
-
@Autowired
private ProcessService processService;
@@ -130,16 +129,16 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Override
public synchronized void start() {
- logger.info("Master schedule bootstrap starting..");
+ log.info("Master schedule bootstrap starting..");
super.start();
workflowEventLooper.start();
- logger.info("Master schedule bootstrap started...");
+ log.info("Master schedule bootstrap started...");
}
@Override
public void close() {
- logger.info("Master schedule bootstrap stopping...");
- logger.info("Master schedule bootstrap stopped...");
+ log.info("Master schedule bootstrap stopping...");
+ log.info("Master schedule bootstrap stopped...");
}
/**
@@ -151,7 +150,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot consume command.
- logger.warn("The current server {} is not at running status, cannot consumes commands.",
+ log.warn("The current server {} is not at running status, cannot consumes commands.",
this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
@@ -159,7 +158,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
- logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
+ log.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
@@ -183,7 +182,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
- logger.error(
+ log.error(
"The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
@@ -205,11 +204,11 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
});
} catch (InterruptedException interruptedException) {
- logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
+ log.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
- logger.error("Master schedule workflow error", e);
+ log.error("Master schedule workflow error", e);
// sleep for 1s here to avoid the database down cause the exception boom
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
@@ -218,7 +217,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private List command2ProcessInstance(List commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
- logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}",
+ log.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}",
commands.size());
List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
@@ -231,18 +230,18 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
// by only one master
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
- logger.info("Master handle command {} skip, slot check state: {}", command.getId(),
+ log.info("Master handle command {} skip, slot check state: {}", command.getId(),
slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
if (processInstance != null) {
processInstances.add(processInstance);
- logger.info("Master handle command {} end, create process instance {}", command.getId(),
+ log.info("Master handle command {} end, create process instance {}", command.getId(),
processInstance.getId());
}
} catch (Exception e) {
- logger.error("Master handle command {} error ", command.getId(), e);
+ log.error("Master handle command {} error ", command.getId(), e);
commandService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
@@ -252,7 +251,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
// make sure to finish handling command each time before next scan
latch.await();
- logger.info(
+ log.info(
"Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics
@@ -266,7 +265,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
int thisMasterSlot = serverNodeManager.getSlot();
int masterCount = serverNodeManager.getMasterSize();
if (masterCount <= 0) {
- logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+ log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
return Collections.emptyList();
}
int pageSize = masterConfig.getFetchCommandNum();
@@ -274,7 +273,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
long cost = System.currentTimeMillis() - scheduleStartTime;
- logger.info(
+ log.info(
"Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
result.size(), cost, thisMasterSlot, masterCount);
ProcessInstanceMetrics.recordCommandQueryTime(cost);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 94fefea634..1b2c603b8e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -41,9 +41,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@@ -56,10 +55,9 @@ import org.springframework.stereotype.Component;
* 4. timeout process check
*/
@Component
+@Slf4j
public class StateWheelExecuteThread extends BaseDaemonThread {
- private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
-
/**
* ProcessInstance timeout check list, element is the processInstanceId.
*/
@@ -109,12 +107,12 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
checkTask4State();
checkProcess4Timeout();
} catch (Exception e) {
- logger.error("state wheel thread check error:", e);
+ log.error("state wheel thread check error:", e);
}
try {
Thread.sleep(checkInterval);
} catch (InterruptedException e) {
- logger.error("state wheel thread sleep error, will close the loop", e);
+ log.error("state wheel thread sleep error, will close the loop", e);
Thread.currentThread().interrupt();
break;
}
@@ -123,13 +121,13 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.add(processInstance.getId());
- logger.info("Success add workflow instance {} into timeout check list", processInstance.getId());
+ log.info("Success add workflow instance {} into timeout check list", processInstance.getId());
}
public void removeProcess4TimeoutCheck(int processInstanceId) {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
- logger.info("Success remove workflow instance {} from timeout check list", processInstanceId);
+ log.info("Success remove workflow instance {} from timeout check list", processInstanceId);
}
}
@@ -143,27 +141,27 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn(
+ log.warn(
"Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
processInstanceTimeoutCheckList.remove(processInstanceId);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
- logger.warn("Check workflow timeout failed, the workflowInstance is null");
+ log.warn("Check workflow timeout failed, the workflowInstance is null");
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(),
(long) processInstance.getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
- logger.info("Workflow instance {} timeout, adding timeout event", processInstance.getId());
+ log.info("Workflow instance {} timeout, adding timeout event", processInstance.getId());
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
- logger.info("Workflow instance timeout, added timeout event");
+ log.info("Workflow instance timeout, added timeout event");
}
} catch (Exception ex) {
- logger.error("Check workflow instance timeout error");
+ log.error("Check workflow instance timeout error");
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -172,72 +170,72 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
- logger.info("Adding task instance into timeout check list");
+ log.info("Adding task instance into timeout check list");
if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
- logger.warn("Task instance is already in timeout check list");
+ log.warn("Task instance is already in timeout check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
- logger.error("Failed to add task instance into timeout check list, taskDefinition is null");
+ log.error("Failed to add task instance into timeout check list, taskDefinition is null");
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
- logger.info("Timeout flag is open, added task instance into timeout check list");
+ log.info("Timeout flag is open, added task instance into timeout check list");
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
- logger.info("task instance is dependTask orSubProcess, added task instance into timeout check list");
+ log.info("task instance is dependTask orSubProcess, added task instance into timeout check list");
}
}
public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
- logger.info("remove task instance from timeout check list");
+ log.info("remove task instance from timeout check list");
}
public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
- logger.info("Adding task instance into retry check list");
+ log.info("Adding task instance into retry check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
- logger.warn("Task instance is already in retry check list");
+ log.warn("Task instance is already in retry check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
- logger.error("Add task instance into retry check list error, taskDefinition is null");
+ log.error("Add task instance into retry check list error, taskDefinition is null");
return;
}
taskInstanceRetryCheckList.add(taskInstanceKey);
- logger.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list",
+ log.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list",
processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
}
public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
- logger.info("remove task instance from retry check list");
+ log.info("remove task instance from retry check list");
}
public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
- logger.info("Adding task instance into state check list");
+ log.info("Adding task instance into state check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
- logger.warn("Task instance is already in state check list");
+ log.warn("Task instance is already in state check list");
return;
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceStateCheckList.add(taskInstanceKey);
- logger.info("Added task instance into state check list");
+ log.info("Added task instance into state check list");
}
}
public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
taskInstanceStateCheckList.remove(taskInstanceKey);
- logger.info("Removed task instance from state check list");
+ log.info("Removed task instance from state check list");
}
public void clearAllTasks() {
@@ -260,7 +258,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
WorkflowExecuteRunnable workflowExecuteThread =
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn(
+ log.warn(
"Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
@@ -268,7 +266,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
Optional taskInstanceOptional =
workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
- logger.warn(
+ log.warn(
"Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ "will remove this check task",
taskCode);
@@ -281,13 +279,13 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
(long) taskInstance.getTaskDefine().getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
- logger.info("Task instance is timeout, adding task timeout event and remove the check");
+ log.info("Task instance is timeout, adding task timeout event and remove the check");
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
}
} catch (Exception ex) {
- logger.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
+ log.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -309,7 +307,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn(
+ log.warn(
"Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ "will remove this check task");
taskInstanceRetryCheckList.remove(taskInstanceKey);
@@ -321,7 +319,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState().isReadyStop()) {
- logger.warn(
+ log.warn(
"The process instance is ready to stop, will send process stop event and remove the check task");
addProcessStopEvent(processInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
@@ -329,7 +327,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
if (!taskInstanceOptional.isPresent()) {
- logger.warn(
+ log.warn(
"Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
@@ -344,7 +342,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
// reset taskInstance endTime and state
// todo relative function: TaskInstance.retryTaskIntervalOverTime,
// WorkflowExecuteThread.cloneRetryTaskInstance
- logger.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance",
+ log.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance",
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
taskInstance.setEndTime(null);
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
@@ -353,7 +351,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
taskInstanceRetryCheckList.remove(taskInstanceKey);
}
} catch (Exception ex) {
- logger.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
+ log.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
@@ -373,7 +371,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
WorkflowExecuteRunnable workflowExecuteThread =
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
- logger.warn(
+ log.warn(
"Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
@@ -381,7 +379,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
Optional taskInstanceOptional =
workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
- logger.warn(
+ log.warn(
"Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
@@ -392,7 +390,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
}
addTaskStateChangeEvent(taskInstance);
} catch (Exception ex) {
- logger.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
+ log.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index d4761b04a9..0d6eabfad1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -75,17 +75,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
/**
* stream task execute
*/
+@Slf4j
public class StreamTaskExecuteRunnable implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(StreamTaskExecuteRunnable.class);
-
protected MasterConfig masterConfig;
protected ProcessService processService;
@@ -168,13 +165,13 @@ public class StreamTaskExecuteRunnable implements Runnable {
dispatcher.dispatch(executionContext);
dispatchSuccess = true;
} catch (ExecuteException e) {
- logger.error("Master dispatch task to worker error, taskInstanceId: {}, worker: {}",
+ log.error("Master dispatch task to worker error, taskInstanceId: {}, worker: {}",
taskInstance.getId(),
executionContext.getHost(),
e);
}
if (!dispatchSuccess) {
- logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
+ log.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
taskInstance.getId(),
executionContext.getHost());
@@ -187,7 +184,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// set started flag
taskRunnableStatus = TaskRunnableStatus.STARTED;
- logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
+ log.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskInstance.getId(),
executionContext.getHost());
}
@@ -198,7 +195,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
public boolean addTaskEvent(TaskEvent taskEvent) {
if (taskInstance.getId() != taskEvent.getTaskInstanceId()) {
- logger.info("state event would be abounded, taskInstanceId:{}, eventType:{}, state:{}",
+ log.info("state event would be abounded, taskInstanceId:{}, eventType:{}, state:{}",
taskEvent.getTaskInstanceId(), taskEvent.getEvent(), taskEvent.getState());
return false;
}
@@ -215,7 +212,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
*/
public void handleEvents() {
if (!isStart()) {
- logger.info(
+ log.info(
"The stream task instance is not started, will not handle its state event, current state event size: {}",
taskEvents.size());
return;
@@ -226,23 +223,23 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskEvent = this.taskEvents.peek();
LoggerUtils.setTaskInstanceIdMDC(taskEvent.getTaskInstanceId());
- logger.info("Begin to handle state event, {}", taskEvent);
+ log.info("Begin to handle state event, {}", taskEvent);
if (this.handleTaskEvent(taskEvent)) {
this.taskEvents.remove(taskEvent);
}
} catch (StateEventHandleError stateEventHandleError) {
- logger.error("State event handle error, will remove this event: {}", taskEvent, stateEventHandleError);
+ log.error("State event handle error, will remove this event: {}", taskEvent, stateEventHandleError);
this.taskEvents.remove(taskEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) {
- logger.error("State event handle error, will retry this event: {}",
+ log.error("State event handle error, will retry this event: {}",
taskEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep
// in the stateEvents queue.
- logger.error("State event handle error, get a unknown exception, will retry this event: {}",
+ log.error("State event handle error, get a unknown exception, will retry this event: {}",
taskEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -339,7 +336,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
// verify tenant is null
if (tenant == null) {
- logger.error("tenant not exists,task instance id : {}", taskInstance.getId());
+ log.error("tenant not exists,task instance id : {}", taskInstance.getId());
return null;
}
@@ -429,7 +426,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
if (taskInstance.getState().isFinished()) {
streamTaskInstanceExecCacheManager.removeByTaskInstanceId(taskInstance.getId());
- logger.info("The stream task instance is finish, taskInstanceId:{}, state:{}", taskInstance.getId(),
+ log.info("The stream task instance is finish, taskInstanceId:{}, state:{}", taskInstance.getId(),
taskEvent.getState());
}
@@ -439,7 +436,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
private void measureTaskState(TaskEvent taskEvent) {
if (taskEvent == null || taskEvent.getState() == null) {
// the event is broken
- logger.warn("The task event is broken..., taskEvent: {}", taskEvent);
+ log.warn("The task event is broken..., taskEvent: {}", taskEvent);
return;
}
if (taskEvent.getState().isFinished()) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java
index 29977317c4..7c9508f5ed 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import javax.annotation.PostConstruct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@@ -34,10 +34,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* Used to execute {@link StreamTaskExecuteRunnable}.
*/
@Component
+@Slf4j
public class StreamTaskExecuteThreadPool extends ThreadPoolTaskExecutor {
- private static final Logger logger = LoggerFactory.getLogger(StreamTaskExecuteThreadPool.class);
-
@Autowired
private MasterConfig masterConfig;
@@ -63,14 +62,14 @@ public class StreamTaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onFailure(Throwable ex) {
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
- logger.error("Stream task instance events handle failed", ex);
+ log.error("Stream task instance events handle failed", ex);
LoggerUtils.removeTaskInstanceIdMDC();
}
@Override
public void onSuccess(Object result) {
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
- logger.info("Stream task instance is finished.");
+ log.info("Stream task instance is finished.");
LoggerUtils.removeTaskInstanceIdMDC();
}
});
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
index 3466b92bc3..eb978bc766 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
@@ -35,16 +35,15 @@ import java.util.Map;
import javax.annotation.PostConstruct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
+@Slf4j
public class WorkflowEventLooper extends BaseDaemonThread {
- private final Logger logger = LoggerFactory.getLogger(WorkflowEventLooper.class);
-
@Autowired
private WorkflowEventQueue workflowEventQueue;
@@ -66,9 +65,9 @@ public class WorkflowEventLooper extends BaseDaemonThread {
@Override
public synchronized void start() {
- logger.info("WorkflowEventLooper thread starting");
+ log.info("WorkflowEventLooper thread starting");
super.start();
- logger.info("WorkflowEventLooper thread started");
+ log.info("WorkflowEventLooper thread started");
}
public void run() {
@@ -77,25 +76,25 @@ public class WorkflowEventLooper extends BaseDaemonThread {
try {
workflowEvent = workflowEventQueue.poolEvent();
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
- logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
+ log.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
WorkflowEventHandler workflowEventHandler =
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
workflowEventHandler.handleWorkflowEvent(workflowEvent);
} catch (InterruptedException e) {
- logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
+ log.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
Thread.currentThread().interrupt();
break;
} catch (WorkflowEventHandleException workflowEventHandleException) {
- logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
+ log.error("Handle workflow event failed, will add this event to event queue again, event: {}",
workflowEvent, workflowEventHandleException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (WorkflowEventHandleError workflowEventHandleError) {
- logger.error("Handle workflow event error, will drop this event, event: {}",
+ log.error("Handle workflow event error, will drop this event, event: {}",
workflowEvent,
workflowEventHandleError);
} catch (Exception unknownException) {
- logger.error(
+ log.error(
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
workflowEvent, unknownException);
workflowEventQueue.addEvent(workflowEvent);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7c75a6517f..5ebbf8c8ff 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -119,9 +119,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
@@ -130,10 +129,9 @@ import com.google.common.collect.Sets;
/**
* Workflow execute task, used to execute a workflow instance.
*/
+@Slf4j
public class WorkflowExecuteRunnable implements Callable {
- private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
-
private final ProcessService processService;
private final CommandService commandService;
@@ -290,7 +288,7 @@ public class WorkflowExecuteRunnable implements Callable {
*/
public void handleEvents() {
if (!isStart()) {
- logger.info(
+ log.info(
"The workflow instance is not started, will not handle its state event, current state event size: {}",
stateEvents);
return;
@@ -312,21 +310,21 @@ public class WorkflowExecuteRunnable implements Callable {
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError(
"Cannot find handler for the given state event"));
- logger.info("Begin to handle state event, {}", stateEvent);
+ log.info("Begin to handle state event, {}", stateEvent);
if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent);
}
} catch (StateEventHandleError stateEventHandleError) {
- logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
+ log.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) {
- logger.error("State event handle error, will retry this event: {}",
+ log.error("State event handle error, will retry this event: {}",
stateEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleFailure stateEventHandleFailure) {
- logger.error("State event handle failed, will move event to the tail: {}",
+ log.error("State event handle failed, will move event to the tail: {}",
stateEvent,
stateEventHandleFailure);
this.stateEvents.remove(stateEvent);
@@ -335,7 +333,7 @@ public class WorkflowExecuteRunnable implements Callable {
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep
// in the stateEvents queue.
- logger.error("State event handle error, get a unknown exception, will retry this event: {}",
+ log.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -359,7 +357,7 @@ public class WorkflowExecuteRunnable implements Callable {
public boolean addStateEvent(StateEvent stateEvent) {
if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
- logger.info("state event would be abounded :{}", stateEvent);
+ log.info("state event would be abounded :{}", stateEvent);
return false;
}
this.stateEvents.add(stateEvent);
@@ -377,29 +375,29 @@ public class WorkflowExecuteRunnable implements Callable {
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
- logger.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
+ log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
- logger.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId());
+ log.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId());
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
- logger.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
+ log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH);
- logger.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
+ log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
return true;
}
- logger.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId());
+ log.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId());
return false;
} else {
- logger.info(
+ log.info(
"Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.",
taskGroupQueue);
return true;
@@ -417,7 +415,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
- logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
+ log.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
try {
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
@@ -439,7 +437,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
} else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
// retry task
- logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
+ log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@@ -458,12 +456,12 @@ public class WorkflowExecuteRunnable implements Callable {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
- logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
+ log.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();
} catch (Exception ex) {
- logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
+ log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
// remove the task from complete map, so that we can finish in the next time.
completeTaskMap.remove(taskInstance.getTaskCode());
throw ex;
@@ -505,14 +503,14 @@ public class WorkflowExecuteRunnable implements Callable {
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
- logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
+ log.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
taskInstance.getTaskCode(),
taskInstance.getId());
return;
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
- logger.info(
+ log.info(
"Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
processInstance.getId(), newTaskInstance.getTaskCode(),
newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
@@ -530,7 +528,7 @@ public class WorkflowExecuteRunnable implements Callable {
* update process instance
*/
public void refreshProcessInstance(int processInstanceId) {
- logger.info("process instance update: {}", processInstanceId);
+ log.info("process instance update: {}", processInstanceId);
ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId);
// just update the processInstance field(this is soft copy)
BeanUtils.copyProperties(newProcessInstance, processInstance);
@@ -544,10 +542,10 @@ public class WorkflowExecuteRunnable implements Callable {
* update task instance
*/
public void refreshTaskInstance(int taskInstanceId) {
- logger.info("task instance update: {} ", taskInstanceId);
+ log.info("task instance update: {} ", taskInstanceId);
TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId);
if (taskInstance == null) {
- logger.error("can not find task instance, id:{}", taskInstanceId);
+ log.error("can not find task instance, id:{}", taskInstanceId);
return;
}
processService.packageTaskInstance(taskInstance, processInstance);
@@ -628,7 +626,7 @@ public class WorkflowExecuteRunnable implements Callable {
public void processBlock() {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
- logger.info("processInstance {} block alert send successful!", processInstance.getId());
+ log.info("processInstance {} block alert send successful!", processInstance.getId());
}
public boolean processComplementData() {
@@ -648,23 +646,23 @@ public class WorkflowExecuteRunnable implements Callable {
} else if (processInstance.getState().isFinished()) {
endProcess();
if (complementListDate.isEmpty()) {
- logger.info("process complement end. process id:{}", processInstance.getId());
+ log.info("process complement end. process id:{}", processInstance.getId());
return true;
}
int index = complementListDate.indexOf(scheduleDate);
if (index >= complementListDate.size() - 1 || !processInstance.getState().isSuccess()) {
- logger.info("process complement end. process id:{}", processInstance.getId());
+ log.info("process complement end. process id:{}", processInstance.getId());
// complement data ends || no success
return true;
}
- logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
+ log.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
processInstance.getId(), processInstance.getScheduleTime(), complementListDate);
scheduleDate = complementListDate.get(index + 1);
}
// the next process complement
int create = this.createComplementDataCommand(scheduleDate);
if (create > 0) {
- logger.info("create complement data command successfully.");
+ log.info("create complement data command successfully.");
}
return true;
}
@@ -719,7 +717,7 @@ public class WorkflowExecuteRunnable implements Callable {
public WorkflowSubmitStatue call() {
if (isStart()) {
// This case should not been happened
- logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
+ log.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
@@ -728,21 +726,21 @@ public class WorkflowExecuteRunnable implements Callable {
if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
buildFlowDag();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
- logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
initTaskQueue();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
- logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
submitPostNode(null);
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
- logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
+ log.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
- logger.error("Start workflow error", e);
+ log.error("Start workflow error", e);
return WorkflowSubmitStatue.FAILED;
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
@@ -834,12 +832,12 @@ public class WorkflowExecuteRunnable implements Callable {
ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList,
processInstance.getTaskDependType());
if (processDag == null) {
- logger.error("ProcessDag is null");
+ log.error("ProcessDag is null");
return;
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
- logger.info("Build dag success, dag: {}", dag);
+ log.info("Build dag success, dag: {}", dag);
}
/**
@@ -854,7 +852,7 @@ public class WorkflowExecuteRunnable implements Callable {
errorTaskMap.clear();
if (!isNewProcessInstance()) {
- logger.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
+ log.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}",
processInstance.getRunTimes(),
processInstance.getRecovery());
List validTaskInstanceList =
@@ -863,12 +861,12 @@ public class WorkflowExecuteRunnable implements Callable {
for (TaskInstance task : validTaskInstanceList) {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
- logger.info(
+ log.info(
"Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}",
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
- logger.warn(
+ log.warn(
"Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
task.getTaskCode());
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@@ -884,7 +882,7 @@ public class WorkflowExecuteRunnable implements Callable {
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
- logger.info("TaskInstance is already complete.");
+ log.info("TaskInstance is already complete.");
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
@@ -894,7 +892,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
- logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
+ log.info("TaskInstance needs fault tolerance, will be added to standby list.");
task.setFlag(Flag.NO);
taskInstanceDao.updateTaskInstance(task);
@@ -902,7 +900,7 @@ public class WorkflowExecuteRunnable implements Callable {
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
- logger.info("Retry taskInstance, taskState: {}", task.getState());
+ log.info("Retry taskInstance, taskState: {}", task.getState());
retryTaskInstance(task);
}
continue;
@@ -916,7 +914,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
clearDataIfExecuteTask();
} else {
- logger.info("The current workflowInstance is a newly running workflowInstance");
+ log.info("The current workflowInstance is a newly running workflowInstance");
}
if (processInstance.isComplementData() && complementListDate.isEmpty()) {
@@ -941,7 +939,7 @@ public class WorkflowExecuteRunnable implements Callable {
if (cmdParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
}
- logger.info(" process definition code:{} complement data: {}",
+ log.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), complementListDate);
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
@@ -958,7 +956,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
}
}
- logger.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
+ log.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
dependFailedTaskSet,
completeTaskMap,
errorTaskMap);
@@ -985,7 +983,7 @@ public class WorkflowExecuteRunnable implements Callable {
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
- logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
+ log.error("Submit standby task failed!, taskCode: {}, taskName: {}",
taskInstance.getTaskCode(),
taskInstance.getName());
return Optional.empty();
@@ -1019,7 +1017,7 @@ public class WorkflowExecuteRunnable implements Callable {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
- logger.info(
+ log.info(
"Submitted task will not be dispatch right now because the first time to try to acquire" +
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
@@ -1029,7 +1027,7 @@ public class WorkflowExecuteRunnable implements Callable {
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
- logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
+ log.error("Dispatch standby process {} task {} failed", processInstance.getName(),
taskInstance.getName());
return Optional.empty();
}
@@ -1058,7 +1056,7 @@ public class WorkflowExecuteRunnable implements Callable {
}
return Optional.of(taskInstance);
} catch (Exception e) {
- logger.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(),
+ log.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(),
taskInstance.getTaskCode(), e);
return Optional.empty();
} finally {
@@ -1079,7 +1077,7 @@ public class WorkflowExecuteRunnable implements Callable {
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
} catch (Exception e) {
// Do we need to catch this exception?
- logger.error("notify process host update", e);
+ log.error("notify process host update", e);
}
}
@@ -1125,7 +1123,7 @@ public class WorkflowExecuteRunnable implements Callable {
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
- logger.error("Clone retry taskInstance error because taskNode is null, taskCode:{}",
+ log.error("Clone retry taskInstance error because taskNode is null, taskCode:{}",
taskInstance.getTaskCode());
return null;
}
@@ -1153,7 +1151,7 @@ public class WorkflowExecuteRunnable implements Callable