diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index a90d927154..b82da62b02 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -16,24 +16,23 @@ */ package org.apache.dolphinscheduler.dao.entity; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; -import com.fasterxml.jackson.annotation.JsonFormat; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.utils.*; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; import java.util.Date; import java.util.Map; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; + /** * task instance */ @@ -382,16 +381,16 @@ public class TaskInstance implements Serializable { this.appLink = appLink; } - - - public String getDependency(){ - - if(this.dependency != null){ + public String getDependency() { + if (this.dependency != null) { return this.dependency; } TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + return taskNode == null ? null : taskNode.getDependence(); + } - return taskNode.getDependence(); + public void setDependency(String dependency) { + this.dependency = dependency; } public Flag getFlag() { @@ -495,10 +494,6 @@ public class TaskInstance implements Serializable { } } - public void setDependency(String dependency) { - this.dependency = dependency; - } - public Priority getTaskInstancePriority() { return taskInstancePriority; } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java index 9c59670872..5742c95a5d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java @@ -16,6 +16,9 @@ */ package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + import org.junit.Assert; import org.junit.Test; @@ -43,7 +46,36 @@ public class TaskInstanceTest { //sub process taskInstance.setTaskType("DEPENDENT"); Assert.assertTrue(taskInstance.isDependTask()); + } + + /** + * test for TaskInstance.getDependence + */ + @Test + public void testTaskInstanceGetDependence() { + TaskInstance taskInstance; + TaskNode taskNode; + + taskInstance = new TaskInstance(); + taskInstance.setTaskJson(null); + Assert.assertNull(taskInstance.getDependency()); + + taskInstance = new TaskInstance(); + taskNode = new TaskNode(); + taskNode.setDependence(null); + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + Assert.assertNull(taskInstance.getDependency()); + taskInstance = new TaskInstance(); + taskNode = new TaskNode(); + // expect a JSON here, and will be unwrap when toJsonString + taskNode.setDependence("\"A\""); + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + Assert.assertEquals("A", taskInstance.getDependency()); + taskInstance = new TaskInstance(); + taskInstance.setTaskJson(null); + taskInstance.setDependency("{}"); + Assert.assertEquals("{}", taskInstance.getDependency()); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index f1ee8ccf11..61058de864 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -16,14 +16,26 @@ */ package org.apache.dolphinscheduler.server.master; - +import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,99 +46,144 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.ArrayList; -import java.util.List; - @RunWith(MockitoJUnitRunner.Silent.class) public class ConditionsTaskTest { - private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); - private ProcessService processService; - private ApplicationContext applicationContext; + /** + * TaskNode.runFlag : task can be run normally + */ + public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; + private ProcessService processService; - private MasterConfig config; + private ProcessInstance processInstance; @Before public void before() { - config = new MasterConfig(); + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + + MasterConfig config = new MasterConfig(); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setMasterTaskCommitRetryTimes(3); config.setMasterTaskCommitInterval(1000); + processService = Mockito.mock(ProcessService.class); - applicationContext = Mockito.mock(ApplicationContext.class); - SpringApplicationContext springApplicationContext = new SpringApplicationContext(); - springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + processInstance = getProcessInstance(); Mockito.when(processService - .findTaskInstanceById(252612)) - .thenReturn(getTaskInstance()); + .findProcessInstanceById(processInstance.getId())) + .thenReturn(processInstance); + } - Mockito.when(processService.saveTaskInstance(getTaskInstance())) - .thenReturn(true); + private TaskInstance testBasicInit(ExecutionStatus expectResult) { + TaskInstance taskInstance = getTaskInstance(getTaskNode(), processInstance); - Mockito.when(processService.findProcessInstanceById(10112)) - .thenReturn(getProcessInstance()); + // for MasterBaseTaskExecThread.submit + Mockito.when(processService + .submitTask(taskInstance)) + .thenReturn(taskInstance); + // for MasterBaseTaskExecThread.call + Mockito.when(processService + .findTaskInstanceById(taskInstance.getId())) + .thenReturn(taskInstance); + // for ConditionsTaskExecThread.initTaskParameters + Mockito.when(processService + .saveTaskInstance(taskInstance)) + .thenReturn(true); + // for ConditionsTaskExecThread.updateTaskState + Mockito.when(processService + .updateTaskInstance(taskInstance)) + .thenReturn(true); + // for ConditionsTaskExecThread.waitTaskQuit + List conditions = Stream.of( + getTaskInstanceForValidTaskList(1001, "1", expectResult) + ).collect(Collectors.toList()); Mockito.when(processService - .findValidTaskListByProcessId(10112)) - .thenReturn(getTaskInstances()); + .findValidTaskListByProcessId(processInstance.getId())) + .thenReturn(conditions); + + return taskInstance; } @Test - public void testCondition(){ - TaskInstance taskInstance = getTaskInstance(); - String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"depTasks\":\"1\",\"status\":\"SUCCESS\"}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - String conditionResult = "{\"successNode\":[\"2\"],\"failedNode\":[\"3\"]}"; + public void testBasicSuccess() throws Exception { + TaskInstance taskInstance = testBasicInit(ExecutionStatus.SUCCESS); + ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } - taskInstance.setDependency(dependString); - Mockito.when(processService.submitTask(taskInstance)) - .thenReturn(taskInstance); - ConditionsTaskExecThread conditions = - new ConditionsTaskExecThread(taskInstance); + @Test + public void testBasicFailure() throws Exception { + TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE); + ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState()); + } - try { - conditions.call(); - } catch (Exception e) { - e.printStackTrace(); - } + private TaskNode getTaskNode() { + TaskNode taskNode = new TaskNode(); + taskNode.setId("tasks-1000"); + taskNode.setName("C"); + taskNode.setType(TaskType.CONDITIONS.toString()); + taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); - Assert.assertEquals(ExecutionStatus.SUCCESS, conditions.getTaskInstance().getState()); - } + DependentItem dependentItem = new DependentItem(); + dependentItem.setDepTasks("1"); + dependentItem.setStatus(ExecutionStatus.SUCCESS); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setDependItemList(Stream.of(dependentItem).collect(Collectors.toList())); + dependentTaskModel.setRelation(DependentRelation.AND); - private TaskInstance getTaskInstance(){ - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(252612); - taskInstance.setName("C"); - taskInstance.setTaskType("CONDITIONS"); - taskInstance.setProcessInstanceId(10112); - taskInstance.setProcessDefinitionId(100001); - return taskInstance; - } + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + dependentParameters.setRelation(DependentRelation.AND); + // in: AND(AND(1 is SUCCESS)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + ConditionsParameters conditionsParameters = new ConditionsParameters(); + conditionsParameters.setSuccessNode(Stream.of("2").collect(Collectors.toList())); + conditionsParameters.setFailedNode(Stream.of("3").collect(Collectors.toList())); - private List getTaskInstances(){ - List list = new ArrayList<>(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(199999); - taskInstance.setName("1"); - taskInstance.setState(ExecutionStatus.SUCCESS); - list.add(taskInstance); - return list; + // out: SUCCESS => 2, FAILED => 3 + taskNode.setConditionResult(JSONUtils.toJsonString(conditionsParameters)); + + return taskNode; } - private ProcessInstance getProcessInstance(){ + private ProcessInstance getProcessInstance() { ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(10112); - processInstance.setProcessDefinitionId(100001); + processInstance.setId(1000); + processInstance.setProcessDefinitionId(1000); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); return processInstance; } + private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1000); + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setName(taskNode.getName()); + taskInstance.setTaskType(taskNode.getType()); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); + return taskInstance; + } + + private TaskInstance getTaskInstanceForValidTaskList(int id, String name, ExecutionStatus state) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(id); + taskInstance.setName(name); + taskInstance.setState(state); + return taskInstance; + } } diff --git a/pom.xml b/pom.xml index 5a3590affd..2937a814fc 100644 --- a/pom.xml +++ b/pom.xml @@ -797,6 +797,7 @@ **/dao/mapper/CommandMapperTest.java **/dao/mapper/ConnectionFactoryTest.java **/dao/mapper/DataSourceMapperTest.java + **/dao/entity/TaskInstanceTest.java **/dao/entity/UdfFuncTest.java **/remote/JsonSerializerTest.java **/remote/RemoveTaskLogResponseCommandTest.java @@ -820,7 +821,7 @@ **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java **/server/master/DependentTaskTest.java - + **/server/master/ConditionsTaskTest.java **/server/master/MasterExecThreadTest.java **/server/master/ParamsTest.java **/server/register/ZookeeperNodeManagerTest.java