diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/application.properties index 06b0ee94d5..c928c72df6 100644 --- a/dolphinscheduler-dao/src/main/resources/application.properties +++ b/dolphinscheduler-dao/src/main/resources/application.properties @@ -22,7 +22,7 @@ spring.datasource.driver-class-name=org.postgresql.Driver spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler # mysql #spring.datasource.driver-class-name=com.mysql.jdbc.Driver -#spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 +#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 # h2 #spring.datasource.driver-class-name=org.h2.Driver #spring.datasource.url=jdbc:h2:file:../sql/h2;AUTO_SERVER=TRUE diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java index b08cabc2e9..087bb80ccb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -82,7 +83,7 @@ public class DependentExecute { * @param currentTime current time * @return DependResult */ - public DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ + private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ List dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue()); return calculateResultForTasks(dependentItem, dateIntervals ); } @@ -94,7 +95,8 @@ public class DependentExecute { * @return dateIntervals */ private DependResult calculateResultForTasks(DependentItem dependentItem, - List dateIntervals) { + List dateIntervals) { + DependResult result = DependResult.FAILED; for(DateInterval dateInterval : dateIntervals){ ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), @@ -104,25 +106,35 @@ public class DependentExecute { dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); return DependResult.FAILED; } + // need to check workflow for updates, so get all task and check the task state if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ - result = getDependResultByState(processInstance.getState()); - }else{ - TaskInstance taskInstance = null; - List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + List taskNodes = + processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId()); - for(TaskInstance task : taskInstanceList){ - if(task.getName().equals(dependentItem.getDepTasks())){ - taskInstance = task; - break; + if(taskNodes != null && taskNodes.size() > 0){ + List results = new ArrayList<>(); + DependResult tmpResult = DependResult.FAILED; + for(TaskNode taskNode:taskNodes){ + tmpResult = getDependTaskResult(taskNode.getName(),processInstance); + if(DependResult.FAILED == tmpResult){ + break; + }else{ + results.add(getDependTaskResult(taskNode.getName(),processInstance)); + } + } + + if(DependResult.FAILED == tmpResult){ + result = DependResult.FAILED; + }else if(results.contains(DependResult.WAITING)){ + result = DependResult.WAITING; + }else{ + result = DependResult.SUCCESS; } - } - if(taskInstance == null){ - // cannot find task in the process instance - // maybe because process instance is running or failed. - result = getDependResultByState(processInstance.getState()); }else{ - result = getDependResultByState(taskInstance.getState()); + result = DependResult.FAILED; } + }else{ + result = getDependTaskResult(dependentItem.getDepTasks(),processInstance); } if(result != DependResult.SUCCESS){ break; @@ -131,6 +143,35 @@ public class DependentExecute { return result; } + /** + * get depend task result + * @param taskName + * @param processInstance + * @return + */ + private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) { + DependResult result = DependResult.FAILED; + TaskInstance taskInstance = null; + List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + + for(TaskInstance task : taskInstanceList){ + if(task.getName().equals(taskName)){ + taskInstance = task; + break; + } + } + + if(taskInstance == null){ + // cannot find task in the process instance + // maybe because process instance is running or failed. + result = getDependResultByProcessStateWhenTaskNull(processInstance.getState()); + }else{ + result = getDependResultByState(taskInstance.getState()); + } + + return result; + } + /** * find the last one process instance that : * 1. manual run and finish between the interval @@ -172,7 +213,9 @@ public class DependentExecute { */ private DependResult getDependResultByState(ExecutionStatus state) { - if(state.typeIsRunning() || state == ExecutionStatus.SUBMITTED_SUCCESS || state == ExecutionStatus.WAITTING_THREAD){ + if(state.typeIsRunning() + || state == ExecutionStatus.SUBMITTED_SUCCESS + || state == ExecutionStatus.WAITTING_THREAD){ return DependResult.WAITING; }else if(state.typeIsSuccess()){ return DependResult.SUCCESS; @@ -181,6 +224,22 @@ public class DependentExecute { } } + /** + * get dependent result by task instance state when task instance is null + * @param state state + * @return DependResult + */ + private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) { + + if(state.typeIsRunning() + || state == ExecutionStatus.SUBMITTED_SUCCESS + || state == ExecutionStatus.WAITTING_THREAD){ + return DependResult.WAITING; + }else{ + return DependResult.FAILED; + } + } + /** * judge depend item finished * @param currentTime current time @@ -222,7 +281,7 @@ public class DependentExecute { * @param currentTime current time * @return DependResult */ - public DependResult getDependResultForItem(DependentItem item, Date currentTime){ + private DependResult getDependResultForItem(DependentItem item, Date currentTime){ String key = item.getKey(); if(dependResultMap.containsKey(key)){ return dependResultMap.get(key); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index f074d57e6c..b426d32502 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -82,10 +82,11 @@ public class DependentTask extends AbstractTask { this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), DependentParameters.class); - - for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ - this.dependentTaskList.add(new DependentExecute( - taskModel.getDependItemList(), taskModel.getRelation())); + if(dependentParameters != null){ + for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ + this.dependentTaskList.add(new DependentExecute( + taskModel.getDependItemList(), taskModel.getRelation())); + } } this.processService = SpringApplicationContext.getBean(ProcessService.class); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 272fb546da..c13a7647fe 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -17,47 +17,106 @@ package org.apache.dolphinscheduler.server.worker.task.dependent; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DateInterval; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@RunWith(MockitoJUnitRunner.Silent.class) public class DependentTaskTest { private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); + private ProcessService processService; + private ApplicationContext applicationContext; - @Test - public void testDependInit() throws Exception{ - TaskProps taskProps = new TaskProps(); + @Before + public void before() throws Exception{ + processService = Mockito.mock(ProcessService.class); + Mockito.when(processService + .findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0))) + .thenReturn(findLastProcessInterval()); + Mockito.when(processService + .getTaskNodeListByDefinitionId(4)) + .thenReturn(getTaskNodes()); + Mockito.when(processService + .findValidTaskListByProcessId(11)) + .thenReturn(getTaskInstances()); - String dependString = "{\n" + - "\"dependTaskList\":[\n" + - " {\n" + - " \"dependItemList\":[\n" + - " {\n" + - " \"definitionId\": 101,\n" + - " \"depTasks\": \"ALL\",\n" + - " \"cycle\": \"day\",\n" + - " \"dateValue\": \"last1Day\"\n" + - " }\n" + - " ],\n" + - " \"relation\": \"AND\"\n" + - " }\n" + - " ],\n" + - "\"relation\":\"OR\"\n" + - "}"; + Mockito.when(processService + .findTaskInstanceById(252612)) + .thenReturn(getTaskInstance()); + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + } + @Test + public void test() throws Exception{ + + TaskProps taskProps = new TaskProps(); + String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; taskProps.setTaskInstId(252612); taskProps.setDependence(dependString); + taskProps.setTaskStartTime(new Date()); DependentTask dependentTask = new DependentTask(taskProps, logger); dependentTask.init(); dependentTask.handle(); - Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE ); + Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS ); } + private ProcessInstance findLastProcessInterval(){ + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(11); + processInstance.setState(ExecutionStatus.SUCCESS); + return processInstance; + } + + private List getTaskNodes(){ + List list = new ArrayList<>(); + TaskNode taskNode = new TaskNode(); + taskNode.setName("C"); + taskNode.setType("SQL"); + list.add(taskNode); + return list; + } + private List getTaskInstances(){ + List list = new ArrayList<>(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setName("C"); + taskInstance.setState(ExecutionStatus.SUCCESS); + taskInstance.setDependency("1231"); + list.add(taskInstance); + return list; + } + + private TaskInstance getTaskInstance(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(252612); + taskInstance.setName("C"); + taskInstance.setState(ExecutionStatus.SUCCESS); + return taskInstance; + } } \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 3312c1004a..ce4424e24a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -236,6 +236,30 @@ public class ProcessService { return processInstanceMapper.queryDetailById(processId); } + /** + * get task node list by definitionId + * @param defineId + * @return + */ + public List getTaskNodeListByDefinitionId(Integer defineId){ + ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); + if (processDefinition == null) { + logger.info("process define not exists"); + return null; + } + + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + //process data check + if (null == processData) { + logger.error("process data is null"); + return null; + } + + return processData.getTasks(); + } + /** * find process instance by id * @param processId processId diff --git a/pom.xml b/pom.xml index ec0435cc22..84df526317 100644 --- a/pom.xml +++ b/pom.xml @@ -740,6 +740,7 @@ **/server/worker/task/datax/DataxTaskTest.java **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java + **/server/worker/task/dependent/DependentTaskTest.java **/server/utils/DataxUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/dao/datasource/BaseDataSourceTest.java