Browse Source

move dependent and conditions to master server (#2423)

* move condition and dependent to master

* move conditions to master

* move conditions and dependent task to master

* move conditions to master

* update test

* add log

* add test for dependent task

* add test for dependent task

* update

* update

* refactor complexity code

* refactor complexity code

* add conditions task test

* add conditions task test

* update

* update host to host:port

* update logback.xml

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/2/head
bao liang 5 years ago committed by GitHub
parent
commit
4946e88872
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  2. 1
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java
  3. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  4. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  5. 14
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
  6. 141
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  7. 217
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  8. 51
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  9. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  10. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  11. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
  12. 60
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  13. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  14. 198
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
  15. 26
      dolphinscheduler-server/src/main/resources/logback-master.xml
  16. 132
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  17. 68
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  18. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  19. 2
      pom.xml

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -39,7 +39,7 @@ public enum TaskType {
*/
SHELL(0, "shell"),
SQL(1, "sql"),
SUB_PROCESS(2, "sub process"),
SUB_PROCESS(2, "sub_process"),
PROCEDURE(3, "procedure"),
MR(4, "mr"),
SPARK(5, "spark"),

1
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java

@ -32,6 +32,7 @@ import java.util.List;
public class DependentUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ShellExecutorTest.class);
@Test
public void getDependResultForRelation() {
//failed

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -377,9 +377,6 @@ public class TaskInstance implements Serializable {
}
public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}
public String getDependency(){
@ -458,6 +455,18 @@ public class TaskInstance implements Serializable {
return resources;
}
public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}
public boolean isDependTask(){
return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType));
}
public boolean isConditionsTask(){
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
}
public void setResources(List<String> resources) {
this.resources = resources;
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -361,4 +361,48 @@ public class DagHelper {
processDag.setNodes(taskNodeList);
return processDag;
}
/**
* is there have conditions after the parent node
* @param parentNodeName
* @return
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag
){
boolean result = false;
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeName);
if(CollectionUtils.isEmpty(subsequentNodes)){
return result;
}
for(String nodeName : subsequentNodes){
TaskNode taskNode = dag.getNode(nodeName);
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){
return true;
}
}
return result;
}
/**
* is there have conditions after the parent node
* @param parentNodeName
* @return
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
List<TaskNode> taskNodes
){
boolean result = false;
if(CollectionUtils.isEmpty(taskNodes)){
return result;
}
for(TaskNode taskNode : taskNodes){
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){
return true;
}
}
return result;
}
}

14
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java

@ -29,11 +29,21 @@ public class TaskInstanceTest {
TaskInstance taskInstance = new TaskInstance();
//sub process
taskInstance.setTaskType("sub process");
taskInstance.setTaskType("SUB_PROCESS");
Assert.assertTrue(taskInstance.isSubProcess());
//not sub process
taskInstance.setTaskType("http");
taskInstance.setTaskType("HTTP");
Assert.assertFalse(taskInstance.isSubProcess());
//sub process
taskInstance.setTaskType("CONDITIONS");
Assert.assertTrue(taskInstance.isConditionsTask());
//sub process
taskInstance.setTaskType("DEPENDENT");
Assert.assertTrue(taskInstance.isDependTask());
}
}

141
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -14,32 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.conditions;
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConditionsTask extends AbstractTask {
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
@ -48,66 +44,51 @@ public class ConditionsTask extends AbstractTask {
private DependentParameters dependentParameters;
/**
* process dao
*/
private ProcessService processService;
/**
* taskInstance
*/
private TaskInstance taskInstance;
/**
*
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/**
* taskExecutionContext
* condition result
*/
private TaskExecutionContext taskExecutionContext;
private DependResult conditionResult;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* constructor of MasterBaseTaskExecThread
*
* @param logger logger
* @param taskInstance task instance
*/
public ConditionsTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
public ConditionsTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
}
@Override
public void init() throws Exception {
logger.info("conditions task initialize");
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.dependentParameters = JSONUtils.parseObject(taskExecutionContext.
getDependenceTaskExecutionContext()
.getDependence(),
DependentParameters.class);
this.taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
if(taskInstance == null){
throw new Exception("cannot find the task instance!");
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for(TaskInstance task : taskInstanceList){
this.completeTaskList.putIfAbsent(task.getName(), task.getState());
public Boolean submitWaitComplete() {
try{
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
waitTaskQuit();
updateTaskState();
}catch (Exception e){
logger.error("conditions task run exception" , e);
}
return true;
}
@Override
public void handle() throws Exception {
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT,
taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
);
for(TaskInstance task : taskInstances){
completeTaskList.putIfAbsent(task.getName(), task.getState());
}
List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){
@ -119,14 +100,43 @@ public class ConditionsTask extends AbstractTask {
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
DependResult result = DependentUtils.getDependResultForRelation(
conditionResult = DependentUtils.getDependResultForRelation(
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", result);
exitStatusCode = (result == DependResult.SUCCESS) ?
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
logger.info("the conditions task depend result : {}", conditionResult);
}
/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if(this.cancel){
status = ExecutionStatus.KILL;
}else{
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
}
/**
* depend result for depend item
* @param item
* @return
*/
private DependResult getDependResultForItem(DependentItem item){
DependResult dependResult = DependResult.SUCCESS;
@ -137,16 +147,13 @@ public class ConditionsTask extends AbstractTask {
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if(executionStatus != item.getStatus()){
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus().toString(), executionStatus.toString());
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("depend item: {}, depend result: {}",
item.getDepTasks(), dependResult);
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
return dependResult;
}
@Override
public AbstractParameters getParameters() {
return null;
}
}
}

217
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
public class DependentTaskExecThread extends MasterBaseTaskExecThread {
private DependentParameters dependentParameters;
/**
* dependent task list
*/
private List<DependentExecute> dependentTaskList = new ArrayList<>();
/**
* depend item result map
* save the result to log file
*/
private Map<String, DependResult> dependResultMap = new HashMap<>();
/**
* dependent date
*/
private Date dependentDate;
/**
* constructor of MasterBaseTaskExecThread
*
* @param taskInstance task instance
*/
public DependentTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
}
@Override
public Boolean submitWaitComplete() {
try{
logger.info("dependent task start");
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
initDependParameters();
waitTaskQuit();
updateTaskState();
}catch (Exception e){
logger.error("dependent task run exception" , e);
}
return true;
}
/**
* init dependent parameters
*/
private void initDependParameters() {
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(),
DependentParameters.class);
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
this.dependentTaskList.add(new DependentExecute(
taskModel.getDependItemList(), taskModel.getRelation()));
}
if(this.processInstance.getScheduleTime() != null){
this.dependentDate = this.processInstance.getScheduleTime();
}else{
this.dependentDate = new Date();
}
}
/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if(this.cancel){
status = ExecutionStatus.KILL;
}else{
DependResult result = getTaskDependResult();
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
}
/**
* wait dependent tasks quit
*/
private Boolean waitTaskQuit() {
logger.info("wait depend task : {} complete", this.taskInstance.getName());
if (taskInstance.getState().typeIsFinished()) {
logger.info("task {} already complete. task state:{}",
this.taskInstance.getName(),
this.taskInstance.getState());
return true;
}
while (Stopper.isRunning()) {
try{
if(this.processInstance == null){
logger.error("process instance not exists , master task exec thread exit");
return true;
}
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
cancelTaskInstance();
break;
}
if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){
break;
}
// updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception",e);
if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId());
}
}
}
return true;
}
/**
* cancel dependent task
*/
private void cancelTaskInstance() {
this.cancel = true;
}
private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance));
taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
}
/**
* judge all dependent tasks finish
* @return whether all dependent tasks finish
*/
private boolean allDependentTaskFinish(){
boolean finish = true;
for(DependentExecute dependentExecute : dependentTaskList){
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) {
if(!dependResultMap.containsKey(entry.getKey())){
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
logger.info("dependent item complete {} {},{}",
DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
}
}
if(!dependentExecute.finish(dependentDate)){
finish = false;
}
}
return finish;
}
/**
* get dependent result
* @return DependResult
*/
private DependResult getTaskDependResult(){
List<DependResult> dependResultList = new ArrayList<>();
for(DependentExecute dependentExecute : dependentTaskList){
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate);
dependResultList.add(dependResult);
}
DependResult result = DependentUtils.getDependResultForRelation(
this.dependentParameters.getRelation(), dependResultList
);
logger.info("dependent task completed, dependent result:{}", result);
return result;
}
}

