From 86ce8f133a42a2464a4916c6e0b42dc74628c70d Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Sat, 3 Apr 2021 17:58:04 +0800 Subject: [PATCH] [JsonSplit-4417][master/worker/api]json split- remove json in task instance. (#5160) * [json split] refactor json in task instance * add task_pramas in task_instance * feature #4417 refactor json split in task instance * code style * code style * code style * code style * code style * update * update * update unit test * update --- .../impl/ProcessDefinitionServiceImpl.java | 15 +- .../service/ProcessDefinitionServiceTest.java | 33 +--- .../common/enums/TaskTimeoutStrategy.java | 1 + .../dao/entity/TaskInstance.java | 33 +++- .../mapper/ProcessTaskRelationLogMapper.java | 5 + .../dao/mapper/TaskDefinitionLogMapper.java | 7 +- .../mapper/ProcessTaskRelationLogMapper.xml | 10 + .../dao/entity/TaskInstanceTest.java | 37 ++-- .../dao/mapper/TaskInstanceMapperTest.java | 1 - .../builder/TaskExecutionContextBuilder.java | 13 +- .../impl/TaskInstanceCacheManagerImpl.java | 1 - .../consumer/TaskPriorityQueueConsumer.java | 41 ++-- .../runner/ConditionsTaskExecThread.java | 2 +- .../runner/DependentTaskExecThread.java | 3 +- .../runner/MasterBaseTaskExecThread.java | 12 +- .../master/runner/MasterExecThread.java | 9 +- .../worker/runner/TaskExecuteThread.java | 39 +--- .../server/master/ConditionsTaskTest.java | 1 - .../server/master/DependentTaskTest.java | 22 ++- .../server/master/SubProcessTaskTest.java | 3 +- .../TaskPriorityQueueConsumerTest.java | 182 +----------------- .../runner/MasterTaskExecThreadTest.java | 1 - .../shell/ShellCommandExecutorTest.java | 1 - .../server/worker/sql/SqlExecutorTest.java | 4 +- .../service/process/ProcessService.java | 93 ++++++++- .../service/process/ProcessServiceTest.java | 2 +- sql/dolphinscheduler_mysql.sql | 1 + sql/dolphinscheduler_postgre.sql | 1 + 28 files changed, 238 insertions(+), 335 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 319c636c77..13048c5b21 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; @@ -70,6 +72,8 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.commons.collections.map.HashedMap; + import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -1249,6 +1253,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * List of process instances */ List processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit); + List taskDefinitionList = processService.queryTaskDefinitionList(processDefinition.getCode(), + processDefinition.getVersion()); + Map taskDefinitionMap = new HashedMap(); + taskDefinitionList.forEach(taskDefinitionLog -> taskDefinitionMap.put(taskDefinitionLog.getCode(), taskDefinitionLog)); for (ProcessInstance processInstance : processInstanceList) { processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); @@ -1305,11 +1313,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * if process is sub process, the return sub id, or sub id=0 */ - if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) { - String taskJson = taskInstance.getTaskJson(); - taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + if (taskInstance.isSubProcess()) { + TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString() , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 234dd4dc4d..9735512370 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -837,38 +837,7 @@ public class ProcessDefinitionServiceTest { taskInstance.setName("test_task_instance"); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setHost("192.168.xx.xx"); - taskInstance.setTaskJson("{\n" - + " \"conditionResult\": {\n" - + " \"failedNode\": [\n" - + " \"\"\n" - + " ],\n" - + " \"successNode\": [\n" - + " \"\"\n" - + " ]\n" - + " },\n" - + " \"delayTime\": \"0\",\n" - + " \"dependence\": {},\n" - + " \"description\": \"\",\n" - + " \"id\": \"1\",\n" - + " \"maxRetryTimes\": \"0\",\n" - + " \"name\": \"test_task_instance\",\n" - + " \"params\": {\n" - + " \"processDefinitionId\": \"222\",\n" - + " \"resourceList\": []\n" - + " },\n" - + " \"preTasks\": [],\n" - + " \"retryInterval\": \"1\",\n" - + " \"runFlag\": \"NORMAL\",\n" - + " \"taskInstancePriority\": \"MEDIUM\",\n" - + " \"timeout\": {\n" - + " \"enable\": false,\n" - + " \"interval\": null,\n" - + " \"strategy\": \"\"\n" - + " },\n" - + " \"type\": \"SUB_PROCESS\",\n" - + " \"workerGroup\": \"default\"\n" - + "}"); - //task instance exist + taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n"); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java index a8bd3255de..335b986010 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java @@ -57,4 +57,5 @@ public enum TaskTimeoutStrategy { } throw new IllegalArgumentException("invalid status : " + status); } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 73d4bca60b..00f2b9a91b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; @@ -75,7 +76,7 @@ public class TaskInstance implements Serializable { private long taskCode; /** - * process definition code + * process definition code */ private long processDefinitionCode; @@ -184,7 +185,7 @@ public class TaskInstance implements Serializable { * dependency */ @TableField(exist = false) - private String dependency; + private DependentParameters dependency; /** * duration @@ -235,7 +236,7 @@ public class TaskInstance implements Serializable { * varPool string */ private String varPool; - + /** * executor name */ @@ -251,6 +252,11 @@ public class TaskInstance implements Serializable { */ private int delayTime; + /** + * task params + */ + private String taskParams; + public void init(String host, Date startTime, String executePath) { this.host = host; this.startTime = startTime; @@ -264,7 +270,7 @@ public class TaskInstance implements Serializable { public void setVarPool(String varPool) { this.varPool = varPool; } - + public ProcessInstance getProcessInstance() { return processInstance; } @@ -429,15 +435,14 @@ public class TaskInstance implements Serializable { this.appLink = appLink; } - public String getDependency() { - if (this.dependency != null) { - return this.dependency; + public DependentParameters getDependency() { + if (this.dependency == null) { + this.dependency = JSONUtils.parseObject(this.getTaskParams(), DependentParameters.class); } - TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - return taskNode == null ? null : taskNode.getDependence(); + return this.dependency; } - public void setDependency(String dependency) { + public void setDependency(DependentParameters dependency) { this.dependency = dependency; } @@ -644,4 +649,12 @@ public class TaskInstance implements Serializable { public void setTaskDefinitionVersion(int taskDefinitionVersion) { this.taskDefinitionVersion = taskDefinitionVersion; } + + public String getTaskParams() { + return taskParams; + } + + public void setTaskParams(String taskParams) { + this.taskParams = taskParams; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java index deda046f76..9183b0f624 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java @@ -39,4 +39,9 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper queryByProcessCodeAndVersion(@Param("processCode") long processCode, @Param("processVersion") int processVersion); + + List queryByTaskRelationList(@Param("processCode") long processCode, + @Param("processVersion") int processVersion, + @Param("taskCode") long taskCode, + @Param("taskVersion") long taskVersion); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 59ea5c567b..1ad40e0124 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -59,11 +59,12 @@ public interface TaskDefinitionLogMapper extends BaseMapper { TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long taskDefinitionCode, @Param("version") int version); + /** - * query task definition log * - * @param taskDefinitions taskDefinition collection - * @return task definition log + * @param taskDefinitions + * @return */ List queryByTaskDefinitions(@Param("taskDefinitions") Collection taskDefinitions); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index b604a1d377..e7e4a12455 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -29,4 +29,14 @@ WHERE process_definition_code = #{processCode} and process_definition_version = #{processVersion} + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java index 5742c95a5d..d2117dc620 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java @@ -16,9 +16,16 @@ */ package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import java.util.ArrayList; +import java.util.List; + import org.junit.Assert; import org.junit.Test; @@ -57,25 +64,33 @@ public class TaskInstanceTest { TaskNode taskNode; taskInstance = new TaskInstance(); - taskInstance.setTaskJson(null); Assert.assertNull(taskInstance.getDependency()); taskInstance = new TaskInstance(); taskNode = new TaskNode(); taskNode.setDependence(null); - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); Assert.assertNull(taskInstance.getDependency()); taskInstance = new TaskInstance(); - taskNode = new TaskNode(); - // expect a JSON here, and will be unwrap when toJsonString - taskNode.setDependence("\"A\""); - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); - Assert.assertEquals("A", taskInstance.getDependency()); + taskInstance.setTaskParams(JSONUtils.toJsonString(getDependentParameters())); + taskInstance.getDependency(); + } - taskInstance = new TaskInstance(); - taskInstance.setTaskJson(null); - taskInstance.setDependency("{}"); - Assert.assertEquals("{}", taskInstance.getDependency()); + /** + * + * @return + */ + private DependentParameters getDependentParameters() { + DependentParameters dependentParameters = new DependentParameters(); + List dependTaskList = new ArrayList<>(); + List dependentItems = new ArrayList<>(); + DependentItem dependentItem = new DependentItem(); + dependentItem.setDepTasks("A"); + dependentItem.setDefinitionCode(222L); + dependentItem.setCycle("today"); + dependentItems.add(dependentItem); + dependentParameters.setDependTaskList(dependTaskList); + dependentParameters.setRelation(DependentRelation.AND); + return dependentParameters; } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index 6dc348c851..ee57a70ab1 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -107,7 +107,6 @@ public class TaskInstanceMapperTest { taskInstance.setState(state); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); - taskInstance.setTaskJson("{}"); taskInstance.setProcessInstanceId(processInstanceId); taskInstance.setTaskType(taskType); taskInstance.setProcessDefinitionCode(1L); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 7bfd9a0507..620100f0ad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.builder; +import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; + import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.entity.*; @@ -44,7 +46,6 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setTaskType(taskInstance.getTaskType()); taskExecutionContext.setLogPath(taskInstance.getLogPath()); - taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setResources(taskInstance.getResources()); @@ -52,6 +53,16 @@ public class TaskExecutionContextBuilder { return this; } + public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) { + int timeoutSeconds = taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT; + if (timeoutSeconds >= Integer.MAX_VALUE) { + timeoutSeconds = Integer.MAX_VALUE; + } + taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode()); + taskExecutionContext.setTaskTimeout(timeoutSeconds); + return this; + } + /** * build processInstance related info diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java index 4d55490a8d..0dc7035f23 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -76,7 +76,6 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { taskInstance.setStartTime(taskExecutionContext.getStartTime()); taskInstance.setTaskType(taskInstance.getTaskType()); taskInstance.setExecutePath(taskInstance.getExecutePath()); - taskInstance.setTaskJson(taskInstance.getTaskJson()); taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 69058a49b8..d19829221d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -145,7 +145,6 @@ public class TaskPriorityQueueConsumer extends Thread { TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); } } - } catch (Exception e) { logger.error("dispatcher task error", e); } @@ -201,8 +200,6 @@ public class TaskPriorityQueueConsumer extends Thread { // task type TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); - // task node - TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); @@ -221,7 +218,7 @@ public class TaskPriorityQueueConsumer extends Thread { String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - taskInstance.setResources(getResourceFullNames(taskNode)); + taskInstance.setResources(getResourceFullNames(taskInstance)); SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); @@ -230,21 +227,21 @@ public class TaskPriorityQueueConsumer extends Thread { // SQL task if (taskType == TaskType.SQL) { - setSQLTaskRelation(sqlTaskExecutionContext, taskNode); + setSQLTaskRelation(sqlTaskExecutionContext, taskInstance); } // DATAX task if (taskType == TaskType.DATAX) { - setDataxTaskRelation(dataxTaskExecutionContext, taskNode); + setDataxTaskRelation(dataxTaskExecutionContext, taskInstance); } // procedure task if (taskType == TaskType.PROCEDURE) { - setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); + setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance); } if (taskType == TaskType.SQOOP) { - setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode); + setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance); } return TaskExecutionContextBuilder.get() @@ -262,10 +259,10 @@ public class TaskPriorityQueueConsumer extends Thread { * set procedure task relation * * @param procedureTaskExecutionContext procedureTaskExecutionContext - * @param taskNode taskNode + * @param taskInstance taskInstance */ - private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) { - ProcedureParameters procedureParameters = JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class); + private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) { + ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class); int datasourceId = procedureParameters.getDatasource(); DataSource datasource = processService.findDataSourceById(datasourceId); procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); @@ -275,10 +272,10 @@ public class TaskPriorityQueueConsumer extends Thread { * set datax task relation * * @param dataxTaskExecutionContext dataxTaskExecutionContext - * @param taskNode taskNode + * @param taskInstance taskInstance */ - protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) { - DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class); + protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) { + DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class); DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource()); DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); @@ -300,10 +297,10 @@ public class TaskPriorityQueueConsumer extends Thread { * set sqoop task relation * * @param sqoopTaskExecutionContext sqoopTaskExecutionContext - * @param taskNode taskNode + * @param taskInstance taskInstance */ - private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { - SqoopParameters sqoopParameters = JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class); + private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) { + SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class); // sqoop job type is template set task relation if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) { @@ -331,10 +328,10 @@ public class TaskPriorityQueueConsumer extends Thread { * set SQL task relation * * @param sqlTaskExecutionContext sqlTaskExecutionContext - * @param taskNode taskNode + * @param taskInstance taskInstance */ - private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) { - SqlParameters sqlParameters = JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class); + private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) { + SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class); int datasourceId = sqlParameters.getDatasource(); DataSource datasource = processService.findDataSourceById(datasourceId); sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); @@ -381,9 +378,9 @@ public class TaskPriorityQueueConsumer extends Thread { /** * get resource map key is full name and value is tenantCode */ - protected Map getResourceFullNames(TaskNode taskNode) { + protected Map getResourceFullNames(TaskInstance taskInstance) { Map resourcesMap = new HashMap<>(); - AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + AbstractParameters baseParam = TaskParametersUtils.getParameters(taskInstance.getTaskType(), taskInstance.getTaskParams()); if (baseParam != null) { List projectResourceFiles = baseParam.getResourceFilesList(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java index d0b314e0bf..8cf6176771 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -130,7 +130,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { taskInstance.setStartTime(new Date()); this.processService.saveTaskInstance(taskInstance); - this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class); + this.dependentParameters = taskInstance.getDependency(); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index 9f78e0c532..4ef4783659 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -102,8 +102,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { */ private void initDependParameters() { - this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), - DependentParameters.class); + this.dependentParameters = taskInstance.getDependency(); for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ this.dependentTaskList.add(new DependentExecute( diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index be666aed70..30aa39b969 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; -import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -121,10 +122,11 @@ public class MasterBaseTaskExecThread implements Callable { * init task timeout parameters */ private void initTimeoutParams() { - String taskJson = taskInstance.getTaskJson(); - TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); - + TaskDefinition taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + boolean timeoutEnable = taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN; + taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable, + taskDefinition.getTimeoutNotifyStrategy(), + taskDefinition.getTimeout()); if (taskTimeoutParameter.getEnable()) { checkTimeoutFlag = true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 41eb15017f..4f83958b22 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -380,7 +380,8 @@ public class MasterExecThread implements Runnable { */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); - List taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>()); + List taskNodeList = + processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>()); forbiddenTaskList.clear(); taskNodeList.stream().forEach(taskNode -> { if (taskNode.isForbidden()) { @@ -485,8 +486,6 @@ public class MasterExecThread implements Runnable { taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); // process instance id taskInstance.setProcessInstanceId(processInstance.getId()); - // task instance node json - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); // task instance type taskInstance.setTaskType(taskNode.getType()); // task instance whether alert @@ -556,9 +555,7 @@ public class MasterExecThread implements Runnable { } } result.put(LOCAL_PARAMS, allParam); - taskNode.setParams(JSONUtils.toJsonString(result)); - // task instance node json - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setTaskParams(JSONUtils.toJsonString(result)); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 409c2b7a2e..3263554e32 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -132,8 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed { return; } - // task node - TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); @@ -148,13 +146,10 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.getResources(), logger); - taskExecutionContext.setTaskParams(taskNode.getParams()); + taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams()); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); - // set task timeout - setTaskTimeout(taskExecutionContext, taskNode); - taskExecutionContext.setTaskAppId(String.format("%s_%s_%s", taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), @@ -238,38 +233,6 @@ public class TaskExecuteThread implements Runnable, Delayed { return globalParamsMap; } - /** - * set task timeout - * @param taskExecutionContext TaskExecutionContext - * @param taskNode - */ - private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) { - // the default timeout is the maximum value of the integer - taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); - TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); - if (taskTimeoutParameter.getEnable()) { - // get timeout strategy - taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode()); - switch (taskTimeoutParameter.getStrategy()) { - case WARN: - break; - case FAILED: - if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) { - taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60); - } - break; - case WARNFAILED: - if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) { - taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60); - } - break; - default: - logger.error("not support task timeout strategy: {}", taskTimeoutParameter.getStrategy()); - throw new IllegalArgumentException("not support task timeout strategy"); - - } - } - } /** * kill task diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index 61058de864..9a750e1a52 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -171,7 +171,6 @@ public class ConditionsTaskTest { private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1000); - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); taskInstance.setName(taskNode.getName()); taskInstance.setTaskType(taskNode.getType()); taskInstance.setProcessInstanceId(processInstance.getId()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 8b0410b3f3..571e79a15c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -60,6 +61,10 @@ public class DependentTaskTest { */ public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; + + public static final Long TASK_CODE = 1111L; + public static final int TASK_VERSION = 1; + private ProcessService processService; /** @@ -113,6 +118,9 @@ public class DependentTaskTest { Mockito.when(processService .findTaskInstanceById(1000)) .thenAnswer(i -> taskInstance); + + Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION)) + .thenReturn(getTaskDefinition()); } private void testBasicInit() { @@ -359,14 +367,25 @@ public class DependentTaskTest { return taskNode; } + private TaskDefinition getTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setCode(TASK_CODE); + taskDefinition.setVersion(TASK_VERSION); + taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE); + taskDefinition.setTimeout(0); + return taskDefinition; + } + private void setupTaskInstance(TaskNode taskNode) { taskInstance = new TaskInstance(); taskInstance.setId(1000); + taskInstance.setTaskCode(TASK_CODE); + taskInstance.setTaskDefinitionVersion(TASK_VERSION); taskInstance.setProcessInstanceId(processInstance.getId()); taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); taskInstance.setTaskType(taskNode.getType()); + taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class)); taskInstance.setName(taskNode.getName()); } @@ -405,7 +424,6 @@ public class DependentTaskTest { taskInstance.setName(taskName); taskInstance.setProcessInstanceId(processInstance.getId()); taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); - taskInstance.setTaskJson("{}"); taskInstance.setState(state); return taskInstance; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 518b11631f..046ca5bde7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -148,7 +148,8 @@ public class SubProcessTaskTest { private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1000); - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setName("S"); + taskInstance.setTaskType(TaskType.SUB_PROCESS.toString()); taskInstance.setName(taskNode.getName()); taskInstance.setTaskType(taskNode.getType()); taskInstance.setProcessInstanceId(processInstance.getId()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 14b1c61ce3..bf2cf4c4b3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -110,25 +110,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false," - + "\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -161,13 +142,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\"," - + "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\"," - + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[]," - + "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\"," - + "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -212,26 +186,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," - + "\"forbidden\":false,\"id\":\"tasks-97625\"," - + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\"," - + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\"," - + " \\\"postStatements\\\":[]," - + " \\\"jobSpeedRecord\\\":1000," - + " \\\"customConfig\\\":0," - + " \\\"dtType\\\":\\\"MYSQL\\\"," - + " \\\"dsType\\\":\\\"MYSQL\\\"," - + " \\\"jobSpeedByte\\\":0," - + " \\\"dataSource\\\":80," - + " \\\"dataTarget\\\":80," - + " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\"," - + " \\\"preStatements\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"DATAX\"," - + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -274,32 +228,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," - + "\"forbidden\":false,\"id\":\"tasks-63634\"," - + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\"," - + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\"," - + " \\\"targetType\\\":\\\"HDFS\\\"," - + " \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\"," - + " \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\"," - + " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\"," - + " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\"," - + " \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\"," - + " \\\"modelType\\\":\\\"import\\\"," - + " \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\"," - + " \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\"," - + " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\"," - + " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\"," - + " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\"," - + " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\"," - + " \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\"," - + " \\\"localParams\\\":[],\\\"concurrency\\\":1}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SQOOP\"," - + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -342,16 +270,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," - + "\"forbidden\":false,\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0,\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\"," - + "\"retryInterval\":1,\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -370,24 +288,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"NoWorkGroup\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("NoWorkGroup"); taskInstance.setExecutorId(2); @@ -424,24 +324,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"NoWorkGroup\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("NoWorkGroup"); taskInstance.setExecutorId(2); @@ -477,24 +359,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"NoWorkGroup\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("NoWorkGroup"); taskInstance.setExecutorId(2); @@ -527,32 +391,12 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"NoWorkGroup\"}"); - taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("NoWorkGroup"); taskInstance.setExecutorId(2); // task node - TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); - Map map = taskPriorityQueueConsumer.getResourceFullNames(taskNode); + Map map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance); List resourcesList = new ArrayList(); Resource resource = new Resource(); @@ -598,15 +442,15 @@ public class TaskPriorityQueueConsumerTest { public void testSetDataxTaskRelation() throws Exception { DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); - TaskNode taskNode = new TaskNode(); - taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}"); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}"); DataSource dataSource = new DataSource(); dataSource.setId(1); dataSource.setConnectionParams(""); dataSource.setType(DbType.MYSQL); Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); - taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode); + taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance); Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId()); Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId()); @@ -620,24 +464,6 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," - + "\"conditionsTask\":false," - + "\"depList\":[]," - + "\"dependence\":\"{}\"," - + "\"forbidden\":false," - + "\"id\":\"tasks-55201\"," - + "\"maxRetryTimes\":0," - + "\"name\":\"测试任务\"," - + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," - + "\"preTasks\":\"[]\"," - + "\"retryInterval\":1," - + "\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\"," - + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," - + "\"timeout\":\"{\\\"enable\\\":false," - + "\\\"strategy\\\":\\\"\\\"}\"," - + "\"type\":\"SHELL\"," - + "\"workerGroup\":\"NoWorkGroup\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("NoWorkGroup"); taskInstance.setExecutorId(2); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 405ad435cc..9229e59d59 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -114,7 +114,6 @@ public class MasterTaskExecThreadTest { taskInstance.setTaskType("SHELL"); taskInstance.setId(252612); taskInstance.setName("C"); - taskInstance.setTaskJson("{}"); taskInstance.setProcessInstanceId(10111); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); return taskInstance; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index ac91e1d9b8..e224fa81c3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -87,7 +87,6 @@ public class ShellCommandExecutorTest { TaskInstance taskInstance = processService.findTaskInstanceById(7657); - String taskJson = taskInstance.getTaskJson(); // TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); // taskProps.setTaskParams(taskNode.getParams()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index dbaa13215f..caf15510af 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -110,9 +110,7 @@ public class SqlExecutorTest { TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - String taskJson = taskInstance.getTaskJson(); - TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - taskProps.setTaskParams(taskNode.getParams()); + taskProps.setTaskParams(taskInstance.getTaskParams()); // custom logger diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bccd926a34..2c6349c1dd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -110,6 +110,8 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; +import org.apache.commons.collections.map.HashedMap; + import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -1160,9 +1162,12 @@ public class ProcessService { ProcessInstanceMap instanceMap, TaskInstance task) { CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); - Integer childDefineId = Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID))); + TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + task.getTaskCode(), task.getTaskDefinitionVersion() + ); + Map subProcessParam = JSONUtils.toMap(taskDefinition.getTaskParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)); + Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); Map globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams()); @@ -1663,8 +1668,7 @@ public class ProcessService { if (row == null || row.size() == 0) { return; } - TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); - Map taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Map taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class); Object localParams = taskParams.get(LOCAL_PARAMS); if (localParams == null) { return; @@ -1689,9 +1693,7 @@ public class ProcessService { } } taskParams.put(LOCAL_PARAMS, allParam); - taskNode.setParams(JSONUtils.toJsonString(taskParams)); - // task instance node json - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams)); String params4ProcessString = JSONUtils.toJsonString(params4Property); int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId()); logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId()); @@ -2392,7 +2394,7 @@ public class ProcessService { taskDefinitionMap.get(preTaskName).getVersion(), taskDefinitionMap.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getVersion(), - ConditionType.of("none"), + ConditionType.NONE, taskNode.getConditionResult(), now, now)); @@ -2541,6 +2543,77 @@ public class ProcessService { return new ArrayList<>(taskNodeMap.values()); } + /** + * getTaskNodeFromTaskInstance + * return null if task definition do not exists + * + * @param taskInstance + * @return + */ + public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) { + TaskNode taskNode = new TaskNode(); + ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); + TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); + if (taskDefinition == null) { + return null; + } + List taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion( + taskInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion() + ); + Map taskCodeMap = new HashedMap(); + + taskRelationList.forEach(relation -> taskCodeMap.putIfAbsent(relation.getPostTaskCode(), relation.getPostTaskVersion())); + + taskNode.setCode(String.valueOf(taskDefinition.getCode())); + taskNode.setVersion(taskDefinition.getVersion()); + taskNode.setName(taskDefinition.getName()); + taskNode.setName(taskDefinition.getName()); + taskNode.setDesc(taskDefinition.getDescription()); + taskNode.setType(taskDefinition.getTaskType().getDescp()); + taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL); + taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes()); + taskNode.setRetryInterval(taskDefinition.getFailRetryInterval()); + taskNode.setParams(taskDefinition.getTaskParams()); + taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority()); + taskNode.setWorkerGroup(taskDefinition.getWorkerGroup()); + return taskNode; + } + + /** + * find task definition by code and verision + * + * @param taskCode + * @param taskDefinitionVersion + * @return + */ + public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) { + return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); + } + + /** + * query taks definition list by process code and process version + * + * @param processCode + * @param processVersion + * @return + */ + public List queryTaskDefinitionList(Long processCode, int processVersion) { + List processTaskRelationLogs = + processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); + Map postTaskDefinitionMap = new HashedMap(); + processTaskRelationLogs.forEach(processTaskRelationLog -> { + Long code = processTaskRelationLog.getPostTaskCode(); + int version = processTaskRelationLog.getPostTaskVersion(); + if (postTaskDefinitionMap.containsKey(code)) { + TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version); + postTaskDefinitionMap.putIfAbsent(code, taskDefinition); + } + }); + return new ArrayList(postTaskDefinitionMap.values()); + } + /** * parse locations * @@ -2565,7 +2638,7 @@ public class ProcessService { * add authorized resources * * @param ownResources own resources - * @param userId userId + * @param userId userId */ private void addAuthorizedResources(List ownResources, int userId) { List relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 15933747ae..20ba75c375 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -114,7 +114,7 @@ public class ProcessServiceTest { parentInstance.setWarningGroupId(0); TaskInstance task = new TaskInstance(); - task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}"); + task.setTaskParams("{\"processDefinitionId\":100}}"); task.setId(10); ProcessInstance childInstance = null; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index e081590207..ded1038ad5 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -830,6 +830,7 @@ CREATE TABLE `t_ds_task_instance` ( `retry_times` int(4) DEFAULT '0' COMMENT 'task retry times', `pid` int(4) DEFAULT NULL COMMENT 'pid of task', `app_link` text COMMENT 'yarn app id', + `task_params` text COMMENT 'job custom parameters', `flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available', `retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ', `max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times', diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index 2b381a48b0..5d0c214d85 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -689,6 +689,7 @@ CREATE TABLE t_ds_task_instance ( retry_times int DEFAULT '0' , pid int DEFAULT NULL , app_link text , + task_params text , flag int DEFAULT '1' , retry_interval int DEFAULT NULL , max_retry_times int DEFAULT NULL ,