From 5355927b79bb1b54b7a6b574726c0b349b01e8bd Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 24 Nov 2020 16:59:26 +0800 Subject: [PATCH 01/14] [FIX-#4083][server]fix taskInstance state change error Concurrent processing of ack message and result message causes the execution sequence to be wrong # this close # 4083 --- .../consumer/TaskPriorityQueueConsumer.java | 178 ++++---- .../processor/queue/TaskResponseService.java | 90 ++-- .../service/process/ProcessService.java | 405 +++++++++++------- 3 files changed, 389 insertions(+), 284 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index fb9f10d705..822e493076 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.master.consumer; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -33,10 +33,24 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.EnumUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.entity.*; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskPriority; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -44,21 +58,27 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; - /** * TaskUpdateQueue consumer */ @Component -public class TaskPriorityQueueConsumer extends Thread{ +public class TaskPriorityQueueConsumer extends Thread { /** * logger of TaskUpdateQueueConsumer @@ -91,7 +111,7 @@ public class TaskPriorityQueueConsumer extends Thread{ private MasterConfig masterConfig; @PostConstruct - public void init(){ + public void init() { super.setName("TaskUpdateQueueConsumerThread"); super.start(); } @@ -99,12 +119,12 @@ public class TaskPriorityQueueConsumer extends Thread{ @Override public void run() { List failedDispatchTasks = new ArrayList<>(); - while (Stopper.isRunning()){ + while (Stopper.isRunning()) { try { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); - for(int i = 0; i < fetchTaskNum; i++){ - if(taskPriorityQueue.size() <= 0){ + for (int i = 0; i < fetchTaskNum; i++) { + if (taskPriorityQueue.size() <= 0) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } @@ -112,62 +132,62 @@ public class TaskPriorityQueueConsumer extends Thread{ String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); boolean dispatchResult = dispatch(taskPriority.getTaskId()); - if(!dispatchResult){ + if (!dispatchResult) { failedDispatchTasks.add(taskPriorityInfo); } } - for(String dispatchFailedTask : failedDispatchTasks){ + for (String dispatchFailedTask : failedDispatchTasks) { taskPriorityQueue.put(dispatchFailedTask); } - }catch (Exception e){ - logger.error("dispatcher task error",e); + } catch (Exception e) { + logger.error("dispatcher task error", e); } } } - /** * dispatch task * * @param taskInstanceId taskInstanceId * @return result */ - private boolean dispatch(int taskInstanceId){ + private boolean dispatch(int taskInstanceId) { boolean result = false; try { TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - if (taskInstanceIsFinalState(taskInstanceId)){ + if (taskInstanceIsFinalState(taskInstanceId)) { // when task finish, ignore this task, there is no need to dispatch anymore return true; - }else{ + } else { result = dispatcher.dispatch(executionContext); } } catch (ExecuteException e) { - logger.error("dispatch error",e); + logger.error("dispatch error", e); } return result; } - /** * taskInstance is final state * success,failure,kill,stop,pause,threadwaiting is final state + * * @param taskInstanceId taskInstanceId * @return taskInstance is final state */ - public Boolean taskInstanceIsFinalState(int taskInstanceId){ + public Boolean taskInstanceIsFinalState(int taskInstanceId) { TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); return taskInstance.getState().typeIsFinished(); } /** * get TaskExecutionContext + * * @param taskInstanceId taskInstanceId * @return TaskExecutionContext */ - protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){ + protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) { TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId); // task type @@ -181,12 +201,12 @@ public class TaskPriorityQueueConsumer extends Thread{ // verify tenant is null if (verifyTenantIsNull(tenant, taskInstance)) { - processService.changeTaskState(ExecutionStatus.FAILURE, - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - null, - taskInstance.getId()); + processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + null, + taskInstance.getId()); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue @@ -196,50 +216,47 @@ public class TaskPriorityQueueConsumer extends Thread{ taskInstance.setExecutePath(getExecLocalPath(taskInstance)); taskInstance.setResources(getResourceFullNames(taskNode)); - SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); - // SQL task - if (taskType == TaskType.SQL){ + if (taskType == TaskType.SQL) { setSQLTaskRelation(sqlTaskExecutionContext, taskNode); } // DATAX task - if (taskType == TaskType.DATAX){ + if (taskType == TaskType.DATAX) { setDataxTaskRelation(dataxTaskExecutionContext, taskNode); } - // procedure task - if (taskType == TaskType.PROCEDURE){ + if (taskType == TaskType.PROCEDURE) { setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); } - if (taskType == TaskType.SQOOP){ - setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode); + if (taskType == TaskType.SQOOP) { + setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode); } - return TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) - .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) - .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) - .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) - .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext) - .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext) - .create(); + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) + .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) + .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) + .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) + .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext) + .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext) + .create(); } /** * set procedure task relation + * * @param procedureTaskExecutionContext procedureTaskExecutionContext - * @param taskNode taskNode + * @param taskNode taskNode */ private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) { ProcedureParameters procedureParameters = JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class); @@ -250,8 +267,9 @@ public class TaskPriorityQueueConsumer extends Thread{ /** * set datax task relation + * * @param dataxTaskExecutionContext dataxTaskExecutionContext - * @param taskNode taskNode + * @param taskNode taskNode */ private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) { DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class); @@ -259,25 +277,24 @@ public class TaskPriorityQueueConsumer extends Thread{ DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); - - if (dataSource != null){ + if (dataSource != null) { dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); } - if (dataTarget != null){ + if (dataTarget != null) { dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } } - /** * set sqoop task relation + * * @param sqoopTaskExecutionContext sqoopTaskExecutionContext - * @param taskNode taskNode + * @param taskNode taskNode */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { SqoopParameters sqoopParameters = JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class); @@ -290,13 +307,13 @@ public class TaskPriorityQueueConsumer extends Thread{ DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); - if (dataSource != null){ + if (dataSource != null) { sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); } - if (dataTarget != null){ + if (dataTarget != null) { sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); @@ -306,8 +323,9 @@ public class TaskPriorityQueueConsumer extends Thread{ /** * set SQL task relation + * * @param sqlTaskExecutionContext sqlTaskExecutionContext - * @param taskNode taskNode + * @param taskNode taskNode */ private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) { SqlParameters sqlParameters = JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class); @@ -317,20 +335,20 @@ public class TaskPriorityQueueConsumer extends Thread{ // whether udf type boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) - && StringUtils.isNotEmpty(sqlParameters.getUdfs()); + && StringUtils.isNotEmpty(sqlParameters.getUdfs()); - if (udfTypeFlag){ + if (udfTypeFlag) { String[] udfFunIds = sqlParameters.getUdfs().split(","); int[] udfFunIdsArray = new int[udfFunIds.length]; - for(int i = 0 ; i < udfFunIds.length;i++){ - udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]); + for (int i = 0; i < udfFunIds.length; i++) { + udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]); } List udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray); - Map udfFuncMap = new HashMap<>(); - for(UdfFunc udfFunc : udfFuncList) { + Map udfFuncMap = new HashMap<>(); + for (UdfFunc udfFunc : udfFuncList) { String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF); - udfFuncMap.put(udfFunc,tenantCode); + udfFuncMap.put(udfFunc, tenantCode); } sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap); @@ -342,25 +360,25 @@ public class TaskPriorityQueueConsumer extends Thread{ * * @return execute local path */ - private String getExecLocalPath(TaskInstance taskInstance){ + private String getExecLocalPath(TaskInstance taskInstance) { return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); } - /** - * whehter tenant is null - * @param tenant tenant + * whehter tenant is null + * + * @param tenant tenant * @param taskInstance taskInstance * @return result */ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { - if(tenant == null){ + if (tenant == null) { logger.error("tenant not exists,process instance id : {},task instance id : {}", - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); return true; } return false; @@ -369,8 +387,8 @@ public class TaskPriorityQueueConsumer extends Thread{ /** * get resource map key is full name and value is tenantCode */ - private Map getResourceFullNames(TaskNode taskNode) { - Map resourceMap = new HashMap<>(); + private Map getResourceFullNames(TaskNode taskNode) { + Map resourceMap = new HashMap<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { @@ -382,7 +400,7 @@ public class TaskPriorityQueueConsumer extends Thread{ if (CollectionUtils.isNotEmpty(oldVersionResources)) { oldVersionResources.forEach( - (t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) + (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) ); } @@ -395,7 +413,7 @@ public class TaskPriorityQueueConsumer extends Thread{ List resources = processService.listResourceByIds(resourceIds); resources.forEach( - (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) + (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) ); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 65315c7bd9..de746b05aa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -25,18 +24,22 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import io.netty.channel.Channel; + /** * task manager */ @@ -65,21 +68,20 @@ public class TaskResponseService { */ private Thread taskResponseWorker; - @PostConstruct - public void start(){ + public void start() { this.taskResponseWorker = new TaskResponseWorker(); this.taskResponseWorker.setName("TaskResponseWorker"); this.taskResponseWorker.start(); } @PreDestroy - public void stop(){ + public void stop() { this.taskResponseWorker.interrupt(); - if(!eventQueue.isEmpty()){ + if (!eventQueue.isEmpty()) { List remainEvents = new ArrayList<>(eventQueue.size()); eventQueue.drainTo(remainEvents); - for(TaskResponseEvent event : remainEvents){ + for (TaskResponseEvent event : remainEvents) { this.persist(event); } } @@ -90,16 +92,15 @@ public class TaskResponseService { * * @param taskResponseEvent taskResponseEvent */ - public void addResponse(TaskResponseEvent taskResponseEvent){ + public void addResponse(TaskResponseEvent taskResponseEvent) { try { eventQueue.put(taskResponseEvent); } catch (InterruptedException e) { - logger.error("put task : {} error :{}", taskResponseEvent,e); + logger.error("put task : {} error :{}", taskResponseEvent, e); Thread.currentThread().interrupt(); } } - /** * task worker thread */ @@ -108,16 +109,16 @@ public class TaskResponseService { @Override public void run() { - while (Stopper.isRunning()){ + while (Stopper.isRunning()) { try { // if not task , blocking here TaskResponseEvent taskResponseEvent = eventQueue.take(); persist(taskResponseEvent); - } catch (InterruptedException e){ + } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; - } catch (Exception e){ - logger.error("persist task error",e); + } catch (Exception e) { + logger.error("persist task error", e); } } logger.info("TaskResponseWorker stopped"); @@ -126,51 +127,52 @@ public class TaskResponseService { /** * persist taskResponseEvent + * * @param taskResponseEvent taskResponseEvent */ - private void persist(TaskResponseEvent taskResponseEvent){ + private void persist(TaskResponseEvent taskResponseEvent) { Event event = taskResponseEvent.getEvent(); Channel channel = taskResponseEvent.getChannel(); - switch (event){ + switch (event) { case ACK: try { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null){ - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); + if (taskInstance != null && ExecutionStatus.RUNNING_EXECUTION.getCode() >= taskInstance.getState().getCode()) { + processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); channel.writeAndFlush(taskAckCommand.convert2Command()); - }catch (Exception e){ - logger.error("worker ack master error",e); - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1); + } catch (Exception e) { + logger.error("worker ack master error", e); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1); channel.writeAndFlush(taskAckCommand.convert2Command()); } break; case RESULT: try { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null){ - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() - ); + if (taskInstance != null) { + processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool() + ); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); channel.writeAndFlush(taskResponseCommand.convert2Command()); - }catch (Exception e){ - logger.error("worker response master error",e); - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1); + } catch (Exception e) { + logger.error("worker response master error", e); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); channel.writeAndFlush(taskResponseCommand.convert2Command()); } break; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7160168188..9aa9a2ff9a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -79,8 +79,6 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; -import org.apache.commons.lang.ArrayUtils; - import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -93,6 +91,8 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang.ArrayUtils; + import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,11 +111,11 @@ public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; + private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal()}; @Autowired private UserMapper userMapper; @@ -154,14 +154,15 @@ public class ProcessService { private TenantMapper tenantMapper; @Autowired - private ProjectMapper projectMapper; + private ProjectMapper projectMapper; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction - * @param logger logger - * @param host host + * + * @param logger logger + * @param host host * @param validThreadNum validThreadNum - * @param command found command + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) @@ -187,6 +188,7 @@ public class ProcessService { /** * save error command, and delete original command + * * @param command command * @param message message */ @@ -199,7 +201,8 @@ public class ProcessService { /** * set process waiting thread - * @param command command + * + * @param command command * @param processInstance processInstance * @return process instance */ @@ -216,7 +219,8 @@ public class ProcessService { /** * check thread num - * @param command command + * + * @param command command * @param validThreadNum validThreadNum * @return if thread is enough */ @@ -227,6 +231,7 @@ public class ProcessService { /** * insert one command + * * @param command command * @return create result */ @@ -240,6 +245,7 @@ public class ProcessService { /** * find one command from queue list + * * @return command */ public Command findOneCommand() { @@ -248,15 +254,16 @@ public class ProcessService { /** * check the input command exists in queue list + * * @param command command * @return create command result */ public Boolean verifyIsNeedCreateCommand(Command command) { Boolean isNeedCreate = true; - Map cmdTypeMap = new HashMap(); - cmdTypeMap.put(CommandType.REPEAT_RUNNING,1); - cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1); - cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1); + Map cmdTypeMap = new HashMap(); + cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); + cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1); + cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1); CommandType commandType = command.getCommandType(); if (cmdTypeMap.containsKey(commandType)) { @@ -265,7 +272,7 @@ public class ProcessService { List commands = commandMapper.selectList(null); // for all commands - for (Command tmpCommand:commands) { + for (Command tmpCommand : commands) { if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) { ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam()); if (tempObj != null && processInstanceId == tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()) { @@ -275,11 +282,12 @@ public class ProcessService { } } } - return isNeedCreate; + return isNeedCreate; } /** * find process instance detail by id + * * @param processId processId * @return process instance */ @@ -289,6 +297,7 @@ public class ProcessService { /** * get task node list by definitionId + * * @param defineId * @return */ @@ -313,6 +322,7 @@ public class ProcessService { /** * find process instance by id + * * @param processId processId * @return process instance */ @@ -322,6 +332,7 @@ public class ProcessService { /** * find process define by id. + * * @param processDefinitionId processDefinitionId * @return process definition */ @@ -331,6 +342,7 @@ public class ProcessService { /** * delete work process instance by id + * * @param processInstanceId processInstanceId * @return delete process instance result */ @@ -340,6 +352,7 @@ public class ProcessService { /** * delete all sub process by parent instance id + * * @param processInstanceId processInstanceId * @return delete all sub process instance result */ @@ -358,9 +371,10 @@ public class ProcessService { /** * remove task log file + * * @param processInstanceId processInstanceId */ - public void removeTaskLogFile(Integer processInstanceId){ + public void removeTaskLogFile(Integer processInstanceId) { LogClientService logClient = null; @@ -389,7 +403,7 @@ public class ProcessService { // remove task log from loggerserver logClient.removeTaskLog(ip, port, taskLogPath); } - }finally { + } finally { if (logClient != null) { logClient.close(); } @@ -398,6 +412,7 @@ public class ProcessService { /** * calculate sub process number in the process define. + * * @param processDefinitionId processDefinitionId * @return process thread num count */ @@ -409,8 +424,9 @@ public class ProcessService { /** * recursive query sub process definition id by parent id. + * * @param parentId parentId - * @param ids ids + * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { ProcessDefinition processDefinition = processDefineMapper.selectById(parentId); @@ -428,7 +444,7 @@ public class ProcessService { if (parameterJson.get(CMDPARAM_SUB_PROCESS_DEFINE_ID) != null) { SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class); ids.add(subProcessParam.getProcessDefinitionId()); - recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids); + recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids); } } @@ -440,7 +456,8 @@ public class ProcessService { * sub work process instance need not to create recovery command. * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time - * @param originCommand originCommand + * + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -457,17 +474,17 @@ public class ProcessService { // process instance quit by "waiting thread" state if (originCommand == null) { Command command = new Command( - CommandType.RECOVER_WAITTING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinitionId(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getProcessInstancePriority() + CommandType.RECOVER_WAITTING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinitionId(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getProcessInstancePriority() ); saveCommand(command); return; @@ -491,7 +508,8 @@ public class ProcessService { /** * get schedule time from command - * @param command command + * + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -507,9 +525,10 @@ public class ProcessService { /** * generate a new work process instance from command. + * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -543,10 +562,10 @@ public class ProcessService { processInstance.setConnects(processDefinition.getConnects()); // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); //copy process define json to process instance processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); @@ -564,8 +583,9 @@ public class ProcessService { * there is tenant id in definition, use the tenant of the definition. * if there is not tenant id in the definiton or the tenant not exist * use definition creator's tenant. + * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -587,15 +607,16 @@ public class ProcessService { /** * check command parameters is valid - * @param command command + * + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ private Boolean checkCmdParam(Command command, Map cmdParam) { if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) { if (cmdParam == null - || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES) - || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()) { + || !cmdParam.containsKey(Constants.CMDPARAM_START_NODE_NAMES) + || cmdParam.get(Constants.CMDPARAM_START_NODE_NAMES).isEmpty()) { logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType()); return false; } @@ -605,8 +626,9 @@ public class ProcessService { /** * construct process instance according to one command. + * * @param command command - * @param host host + * @param host host * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { @@ -654,7 +676,7 @@ public class ProcessService { //reset command parameter if (processInstance.getCommandParam() != null) { Map processCmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - for (Map.Entry entry: processCmdParam.entrySet()) { + for (Map.Entry entry : processCmdParam.entrySet()) { if (!cmdParam.containsKey(entry.getKey())) { cmdParam.put(entry.getKey(), entry.getValue()); } @@ -696,7 +718,7 @@ public class ProcessService { initTaskInstance(this.findTaskInstanceById(taskId)); } cmdParam.put(Constants.CMDPARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(failedList))); + String.join(Constants.COMMA, convertIntListToString(failedList))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); break; @@ -709,7 +731,7 @@ public class ProcessService { cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), - ExecutionStatus.KILL); + ExecutionStatus.KILL); suspendedNodeList.addAll(stopNodeList); for (Integer taskId : suspendedNodeList) { // initialize the pause state @@ -761,8 +783,9 @@ public class ProcessService { /** * return complement data if the process start with complement data + * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -775,9 +798,10 @@ public class ProcessService { /** * initialize complement data parameters + * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -787,14 +811,14 @@ public class ProcessService { } Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE), - YYYY_MM_DD_HH_MM_SS); + YYYY_MM_DD_HH_MM_SS); if (Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(startComplementTime); } processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); } @@ -802,6 +826,7 @@ public class ProcessService { * set sub work process parameters. * handle sub work process instance, update relation table and command parameters * set sub work process flag, extends parent work process command parameters + * * @param subProcessInstance subProcessInstance * @return process instance */ @@ -813,7 +838,7 @@ public class ProcessService { Map paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if (paramMap.containsKey(CMDPARAM_SUB_PROCESS) - && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))) { + && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))) { paramMap.remove(CMDPARAM_SUB_PROCESS); paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap)); @@ -826,7 +851,7 @@ public class ProcessService { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if (parentInstance != null) { subProcessInstance.setGlobalParams( - joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); + joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); this.saveProcessInstance(subProcessInstance); } else { logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); @@ -846,8 +871,9 @@ public class ProcessService { /** * join parent global params into sub process. * only the keys doesn't in sub process global would be joined. + * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -855,7 +881,7 @@ public class ProcessService { List parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class); List subPropertyList = JSONUtils.toList(subGlobalParams, Property.class); - Map subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + Map subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); for (Property parent : parentPropertyList) { if (!subMap.containsKey(parent.getProp())) { @@ -867,6 +893,7 @@ public class ProcessService { /** * initialize task instance + * * @param taskInstance taskInstance */ private void initTaskInstance(TaskInstance taskInstance) { @@ -885,19 +912,20 @@ public class ProcessService { /** * submit task to db * submit sub process to command + * * @param taskInstance taskInstance * @return task instance */ @Transactional(rollbackFor = Exception.class) - public TaskInstance submitTask(TaskInstance taskInstance){ + public TaskInstance submitTask(TaskInstance taskInstance) { ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); logger.info("start submit task : {}, instance id:{}, state: {}", - taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); return task; } if (!task.getState().typeIsFinished()) { @@ -905,7 +933,7 @@ public class ProcessService { } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", - taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); + taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -914,8 +942,9 @@ public class ProcessService { * consider o * repeat running does not generate new sub process instance * set map {parent instance id, task instance id, 0(child instance id)} + * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -942,8 +971,9 @@ public class ProcessService { /** * find previous task work process map. + * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -961,14 +991,15 @@ public class ProcessService { } } logger.info("sub process instance is not found,parent task:{},parent instance:{}", - parentTask.getId(), parentProcessInstance.getId()); + parentTask.getId(), parentProcessInstance.getId()); return null; } /** * create sub work process command + * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -994,6 +1025,7 @@ public class ProcessService { /** * complement data needs transform parent parameter to child. + * * @param instanceMap * @param parentProcessInstance * @return @@ -1015,6 +1047,7 @@ public class ProcessService { /** * create sub work process command + * * @param parentProcessInstance * @param childInstance * @param instanceMap @@ -1031,23 +1064,24 @@ public class ProcessService { String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); return new Command( - commandType, - TaskDependType.TASK_POST, - parentProcessInstance.getFailureStrategy(), - parentProcessInstance.getExecutorId(), - childDefineId, - processParam, - parentProcessInstance.getWarningType(), - parentProcessInstance.getWarningGroupId(), - parentProcessInstance.getScheduleTime(), - task.getWorkerGroup(), - parentProcessInstance.getProcessInstancePriority() + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + childDefineId, + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), + parentProcessInstance.getProcessInstancePriority() ); } /** * initialize sub work flow state * child instance state would be initialized when 'recovery from pause/stop/failure' + * * @param childInstance */ private void initSubInstanceState(ProcessInstance childInstance) { @@ -1076,8 +1110,9 @@ public class ProcessService { /** * update sub process definition + * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); @@ -1091,7 +1126,8 @@ public class ProcessService { /** * submit task to mysql - * @param taskInstance taskInstance + * + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1104,7 +1140,7 @@ public class ProcessService { } else { if (processInstanceState != ExecutionStatus.READY_STOP - && processInstanceState != ExecutionStatus.READY_PAUSE) { + && processInstanceState != ExecutionStatus.READY_PAUSE) { // failure task set invalid taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); @@ -1140,6 +1176,7 @@ public class ProcessService { /** * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}... * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. + * * @param taskInstance taskInstance * @return task zk queue str */ @@ -1155,12 +1192,12 @@ public class ProcessService { StringBuilder sb = new StringBuilder(100); sb.append(processInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) - .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) - .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) - .append(taskInstance.getId()).append(Constants.UNDERLINE) - .append(taskInstance.getWorkerGroup()); + .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) + .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getId()).append(Constants.UNDERLINE) + .append(taskInstance.getWorkerGroup()); - return sb.toString(); + return sb.toString(); } /** @@ -1171,20 +1208,20 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstanceState processInstanceState * @return process instance state */ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) { ExecutionStatus state = taskInstance.getState(); if ( - // running, delayed or killed - // the task already exists in task queue - // return state - state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL - || checkTaskExistsInTaskQueue(taskInstance) + // running, delayed or killed + // the task already exists in task queue + // return state + state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL + || checkTaskExistsInTaskQueue(taskInstance) ) { return state; } @@ -1193,7 +1230,7 @@ public class ProcessService { if (processInstanceState == ExecutionStatus.READY_PAUSE) { state = ExecutionStatus.PAUSE; } else if (processInstanceState == ExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance)) { + || !checkProcessStrategy(taskInstance)) { state = ExecutionStatus.KILL; } else { state = ExecutionStatus.SUBMITTED_SUCCESS; @@ -1202,7 +1239,8 @@ public class ProcessService { } /** - * check process instance strategy + * check process instance strategy + * * @param taskInstance taskInstance * @return check strategy result */ @@ -1224,6 +1262,7 @@ public class ProcessService { /** * check the task instance existing in queue + * * @param taskInstance taskInstance * @return whether taskinstance exists queue */ @@ -1239,6 +1278,7 @@ public class ProcessService { /** * create a new process instance + * * @param processInstance processInstance */ public void createProcessInstance(ProcessInstance processInstance) { @@ -1250,6 +1290,7 @@ public class ProcessService { /** * insert or update work process instance to data base + * * @param processInstance processInstance */ public void saveProcessInstance(ProcessInstance processInstance) { @@ -1267,6 +1308,7 @@ public class ProcessService { /** * insert or update command + * * @param command command * @return save command result */ @@ -1279,7 +1321,8 @@ public class ProcessService { } /** - * insert or update task instance + * insert or update task instance + * * @param taskInstance taskInstance * @return save task instance result */ @@ -1293,6 +1336,7 @@ public class ProcessService { /** * insert task instance + * * @param taskInstance taskInstance * @return create task instance result */ @@ -1303,6 +1347,7 @@ public class ProcessService { /** * update task instance + * * @param taskInstance taskInstance * @return update task instance result */ @@ -1313,7 +1358,8 @@ public class ProcessService { /** * delete a command by id - * @param id id + * + * @param id id */ public void delCommandById(int id) { commandMapper.deleteById(id); @@ -1321,6 +1367,7 @@ public class ProcessService { /** * find task instance by id + * * @param taskId task id * @return task intance */ @@ -1330,6 +1377,7 @@ public class ProcessService { /** * package task instance,associate processInstance and processDefine + * * @param taskInstId taskInstId * @return task instance */ @@ -1351,8 +1399,9 @@ public class ProcessService { /** * get id list by task state + * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1361,6 +1410,7 @@ public class ProcessService { /** * find valid task list by process definition id + * * @param processInstanceId processInstanceId * @return task instance list */ @@ -1370,6 +1420,7 @@ public class ProcessService { /** * find previous task list by work process id + * * @param processInstanceId processInstanceId * @return task instance list */ @@ -1379,6 +1430,7 @@ public class ProcessService { /** * update work process instance map + * * @param processInstanceMap processInstanceMap * @return update process instance result */ @@ -1388,21 +1440,23 @@ public class ProcessService { /** * create work process instance map + * * @param processInstanceMap processInstanceMap * @return create process instance result */ public int createWorkProcessInstanceMap(ProcessInstanceMap processInstanceMap) { Integer count = 0; if (processInstanceMap != null) { - return processInstanceMapMapper.insert(processInstanceMap); + return processInstanceMapMapper.insert(processInstanceMap); } return count; } /** * find work process map by parent process id and parent task id. + * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1411,6 +1465,7 @@ public class ProcessService { /** * delete work process map by parent process id + * * @param parentWorkProcessId parentWorkProcessId * @return delete process map result */ @@ -1421,8 +1476,9 @@ public class ProcessService { /** * find sub process instance + * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1437,6 +1493,7 @@ public class ProcessService { /** * find parent process instance + * * @param subProcessId subProcessId * @return process instance */ @@ -1452,18 +1509,18 @@ public class ProcessService { /** * change task state - * @param state state - * @param startTime startTime - * @param host host + * + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath - * @param taskInstId taskInstId + * @param logPath logPath + * @param taskInstId taskInstId */ - public void changeTaskState(ExecutionStatus state, Date startTime, String host, + public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, String logPath, int taskInstId) { - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); taskInstance.setState(state); taskInstance.setStartTime(startTime); taskInstance.setHost(host); @@ -1474,6 +1531,7 @@ public class ProcessService { /** * update process instance + * * @param processInstance processInstance * @return update process instance result */ @@ -1483,13 +1541,14 @@ public class ProcessService { /** * update the process instance + * * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects + * @param processJson processJson + * @param globalParams globalParams + * @param scheduleTime scheduleTime + * @param flag flag + * @param locations locations + * @param connects connects * @return update process instance result */ public int updateProcessInstance(Integer processInstanceId, String processJson, @@ -1509,18 +1568,18 @@ public class ProcessService { /** * change task state - * @param state state - * @param endTime endTime + * + * @param state state + * @param endTime endTime * @param taskInstId taskInstId - * @param varPool varPool + * @param varPool varPool */ - public void changeTaskState(ExecutionStatus state, + public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstId, String varPool) { - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); @@ -1531,6 +1590,7 @@ public class ProcessService { /** * convert integer list to string list + * * @param intList intList * @return string list */ @@ -1547,6 +1607,7 @@ public class ProcessService { /** * query schedule by id + * * @param id id * @return schedule */ @@ -1556,6 +1617,7 @@ public class ProcessService { /** * query Schedule by processDefinitionId + * * @param processDefinitionId processDefinitionId * @see Schedule */ @@ -1565,6 +1627,7 @@ public class ProcessService { /** * query need failover process instance + * * @param host host * @return process instance list */ @@ -1574,6 +1637,7 @@ public class ProcessService { /** * process need failover process instance + * * @param processInstance processInstance */ @Transactional(rollbackFor = RuntimeException.class) @@ -1593,16 +1657,18 @@ public class ProcessService { /** * query all need failover task instances by host + * * @param host host * @return task instance list */ public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + stateArray); } /** * find data source by id + * * @param id id * @return datasource */ @@ -1612,8 +1678,9 @@ public class ProcessService { /** * update process instance state by id + * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -1625,6 +1692,7 @@ public class ProcessService { /** * find process instance by the task id + * * @param taskId taskId * @return process instance */ @@ -1638,6 +1706,7 @@ public class ProcessService { /** * find udf function list by id list string + * * @param ids ids * @return udf function list */ @@ -1647,51 +1716,55 @@ public class ProcessService { /** * find tenant code by resource name - * @param resName resource name + * + * @param resName resource name * @param resourceType resource type * @return tenant code */ - public String queryTenantCodeByResName(String resName,ResourceType resourceType){ + public String queryTenantCodeByResName(String resName, ResourceType resourceType) { // in order to query tenant code successful although the version is older - String fullName = resName.startsWith("/") ? resName : String.format("/%s",resName); + String fullName = resName.startsWith("/") ? resName : String.format("/%s", resName); return resourceMapper.queryTenantCodeByResourceName(fullName, resourceType.ordinal()); } /** * find schedule list by process define id. + * * @param ids ids * @return schedule list */ public List selectAllByProcessDefineId(int[] ids) { return scheduleMapper.selectAllByProcessDefineArray( - ids); + ids); } /** * get dependency cycle by work process define id and scheduler fire time - * @param masterId masterId + * + * @param masterId masterId * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the task schedule is expected to trigger + * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency * @throws Exception if error throws Exception */ public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception { - List list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime); + List list = getCycleDependencies(masterId, new int[] {processDefinitionId}, scheduledFireTime); return list.size() > 0 ? list.get(0) : null; } /** * get dependency cycle list by work process define id list and scheduler fire time - * @param masterId masterId - * @param ids ids + * + * @param masterId masterId + * @param ids ids * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency list * @throws Exception if error throws Exception */ - public List getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception { - List cycleDependencyList = new ArrayList(); - if(ArrayUtils.isEmpty(ids)){ + public List getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception { + List cycleDependencyList = new ArrayList(); + if (ArrayUtils.isEmpty(ids)) { logger.warn("ids[] is empty!is invalid!"); return cycleDependencyList; } @@ -1706,13 +1779,13 @@ public class ProcessService { List list; List schedules = this.selectAllByProcessDefineId(ids); // for all scheduling information - for (Schedule depSchedule:schedules) { + for (Schedule depSchedule : schedules) { strCrontab = depSchedule.getCrontab(); depCronExpression = CronUtils.parse2CronExpression(strCrontab); depCron = CronUtils.parse2Cron(strCrontab); CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron); if (cycleEnum == null) { - logger.error("{} is not valid",strCrontab); + logger.error("{} is not valid", strCrontab); continue; } Calendar calendar = Calendar.getInstance(); @@ -1720,16 +1793,16 @@ public class ProcessService { /*case MINUTE: calendar.add(Calendar.MINUTE,-61);*/ case HOUR: - calendar.add(Calendar.HOUR,-25); + calendar.add(Calendar.HOUR, -25); break; case DAY: - calendar.add(Calendar.DATE,-32); + calendar.add(Calendar.DATE, -32); break; case WEEK: - calendar.add(Calendar.DATE,-32); + calendar.add(Calendar.DATE, -32); break; case MONTH: - calendar.add(Calendar.MONTH,-13); + calendar.add(Calendar.MONTH, -13); break; default: logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name()); @@ -1744,7 +1817,7 @@ public class ProcessService { } if (list.size() >= 1) { start = list.get(list.size() - 1); - CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum); + CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum); cycleDependencyList.add(dependency); } @@ -1754,44 +1827,48 @@ public class ProcessService { /** * find last scheduler process instance in the date interval + * * @param definitionId definitionId * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) { return processInstanceMapper.queryLastSchedulerProcess(definitionId, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** * find last manual process instance interval + * * @param definitionId process definition id * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) { return processInstanceMapper.queryLastManualProcess(definitionId, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** * find last running process instance - * @param definitionId process definition id - * @param startTime start time - * @param endTime end time + * + * @param definitionId process definition id + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { return processInstanceMapper.queryLastRunningProcess(definitionId, - startTime, - endTime, - stateArray); + startTime, + endTime, + stateArray); } /** * query user queue by process instance id + * * @param processInstanceId processInstanceId * @return queue */ @@ -1811,6 +1888,7 @@ public class ProcessService { /** * get task worker group + * * @param taskInstance taskInstance * @return workerGroupId */ @@ -1832,6 +1910,7 @@ public class ProcessService { /** * get have perm project list + * * @param userId userId * @return project list */ @@ -1851,6 +1930,7 @@ public class ProcessService { /** * get have perm project ids + * * @param userId userId * @return project ids */ @@ -1865,11 +1945,12 @@ public class ProcessService { /** * list unauthorized udf function - * @param userId user id - * @param needChecks data source id array + * + * @param userId user id + * @param needChecks data source id array * @return unauthorized udf function list */ - public List listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType) { + public List listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) { List resultList = new ArrayList(); if (Objects.nonNull(needChecks) && needChecks.length > 0) { @@ -1889,7 +1970,7 @@ public class ProcessService { originResSet.removeAll(authorizedUdfFiles); break; case DATASOURCE: - Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet()); + Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); originResSet.removeAll(authorizedDatasources); break; case UDF: @@ -1908,6 +1989,7 @@ public class ProcessService { /** * get user by user id + * * @param userId user id * @return User */ @@ -1917,6 +1999,7 @@ public class ProcessService { /** * get resource by resoruce id + * * @param resoruceId resource id * @return Resource */ @@ -1926,6 +2009,7 @@ public class ProcessService { /** * list resources by ids + * * @param resIds resIds * @return resource list */ @@ -1935,6 +2019,7 @@ public class ProcessService { /** * format task app id in task instance + * * @param taskInstance * @return */ @@ -1946,9 +2031,9 @@ public class ProcessService { return ""; } return String.format("%s_%s_%s", - definition.getId(), - processInstanceById.getId(), - taskInstance.getId()); + definition.getId(), + processInstanceById.getId(), + taskInstance.getId()); } } From b20a3d5474d5f2c66cf6a9d7eac44255b772c8e5 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 24 Nov 2020 17:15:48 +0800 Subject: [PATCH 02/14] code style --- .../dolphinscheduler/service/process/ProcessService.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 9aa9a2ff9a..38f9573c99 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -91,8 +91,6 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang.ArrayUtils; - import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1764,7 +1762,7 @@ public class ProcessService { */ public List getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception { List cycleDependencyList = new ArrayList(); - if (ArrayUtils.isEmpty(ids)) { + if (null == ids || ids.length == 0) { logger.warn("ids[] is empty!is invalid!"); return cycleDependencyList; } From 6c1a2ebeab13c425bd3667162e8d3ab857326e3d Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 24 Nov 2020 22:49:39 +0800 Subject: [PATCH 03/14] add taskResponseTest --- .../processor/queue/TaskResponseService.java | 2 +- .../queue/TaskResponseServiceTest.java | 97 ++++++++++++------- 2 files changed, 62 insertions(+), 37 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index de746b05aa..2678532b99 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -138,7 +138,7 @@ public class TaskResponseService { case ACK: try { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null && ExecutionStatus.RUNNING_EXECUTION.getCode() >= taskInstance.getState().getCode()) { + if (taskInstance != null && ExecutionStatus.SUCCESS.getCode() != taskInstance.getState().getCode()) { processService.changeTaskState(taskInstance, taskResponseEvent.getState(), taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(), diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index a9fdb58520..06bac173c7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -14,55 +14,80 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.processor.queue; +package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; -import org.junit.Assert; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Date; + +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; -import java.util.Date; +import io.netty.channel.Channel; -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class, - CuratorZookeeperClient.class}) +@RunWith(MockitoJUnitRunner.class) public class TaskResponseServiceTest { - @Autowired - private TaskResponseService taskResponseService; + @Mock(name = "processService") + private ProcessService processService; - @Test - public void testAdd(){ - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1,null); - taskResponseService.addResponse(taskResponseEvent); - Assert.assertTrue(taskResponseService.getEventQueue().size() == 1); - try { - Thread.sleep(10); - } catch (InterruptedException ignore) { - } - //after sleep, inner worker will take the event - Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); + @InjectMocks + TaskResponseService taskResponseService; + + @Mock + private Channel channel; + + private TaskResponseEvent ackEvent; + + private TaskResponseEvent resultEvent; + + private TaskInstance taskInstance; + + @Before + public void before() { + taskResponseService.start(); + + ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, + new Date(), + "127.*.*.*", + "path", + "logPath", + 22, + channel); + + resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS, + new Date(), + 1, + "ids", + 22, + "varPol", + channel); + + taskInstance = new TaskInstance(); + taskInstance.setId(22); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); } @Test - public void testStop(){ - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1,null); - taskResponseService.addResponse(taskResponseEvent); + public void testAddResponse() { + Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); + Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); + taskResponseService.addResponse(ackEvent); + taskResponseService.addResponse(resultEvent); + } + + @After + public void after() { taskResponseService.stop(); - Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); } + } From 9ef6b70696b7e4ede8c4026fbfdf72f4f78139fb Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 24 Nov 2020 23:13:01 +0800 Subject: [PATCH 04/14] add taskResponseTest --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index f58a47e897..8621bedb1d 100644 --- a/pom.xml +++ b/pom.xml @@ -835,6 +835,7 @@ **/server/master/SubProcessTaskTest.java **/server/master/processor/TaskAckProcessorTest.java **/server/master/processor/TaskKillResponseProcessorTest.java + **/server/master/processor/queue/TaskResponseServiceTest.java **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java From 858c4f32875f2294bac5f1eae11142568bb0f367 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 09:20:58 +0800 Subject: [PATCH 05/14] code smell --- .../master/processor/queue/TaskResponseServiceTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 06bac173c7..3d5dc0c87b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -83,10 +84,16 @@ public class TaskResponseServiceTest { Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); taskResponseService.addResponse(ackEvent); taskResponseService.addResponse(resultEvent); + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } } @After public void after() { + Assert.assertEquals(0, taskResponseService.getEventQueue().size()); taskResponseService.stop(); } From 8286af79d3a77a5aa7b03e1241f56d38d7712f35 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 09:38:58 +0800 Subject: [PATCH 06/14] Time is too small and the task is not finished --- .../server/master/processor/queue/TaskResponseServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 3d5dc0c87b..969618a9b2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -85,7 +85,7 @@ public class TaskResponseServiceTest { taskResponseService.addResponse(ackEvent); taskResponseService.addResponse(resultEvent); try { - Thread.sleep(1000); + Thread.sleep(10000); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } From a164d5f990c988cac7b45a2952f4d6eee5d17e44 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 10:23:54 +0800 Subject: [PATCH 07/14] Time is too small and the task is not finished --- .../processor/queue/TaskResponseServiceTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 969618a9b2..d60c9c192c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -84,15 +84,15 @@ public class TaskResponseServiceTest { Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); taskResponseService.addResponse(ackEvent); taskResponseService.addResponse(resultEvent); - try { - Thread.sleep(10000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } } @After public void after() { + try { + Thread.sleep(20000); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } Assert.assertEquals(0, taskResponseService.getEventQueue().size()); taskResponseService.stop(); } From c6a686a71078638a5ebcdecb30fe5e173deac96d Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 13:27:04 +0800 Subject: [PATCH 08/14] Time is too small and the task is not finished --- .../server/master/processor/queue/TaskResponseServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index d60c9c192c..0e8156a44a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -89,7 +89,7 @@ public class TaskResponseServiceTest { @After public void after() { try { - Thread.sleep(20000); + Thread.sleep(60000); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } From b3991d9dee307191f200a6b660d7264d874d3cb9 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 13:43:14 +0800 Subject: [PATCH 09/14] Time is too small and the task is not finished --- .../processor/queue/TaskResponseServiceTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 0e8156a44a..c67f477b6d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -88,11 +88,17 @@ public class TaskResponseServiceTest { @After public void after() { - try { - Thread.sleep(60000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); + + long startTime = System.currentTimeMillis(); + + long maxWaitTime = 3 * 60 * 1000; + + while (System.currentTimeMillis() - maxWaitTime - startTime < 0) { + if (taskResponseService.getEventQueue().size() == 0) { + return; + } } + Assert.assertEquals(0, taskResponseService.getEventQueue().size()); taskResponseService.stop(); } From 2a9e789f68d6aba2bdd8a8073ac0abb4b36cd51e Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 14:47:00 +0800 Subject: [PATCH 10/14] test --- .../master/processor/queue/TaskResponseServiceTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index c67f477b6d..584371d379 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -93,10 +93,8 @@ public class TaskResponseServiceTest { long maxWaitTime = 3 * 60 * 1000; - while (System.currentTimeMillis() - maxWaitTime - startTime < 0) { - if (taskResponseService.getEventQueue().size() == 0) { - return; - } + while (taskResponseService.getEventQueue().size() != 0) { + } Assert.assertEquals(0, taskResponseService.getEventQueue().size()); From d5769999d557b834621604930654a3baabdf5372 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 15:16:19 +0800 Subject: [PATCH 11/14] remove assert --- .../processor/queue/TaskResponseServiceTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 584371d379..c8c5614bff 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -88,16 +88,6 @@ public class TaskResponseServiceTest { @After public void after() { - - long startTime = System.currentTimeMillis(); - - long maxWaitTime = 3 * 60 * 1000; - - while (taskResponseService.getEventQueue().size() != 0) { - - } - - Assert.assertEquals(0, taskResponseService.getEventQueue().size()); taskResponseService.stop(); } From cc14b24326acbbbecc1a93188a31e817667d9ea6 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 18:11:55 +0800 Subject: [PATCH 12/14] test --- .../queue/TaskResponseServiceTest.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index c8c5614bff..52df493264 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -42,7 +42,7 @@ public class TaskResponseServiceTest { private ProcessService processService; @InjectMocks - TaskResponseService taskResponseService; + TaskResponseService taskRspService; @Mock private Channel channel; @@ -55,7 +55,7 @@ public class TaskResponseServiceTest { @Before public void before() { - taskResponseService.start(); + taskRspService.start(); ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), @@ -82,13 +82,21 @@ public class TaskResponseServiceTest { public void testAddResponse() { Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); - taskResponseService.addResponse(ackEvent); - taskResponseService.addResponse(resultEvent); + taskRspService.addResponse(ackEvent); + taskRspService.addResponse(resultEvent); } @After public void after() { - taskResponseService.stop(); + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Assert.assertEquals(0, taskRspService.getEventQueue().size()); + taskRspService.stop(); + } } From 0586e7b16004364e69e634e99dab8454ed187bed Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Wed, 25 Nov 2020 22:03:26 +0800 Subject: [PATCH 13/14] test --- .../master/processor/queue/TaskResponseServiceTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 52df493264..5d10f849c5 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -88,15 +87,7 @@ public class TaskResponseServiceTest { @After public void after() { - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - Assert.assertEquals(0, taskRspService.getEventQueue().size()); taskRspService.stop(); - } } From c9022680214ab80548102f4d1cf12a041635ecb5 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Thu, 26 Nov 2020 11:01:34 +0800 Subject: [PATCH 14/14] fix task instance status judgment error --- .../server/master/processor/queue/TaskResponseService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 2678532b99..51ecf454ea 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -138,7 +138,7 @@ public class TaskResponseService { case ACK: try { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null && ExecutionStatus.SUCCESS.getCode() != taskInstance.getState().getCode()) { + if (taskInstance != null && !taskInstance.getState().typeIsFinished()) { processService.changeTaskState(taskInstance, taskResponseEvent.getState(), taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(),