51
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -16,11 +16,15 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -41,7 +45,8 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* logger of MasterBaseTaskExecThread
*/
private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class);
protected Logger logger = LoggerFactory.getLogger(getClass());
/**
* process service
@ -71,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* master config
*/
private MasterConfig masterConfig;
protected MasterConfig masterConfig;
/**
* taskUpdateQueue
@ -80,12 +85,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
public MasterBaseTaskExecThread(TaskInstance taskInstance){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@ -123,7 +126,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
try {
if(!submitDB){
// submit task to db
task = processService.submitTask(taskInstance, processInstance);
task = processService.submitTask(taskInstance);
if(task != null && task.getId() != 0){
submitDB = true;
}
@ -159,7 +162,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
public Boolean dispatchTask(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
if(taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
|| taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
@ -233,7 +238,39 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
@Override
public Boolean call() throws Exception {
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
return submitWaitComplete();
}
/**
* get task log path
* @return log path
*/
public String getTaskLogPath(TaskInstance task) {
String logPath;
try{
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
logPath = baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}else{
logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}
}catch (Exception e){
logger.error("logger", e);
logPath = "";
}
return logPath;
}
}

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -365,7 +365,6 @@ public class MasterExecThread implements Runnable {
}
// generate process dag
dag = DagHelper.buildDagGraph(processDag);
}
/**
@ -418,9 +417,13 @@ public class MasterExecThread implements Runnable {
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
MasterBaseTaskExecThread abstractExecThread = null;
if(taskInstance.isSubProcess()){
abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
abstractExecThread = new SubProcessTaskExecThread(taskInstance);
}else if(taskInstance.isDependTask()){
abstractExecThread = new DependentTaskExecThread(taskInstance);
}else if(taskInstance.isConditionsTask()){
abstractExecThread = new ConditionsTaskExecThread(taskInstance);
}else {
abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
abstractExecThread = new MasterTaskExecThread(taskInstance);
}
Future<Boolean> future = taskExecService.submit(abstractExecThread);
activeTaskNode.putIfAbsent(abstractExecThread, future);
@ -504,27 +507,7 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
/**
* is there have conditions after the parent node
* @param parentNodeName
* @return
*/
private boolean haveConditionsAfterNode(String parentNodeName){
boolean result = false;
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
if(startVertex == null){
return result;
}
for(String nodeName : startVertex){
TaskNode taskNode = dag.getNode(nodeName);
if(taskNode.getType().equals(TaskType.CONDITIONS.toString())){
result = true;
break;
}
}
return result;
}
/**
* if all of the task dependence are skip, skip it too.
@ -701,7 +684,7 @@ public class MasterExecThread implements Runnable {
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed.
if(depTaskState.typeIsFailure()
&& !haveConditionsAfterNode(depsNode)
&& !DagHelper.haveConditionsAfterNode(depsNode, dag )
&& !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED;
}
@ -1017,8 +1000,8 @@ public class MasterExecThread implements Runnable {
addTaskToStandByList(task);
}else{
completeTaskList.put(task.getName(), task);
if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) ||
haveConditionsAfterNode(task.getName())) {
if( task.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
submitPostNode(task.getName());
}else{
errorTaskList.put(task.getName(), task);

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -16,7 +16,6 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.slf4j.Logger;
import com.alibaba.fastjson.JSON;
@ -28,7 +27,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -38,7 +36,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.LoggerFactory;
import java.util.Date;
@ -48,12 +45,6 @@ import java.util.Date;
*/
public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* logger of MasterTaskExecThread
*/
private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class);
/**
* taskInstance state manager
*/
@ -65,10 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* constructor of MasterTaskExecThread
* @param taskInstance task instance
* @param processInstance process instance
*/
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
public MasterTaskExecThread(TaskInstance taskInstance){
super(taskInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
}

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java

@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
@ -31,11 +29,6 @@ import java.util.Date;
*/
public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
/**
* logger of SubProcessTaskExecThread
*/
private static final Logger logger = LoggerFactory.getLogger(SubProcessTaskExecThread.class);
/**
* sub process instance
*/
@ -44,10 +37,9 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
/**
* sub process task exec thread
* @param taskInstance task instance
* @param processInstance process instance
*/
public SubProcessTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
public SubProcessTaskExecThread(TaskInstance taskInstance){
super(taskInstance);
}
@Override

60
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.dependent;
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@ -108,31 +110,7 @@ public class DependentExecute {
}
// need to check workflow for updates, so get all task and check the task state
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
List<TaskNode> taskNodes =
processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId());
if(taskNodes != null && taskNodes.size() > 0){
List<DependResult> results = new ArrayList<>();
DependResult tmpResult = DependResult.FAILED;
for(TaskNode taskNode:taskNodes){
tmpResult = getDependTaskResult(taskNode.getName(),processInstance);
if(DependResult.FAILED == tmpResult){
break;
}else{
results.add(getDependTaskResult(taskNode.getName(),processInstance));
}
}
if(DependResult.FAILED == tmpResult){
result = DependResult.FAILED;
}else if(results.contains(DependResult.WAITING)){
result = DependResult.WAITING;
}else{
result = DependResult.SUCCESS;
}
}else{
result = DependResult.FAILED;
}
result = dependResultByProcessInstance(processInstance);
}else{
result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
}
@ -143,6 +121,32 @@ public class DependentExecute {
return result;
}
/**
* depend type = depend_all
* skip the condition tasks.
* judge all the task
* @return
*/
private DependResult dependResultByProcessInstance(ProcessInstance processInstance){
DependResult result = DependResult.FAILED;
List<TaskNode> taskNodes =
processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId());
if(CollectionUtils.isEmpty(taskNodes)) {
return result;
}
for(TaskNode taskNode:taskNodes){
if(taskNode.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(taskNode.getName(), taskNodes)){
continue;
}
DependResult tmpResult = getDependTaskResult(taskNode.getName(),processInstance);
if(DependResult.SUCCESS != tmpResult){
return tmpResult;
}
}
return DependResult.SUCCESS;
}
/**
* get depend task result
* @param taskName
@ -150,7 +154,7 @@ public class DependentExecute {
* @return
*/
private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) {
DependResult result = DependResult.FAILED;
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
@ -182,7 +186,7 @@ public class DependentExecute {
*/
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval);
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
if(runningProcess != null){
return runningProcess;
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.conditions.ConditionsTask;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
@ -69,8 +68,6 @@ public class TaskManager {
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
return new SqoopTask(taskExecutionContext, logger);
case CONDITIONS:
return new ConditionsTask(taskExecutionContext, logger);
default:
logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");

198
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java

@ -1,198 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.dependent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
/**
* Dependent Task
*/
public class DependentTask extends AbstractTask {
/**
* dependent task list
*/
private List<DependentExecute> dependentTaskList = new ArrayList<>();
/**
* depend item result map
* save the result to log file
*/
private Map<String, DependResult> dependResultMap = new HashMap<>();
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
/**
* dependent date
*/
private Date dependentDate;
/**
* process service
*/
private ProcessService processService;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public DependentTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init(){
logger.info("dependent task initialize");
this.dependentParameters = JSONUtils.parseObject(null,
DependentParameters.class);
if(dependentParameters != null){
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
this.dependentTaskList.add(new DependentExecute(
taskModel.getDependItemList(), taskModel.getRelation()));
}
}
this.processService = SpringApplicationContext.getBean(ProcessService.class);
if(taskExecutionContext.getScheduleTime() != null){
this.dependentDate = taskExecutionContext.getScheduleTime();
}else{
this.dependentDate = taskExecutionContext.getStartTime();
}
}
@Override
public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
try{
TaskInstance taskInstance = null;
while(Stopper.isRunning()){
taskInstance = processService.findTaskInstanceById(this.taskExecutionContext.getTaskInstanceId());
if(taskInstance == null){
exitStatusCode = -1;
break;
}
if(taskInstance.getState() == ExecutionStatus.KILL){
this.cancel = true;
}
if(this.cancel || allDependentTaskFinish()){
break;
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if(cancel){
exitStatusCode = Constants.EXIT_CODE_KILL;
}else{
DependResult result = getTaskDependResult();
exitStatusCode = (result == DependResult.SUCCESS) ?
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = -1;
throw e;
}
}
/**
* get dependent result
* @return DependResult
*/
private DependResult getTaskDependResult(){
List<DependResult> dependResultList = new ArrayList<>();
for(DependentExecute dependentExecute : dependentTaskList){
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate);
dependResultList.add(dependResult);
}
DependResult result = DependentUtils.getDependResultForRelation(
this.dependentParameters.getRelation(), dependResultList
);
return result;
}
/**
* judge all dependent tasks finish
* @return whether all dependent tasks finish
*/
private boolean allDependentTaskFinish(){
boolean finish = true;
for(DependentExecute dependentExecute : dependentTaskList){
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) {
if(!dependResultMap.containsKey(entry.getKey())){
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
logger.info("dependent item complete {} {},{}",
DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
}
}
if(!dependentExecute.finish(dependentDate)){
finish = false;
}
}
return finish;
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
this.cancel = true;
}
@Override
public AbstractParameters getParameters() {
return null;
}
}

26
dolphinscheduler-server/src/main/resources/logback-master.xml

@ -29,7 +29,30 @@
</encoder>
</appender>
<conversionRule conversionWord="messsage"
converterClass="org.apache.dolphinscheduler.server.log.SensitiveDataConverter"/>
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<!-- master server logback config start -->
<appender name="MASTERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-master.log</file>
@ -52,6 +75,7 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/>
</root>

132
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.List;
@RunWith(MockitoJUnitRunner.Silent.class)
public class ConditionsTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
private ProcessService processService;
private ApplicationContext applicationContext;
private MasterConfig config;
@Before
public void before() {
config = new MasterConfig();
config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000);
processService = Mockito.mock(ProcessService.class);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
Mockito.when(processService
.findTaskInstanceById(252612))
.thenReturn(getTaskInstance());
Mockito.when(processService.saveTaskInstance(getTaskInstance()))
.thenReturn(true);
Mockito.when(processService.findProcessInstanceById(10112))
.thenReturn(getProcessInstance());
Mockito.when(processService
.findValidTaskListByProcessId(10112))
.thenReturn(getTaskInstances());
}
@Test
public void testCondition(){
TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"depTasks\":\"1\",\"status\":\"SUCCESS\"}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
String conditionResult = "{\"successNode\":[\"2\"],\"failedNode\":[\"3\"]}";
taskInstance.setDependency(dependString);
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
ConditionsTaskExecThread conditions =
new ConditionsTaskExecThread(taskInstance);
try {
conditions.call();
} catch (Exception e) {
e.printStackTrace();
}
Assert.assertEquals(ExecutionStatus.SUCCESS, conditions.getTaskInstance().getState());
}
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setTaskType("CONDITIONS");
taskInstance.setProcessInstanceId(10112);
taskInstance.setProcessDefinitionId(100001);
return taskInstance;
}
private List<TaskInstance> getTaskInstances(){
List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(199999);
taskInstance.setName("1");
taskInstance.setState(ExecutionStatus.SUCCESS);
list.add(taskInstance);
return list;
}
private ProcessInstance getProcessInstance(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(10112);
processInstance.setProcessDefinitionId(100001);
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
return processInstance;
}
}

