Browse Source

[DS-12131][master] Optimize the log printing of the master module acc… (#12152)

* [DS-12131][master] Optimize the log printing of the master module according to the log specification.
3.2.0-release
sgw 2 years ago committed by GitHub
parent
commit
165b9a58de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  3. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
  4. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
  6. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
  7. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
  8. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
  9. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  10. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  11. 66
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  12. 19
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  13. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  14. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  15. 11
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -128,12 +128,14 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
logger.info("{} tasks dispatch failed, will retry to dispatch", failedDispatchTasks.size());
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
if (fetchTaskNum == failedDispatchTasks.size()) {
logger.info("All tasks dispatch failed, will sleep a while to avoid the master cpu higher");
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
@ -209,6 +211,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
logger.info("Task {} is already finished, no need to dispatch, task instance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -114,7 +114,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
logger.error("Execute command {} error", command, ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
@ -156,7 +156,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
logger.error("Send command to {} error, command: {}", host, command, ex);
retryCount--;
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java

@ -22,17 +22,23 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
@AutoService(StateEventHandler.class)
public class TaskRetryStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskRetryStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleException {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
logger.info("Handle task instance retry event, taskInstanceId: {}", taskStateEvent.getTaskInstanceId());
TaskMetrics.incTaskInstanceByState("retry");
Map<Long, TaskInstance> waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(taskStateEvent.getTaskCode());

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java

@ -54,6 +54,10 @@ public class TaskStateEventHandler implements StateEventHandler {
throw new StateEventHandleError("Task state event handle error due to task state is null");
}
logger.info(
"Handle task instance state event, the current task instance state {} will be changed to {}",
task.getState(), taskStateEvent.getStatus());
Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
if (task.getState().isFinished()) {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java

@ -50,6 +50,8 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
"Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
taskStateEvent.getTaskInstanceId())));
logger.info("Handle task instance state timout event, taskInstanceId: {}", taskStateEvent.getTaskInstanceId());
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java

@ -21,11 +21,17 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class)
public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskWaitTaskGroupStateHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
logger.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId());
return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent);
}

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java

@ -38,6 +38,7 @@ public class WorkflowBlockStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
logger.info("Handle workflow instance state block event");
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
if (!taskInstanceOptional.isPresent()) {

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java

@ -22,11 +22,17 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class)
public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(WorkflowTimeoutStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
logger.info("Handle workflow instance timeout event");
ProcessInstanceMetrics.incProcessInstanceByState("timeout");
workflowExecuteRunnable.processTimeout();
return true;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -136,12 +136,14 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot consume command.
logger.warn("The current server {} is not at running status, cannot consumes commands.", this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
logger.warn("The current server {} is overload, cannot consumes commands.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -120,13 +120,13 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.add(processInstance.getId());
logger.info("Success add workflow instance into timeout check list");
logger.info("Success add workflow instance {} into timeout check list", processInstance.getId());
}
public void removeProcess4TimeoutCheck(int processInstanceId) {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
logger.info("Success remove workflow instance {} from timeout check list", processInstanceId);
}
}
@ -154,7 +154,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
(long) processInstance.getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
logger.info("Workflow instance timeout, adding timeout event");
logger.info("Workflow instance {} timeout, adding timeout event", processInstance.getId());
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
logger.info("Workflow instance timeout, added timeout event");

66
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -457,7 +457,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
taskInstance.getTaskCode(),
taskInstance.getId());
return;
@ -465,7 +465,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info(
"failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
"Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
processInstance.getId(), newTaskInstance.getTaskCode(),
newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
@ -789,7 +789,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList,
processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
logger.error("ProcessDag is null");
return;
}
// generate process dag
@ -822,6 +822,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
task.getTaskCode());
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) {
@ -829,14 +831,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processService.updateTaskInstance(task);
continue;
}
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
task.getTaskCode());
}
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
logger.info("TaskInstance is already complete.");
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
@ -846,6 +847,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
logger.info("TaskInstance needs fault tolerance, will be added to standby list.");
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
@ -853,6 +855,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
logger.info("Retry taskInstance, taskState: {}", task.getState());
retryTaskInstance(task);
}
continue;
@ -934,15 +937,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(),
processInstance.getName(),
taskInstance.getId(),
logger.error("Submit standby task failed!, taskCode: {}, taskName: {}",
taskInstance.getTaskCode(),
taskInstance.getName());
return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode());
if (taskInstance.getId() != oldTaskInstanceId) {
@ -969,19 +971,16 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed",
taskInstance.getName());
logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" +
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
}
}
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
processInstance.getId(),
processInstance.getName(),
taskInstance.getId(),
taskInstance.getName());
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
@ -1009,11 +1008,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error, taskCode: {}, taskInstanceId: {}",
taskInstance.getTaskCode(),
taskInstance.getId(),
e);
logger.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(),
taskInstance.getTaskCode(), e);
return Optional.empty();
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@ -1076,7 +1075,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
logger.error("Clone retry taskInstance error because taskNode is null, taskCode:{}",
taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
@ -1103,7 +1103,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
logger.error("Clone tolerant taskInstance error because taskNode is null, taskCode:{}",
taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
@ -1347,11 +1348,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) {
logger.info("task {} has already run success", task.getName());
logger.info("Task has already run success, taskName: {}", task.getName());
continue;
}
if (task.getState().isKill()) {
logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
logger.info("Task is be stopped, the state is {}, taskInstanceId: {}", task.getState(), task.getId());
continue;
}
@ -1403,8 +1404,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
}
logger.info("taskCode: {} completeDependTaskList: {}", taskCode,
Arrays.toString(completeTaskMap.keySet().toArray()));
logger.info("The dependTasks of task all success, currentTaskCode: {}, dependTaskCodes: {}",
taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
return DependResult.SUCCESS;
}
@ -1436,6 +1437,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " +
"dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName
);
return false;
}
} else {
@ -1719,10 +1723,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void addTaskToStandByList(TaskInstance taskInstance) {
if (readyToSubmitTaskQueue.contains(taskInstance)) {
logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
logger.warn("Task already exists in ready submit queue, no need to add again, task code:{}",
taskInstance.getTaskCode());
return;
}
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
logger.info("Add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(),
taskInstance.getId(),
taskInstance.getTaskCode());
@ -1807,8 +1812,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying",
task.getName());
logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
@ -1824,6 +1829,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName());
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;

