Browse Source

adapt to code style

pull/3/MERGE
Zhou Zheng 4 years ago
parent
commit
31cd1f6cb2
  1. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
  3. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  4. 26
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
  5. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  6. 53
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  7. 90
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
  8. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  9. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  10. 14
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java

@ -14,8 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@ -24,18 +27,28 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import springfox.documentation.annotations.ApiIgnore; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import java.util.Map; import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import io.swagger.annotations.Api;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
/** /**
* task instance controller * task instance controller

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java

@ -123,7 +123,7 @@ public class TaskCountDto {
// remove the specified state // remove the specified state
public void removeStateFromCountList(ExecutionStatus status) { public void removeStateFromCountList(ExecutionStatus status) {
for(TaskStateCount count : this.taskCountDtos) { for (TaskStateCount count : this.taskCountDtos) {
if (count.getTaskStateType().equals(status)) { if (count.getTaskStateType().equals(status)) {
this.taskCountDtos.remove(count); this.taskCountDtos.remove(count);
break; break;
@ -131,7 +131,7 @@ public class TaskCountDto {
} }
} }
public List<TaskStateCount> getTaskCountDtos(){ public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos; return taskCountDtos;
} }

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -14,35 +14,58 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import com.sun.org.apache.bcel.internal.generic.BREAKPOINT;
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.ParseException;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* executor service * executor service
*/ */

26
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java

@ -14,14 +14,28 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -32,16 +46,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
/** /**
* task instance controller test * task instance controller test
*/ */

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
@ -52,12 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import java.text.MessageFormat;
import java.util.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(MockitoJUnitRunner.Silent.class) @RunWith(MockitoJUnitRunner.Silent.class)