68
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java → dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.dependent;
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
@ -50,12 +50,23 @@ public class DependentTaskTest {
private ApplicationContext applicationContext;
private MasterConfig config;
@Before
public void before() throws Exception{
config = new MasterConfig();
config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000);
processService = Mockito.mock(ProcessService.class);
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
Mockito.when(processService
.findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0)))
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastProcessInterval());
Mockito.when(processService
.getTaskNodeListByDefinitionId(4))
.thenReturn(getTaskNodes());
@ -66,32 +77,62 @@ public class DependentTaskTest {
Mockito.when(processService
.findTaskInstanceById(252612))
.thenReturn(getTaskInstance());
Mockito.when(processService.findProcessInstanceById(10111))
.thenReturn(getProcessInstance());
Mockito.when(processService.findProcessDefineById(0))
.thenReturn(getProcessDefinition());
Mockito.when(processService.saveTaskInstance(getTaskInstance()))
.thenReturn(true);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
}
@Test
public void test() throws Exception{
TaskProps taskProps = new TaskProps();
TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskProps.setDependence(dependString);
taskProps.setTaskStartTime(new Date());
DependentTask dependentTask = new DependentTask(new TaskExecutionContext(), logger);
dependentTask.init();
dependentTask.handle();
Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS );
taskInstance.setDependency(dependString);
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
DependentTaskExecThread dependentTask =
new DependentTaskExecThread(taskInstance);
dependentTask.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState());
}
private ProcessInstance findLastProcessInterval(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(11);
processInstance.setProcessDefinitionId(4);
processInstance.setState(ExecutionStatus.SUCCESS);
return processInstance;
}
private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(0);
return processDefinition;
}
private ProcessInstance getProcessInstance(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(10111);
processInstance.setProcessDefinitionId(0);
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
return processInstance;
}
private List<TaskNode> getTaskNodes(){
List<TaskNode> list = new ArrayList<>();
TaskNode taskNode = new TaskNode();
@ -113,9 +154,10 @@ public class DependentTaskTest {
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setState(ExecutionStatus.SUCCESS);
taskInstance.setProcessInstanceId(10111);
return taskInstance;
}

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -782,14 +782,13 @@ public class ProcessService {
* submit task to db
* submit sub process to command
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(), processInstance.getState() );
processInstance = this.findProcessInstanceDetailById(processInstance.getId());
public TaskInstance submitTask(TaskInstance taskInstance){
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if(task == null){
@ -1637,13 +1636,14 @@ public class ProcessService {
/**
* find last running process instance
* @param definitionId process definition id
* @param dateInterval dateInterval
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, DateInterval dateInterval) {
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionId,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
startTime,
endTime,
stateArray);
}
@ -1799,5 +1799,22 @@ public class ProcessService {
return resourceMapper.listResourceByIds(resIds);
}
/**
* format task app id in task instance
* @param taskInstance
* @return
*/
public String formatTaskAppId(TaskInstance taskInstance){
ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId());
ProcessInstance processInstanceById = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
if(definition == null || processInstanceById == null){
return "";
}
return String.format("%s_%s_%s",
definition.getId(),
processInstanceById.getId(),
taskInstance.getId());
}
}

2
pom.xml

@ -782,6 +782,8 @@
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include>

Loading…
Cancel
Save