19
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -158,10 +158,17 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
ProcessInstance processInstance = entry.getKey();
TaskInstance taskInstance = entry.getValue();
String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) {
this.notifyMyself(processInstance, taskInstance);
} else {
this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId());
if (processInstance.getHost().equalsIgnoreCase(address)) {
logger.info("Process host is local master, will notify it");
this.notifyMyself(processInstance, taskInstance);
} else {
logger.info("Process host is remote master, will notify it");
this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
@ -190,8 +197,8 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
TaskInstance taskInstance) {
String processInstanceHost = processInstance.getHost();
if (Strings.isNullOrEmpty(processInstanceHost)) {
logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(),
taskInstance.getId());
logger.error("Process {} host is empty, cannot notify task {} now, taskId: {}", processInstance.getName(),
taskInstance.getName(), taskInstance.getId());
return;
}
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -304,6 +304,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
logger.info("Task state changes to {}", TaskExecutionStatus.FAILURE);
taskInstance.setState(TaskExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
return null;
@ -418,7 +419,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
int ruleId = dataQualityParameters.getRuleId();
DqRule dqRule = processService.getDqRule(ruleId);
if (dqRule == null) {
logger.error("can not get DqRule by id {}", ruleId);
logger.error("Can not get dataQuality rule by id {}", ruleId);
return;
}
@ -428,7 +429,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
List<DqRuleInputEntry> ruleInputEntryList = processService.getRuleInputEntry(ruleId);
if (CollectionUtils.isEmpty(ruleInputEntryList)) {
logger.error("{} rule input entry list is empty ", ruleId);
logger.error("Rule input entry list is empty, ruleId: {}", ruleId);
return;
}
List<DqRuleExecuteSql> executeSqlList = processService.getDqExecuteSql(ruleId);
@ -603,9 +604,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
logger.error("Tenant does not exists");
return true;
}
return false;

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -102,18 +102,18 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
this.initQueue();
}
if (taskInstance.getState().isFinished()) {
logger.info("submit task , but task [{}] state [{}] is already finished. ", taskInstance.getName(),
taskInstance.getState());
logger.info("Task {} has already finished, no need to submit to task queue, taskState: {}",
taskInstance.getName(), taskInstance.getState());
return true;
}
// task cannot be submitted because its execution state is RUNNING or DELAY.
if (taskInstance.getState() == TaskExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() == TaskExecutionStatus.DELAY_EXECUTION) {
logger.info("submit task, but the status of the task {} is already running or delayed.",
taskInstance.getName());
logger.info("Task {} is already running or delayed, no need to submit to task queue, taskState: {}",
taskInstance.getName(), taskInstance.getState());
return true;
}
logger.info("task ready to dispatch to worker: taskInstanceId: {}", taskInstance.getId());
logger.info("Task {} is ready to dispatch to worker", taskInstance.getName());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@ -122,17 +122,17 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {
logger.error("task get taskExecutionContext fail: {}", taskInstance);
logger.error("Get taskExecutionContext fail, task: {}", taskInstance);
return false;
}
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
logger.info("Task {} is submitted to priority queue success by master", taskInstance.getName());
return true;
} catch (Exception e) {
logger.error("submit task error", e);
logger.error("Task {} is submitted to priority queue error", taskInstance.getName(), e);
return false;
}
}

11
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java

@ -44,17 +44,6 @@ public class RpcTest {
userService = rpcClient.create(IUserService.class, host);
}
@Test
public void sendTest() {
Integer result = userService.hi(3);
Assert.assertSame(4, result);
result = userService.hi(4);
Assert.assertSame(5, result);
userService.say("sync");
userService.callBackIsFalse("async no call back");
userService.hi(999999);
}
@After
public void after() {
NettyClient.getInstance().close();

Loading…
Cancel
Save