53
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -75,31 +75,34 @@ public enum ExecutionStatus {
} }
} }
/** /**
* status is success * status is success
* @return status *
*/ * @return status
public boolean typeIsSuccess() { */
return this == SUCCESS || this == FORCED_SUCCESS; public boolean typeIsSuccess() {
} return this == SUCCESS || this == FORCED_SUCCESS;
}
/**
* status is failure /**
* @return status * status is failure
*/ *
public boolean typeIsFailure() { * @return status
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL; */
} public boolean typeIsFailure() {
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
/** }
* status is finished
* @return status /**
*/ * status is finished
public boolean typeIsFinished() { *
* @return status
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() */
|| typeIsStop(); public boolean typeIsFinished() {
}
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsStop();
}
/** /**
* status is waiting thread * status is waiting thread

90
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java

@ -14,8 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.utils.placeholder; package org.apache.dolphinscheduler.common.utils.placeholder;
import static org.apache.commons.lang.time.DateUtils.addDays;
import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_DATE;
import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_TIME;
import static org.apache.dolphinscheduler.common.utils.DateUtils.format;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -23,54 +29,50 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_DATE;
import static org.apache.dolphinscheduler.common.Constants.PARAMETER_FORMAT_TIME;
import static org.apache.dolphinscheduler.common.utils.DateUtils.format;
import static org.apache.commons.lang.time.DateUtils.addDays;
/** /**
* business time utils * business time utils
*/ */
public class BusinessTimeUtils { public class BusinessTimeUtils {
private BusinessTimeUtils() { private BusinessTimeUtils() {
throw new IllegalStateException("BusinessTimeUtils class"); throw new IllegalStateException("BusinessTimeUtils class");
} }
/**
* get business time in parameters by different command types /**
* * get business time in parameters by different command types
* @param commandType command type *
* @param runTime run time or schedule time * @param commandType command type
* @return business time * @param runTime run time or schedule time
*/ * @return business time
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) { */
Date businessDate = runTime; public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) {
switch (commandType) { Date businessDate = runTime;
case COMPLEMENT_DATA: switch (commandType) {
break; case COMPLEMENT_DATA:
case START_PROCESS: break;
case START_CURRENT_TASK_PROCESS: case START_PROCESS:
case RECOVER_TOLERANCE_FAULT_PROCESS: case START_CURRENT_TASK_PROCESS:
case RECOVER_SUSPENDED_PROCESS: case RECOVER_TOLERANCE_FAULT_PROCESS:
case START_FAILURE_TASK_PROCESS: case RECOVER_SUSPENDED_PROCESS:
case REPEAT_RUNNING: case START_FAILURE_TASK_PROCESS:
case RESUME_FROM_FORCED_SUCCESS: case REPEAT_RUNNING:
case SCHEDULER: case RESUME_FROM_FORCED_SUCCESS:
default: case SCHEDULER:
businessDate = addDays(new Date(), -1); default:
if (runTime != null){ businessDate = addDays(new Date(), -1);
/** if (runTime != null) {
* If there is a scheduled time, take the scheduling time. Recovery from failed nodes, suspension of recovery, re-run for scheduling /**
*/ * If there is a scheduled time, take the scheduling time. Recovery from failed nodes, suspension of recovery, re-run for scheduling
businessDate = addDays(runTime, -1); */
} businessDate = addDays(runTime, -1);
break; }
break;
}
Date businessCurrentDate = addDays(businessDate, 1);
Map<String, String> result = new HashMap<>();
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME));
return result;
} }
Date businessCurrentDate = addDays(businessDate, 1);
Map<String, String> result = new HashMap<>();
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME));
return result;
}
} }

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -14,26 +14,31 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.utils;
package org.apache.dolphinscheduler.dao.utils;
import com.amazonaws.services.simpleworkflow.model.TaskList;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* dag tools * dag tools
*/ */

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -684,37 +684,38 @@ public class MasterExecThread implements Runnable {
* determine whether the dependencies of the task node are complete * determine whether the dependencies of the task node are complete
* @return DependResult * @return DependResult
*/ */
@SuppressWarnings("checkstyle:WhitespaceAround")
private DependResult isTaskDepsComplete(String taskName) { private DependResult isTaskDepsComplete(String taskName) {
Collection<String> startNodes = dag.getBeginNode(); Collection<String> startNodes = dag.getBeginNode();
// if vertex,returns true directly // if vertex,returns true directly
if(startNodes.contains(taskName)){ if (startNodes.contains(taskName)) {
return DependResult.SUCCESS; return DependResult.SUCCESS;
} }
TaskNode taskNode = dag.getNode(taskName); TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList(); List<String> depNameList = taskNode.getDepList();
for(String depsNode : depNameList ){ for (String depsNode : depNameList) {
if(!dag.containsNode(depsNode) if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode) || forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)){ || skipTaskNodeList.containsKey(depsNode)) {
continue; continue;
} }
// all the dependencies must be fully completed // all the dependencies must be fully completed
if(!completeTaskList.containsKey(depsNode)){ if (!completeTaskList.containsKey(depsNode)) {
return DependResult.WAITING; return DependResult.WAITING;
} }
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.WAITING; return DependResult.WAITING;
} }
// ignore task state if current task is condition // ignore task state if current task is condition
if(taskNode.isConditionsTask()){ if (taskNode.isConditionsTask()) {
continue; continue;
} }
if(!dependTaskSuccess(depsNode, taskName)){ if (!dependTaskSuccess(depsNode, taskName)) {
return DependResult.FAILED; return DependResult.FAILED;
} }
} }
@ -730,18 +731,18 @@ public class MasterExecThread implements Runnable {
* @param nextNodeName * @param nextNodeName
* @return * @return
*/ */
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) {
TaskNode tmpNode = dag.getNode(dependNodeName); TaskNode tmpNode = dag.getNode(dependNodeName);
if(tmpNode.isConditionsTask()){ if (tmpNode.isConditionsTask()) {
//condition task need check the branch to run //condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName); List<String> nextTaskList = parseConditionTask(dependNodeName);
if(!nextTaskList.contains(nextNodeName)){ if (!nextTaskList.contains(nextNodeName)) {
return false; return false;
} }
}else { } else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
if(depTaskState.typeIsFailure()){ if (depTaskState.typeIsFailure()) {
return false; return false;
} }
} }
@ -1062,17 +1063,17 @@ public class MasterExecThread implements Runnable {
// failure priority is higher than pause // failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill // if a task fails, other suspended tasks need to be reset kill
// check if there exists forced success nodes in errorTaskList // check if there exists forced success nodes in errorTaskList
if(errorTaskList.size() > 0){ if (errorTaskList.size() > 0) {
for(Map.Entry<String, TaskInstance> entry: completeTaskList.entrySet()) { for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue(); TaskInstance completeTask = entry.getValue();
if(completeTask.getState()== ExecutionStatus.PAUSE){ if (completeTask.getState() == ExecutionStatus.PAUSE) {
completeTask.setState(ExecutionStatus.KILL); completeTask.setState(ExecutionStatus.KILL);
completeTaskList.put(entry.getKey(), completeTask); completeTaskList.put(entry.getKey(), completeTask);
processService.updateTaskInstance(completeTask); processService.updateTaskInstance(completeTask);
} }
} }
for(Map.Entry<String, TaskInstance> entry: errorTaskList.entrySet()) { for (Map.Entry<String, TaskInstance> entry : errorTaskList.entrySet()) {
TaskInstance errorTask = entry.getValue(); TaskInstance errorTask = entry.getValue();
TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId()); TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId());
if (currentTask == null) { if (currentTask == null) {
@ -1089,7 +1090,7 @@ public class MasterExecThread implements Runnable {
} }
} }
} }
if(canSubmitTaskToQueue()){ if (canSubmitTaskToQueue()) {
submitStandByTask(); submitStandByTask();
} }
try { try {

14
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -14,19 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -98,11 +99,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
@ -1372,7 +1368,7 @@ public class ProcessService {
* @param taskType task type * @param taskType task type
* @return task instance id list * @return task instance id list
*/ */
public List<Integer> findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType){ public List<Integer> findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType) {
int[] stateArray = new int[states.length]; int[] stateArray = new int[states.length];
for (int i = 0; i < states.length; i++) { for (int i = 0; i < states.length; i++) {
stateArray[i] = states[i].ordinal(); stateArray[i] = states[i].ordinal();
@ -1388,7 +1384,7 @@ public class ProcessService {
* @param taskType task type * @param taskType task type
* @return task instance id list * @return task instance id list
*/ */
public List<Integer> findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType){ public List<Integer> findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType) {
int[] stateArray = new int[states.length]; int[] stateArray = new int[states.length];
for (int i = 0; i < states.length; i++) { for (int i = 0; i < states.length; i++) {
stateArray[i] = states[i].ordinal(); stateArray[i] = states[i].ordinal();

Loading…
Cancel
Save