Browse Source

[JsonSplit-4417][master/worker/api]json split- remove json in task instance. (#5160)

* [json split] refactor json in task instance

* add task_pramas in task_instance

* feature #4417 refactor json split in task instance

* code style

* code style

* code style

* code style

* code style

* update

* update

* update unit test

* update
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
86ce8f133a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 33
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java
  4. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  5. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
  6. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  7. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  8. 37
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
  9. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  10. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  11. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
  12. 41
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  14. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  15. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  16. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  17. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  18. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  19. 22
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  20. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  21. 182
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  22. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  23. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
  24. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
  25. 93
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  26. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  27. 1
      sql/dolphinscheduler_mysql.sql
  28. 1
      sql/dolphinscheduler_postgre.sql

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
@ -70,6 +72,8 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections.map.HashedMap;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -1249,6 +1253,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* List of process instances
*/
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit);
List<TaskDefinitionLog> taskDefinitionList = processService.queryTaskDefinitionList(processDefinition.getCode(),
processDefinition.getVersion());
Map<Long, TaskDefinition> taskDefinitionMap = new HashedMap();
taskDefinitionList.forEach(taskDefinitionLog -> taskDefinitionMap.put(taskDefinitionLog.getCode(), taskDefinitionLog));
for (ProcessInstance processInstance : processInstanceList) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));
@ -1305,11 +1313,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* if process is sub process, the return sub id, or sub id=0
*/
if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) {
String taskJson = taskInstance.getTaskJson();
taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessId = Integer.parseInt(JSONUtils.parseObject(
taskNode.getParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
}
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString()
, taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));

33
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -837,38 +837,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
taskInstance.setTaskJson("{\n"
+ " \"conditionResult\": {\n"
+ " \"failedNode\": [\n"
+ " \"\"\n"
+ " ],\n"
+ " \"successNode\": [\n"
+ " \"\"\n"
+ " ]\n"
+ " },\n"
+ " \"delayTime\": \"0\",\n"
+ " \"dependence\": {},\n"
+ " \"description\": \"\",\n"
+ " \"id\": \"1\",\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"name\": \"test_task_instance\",\n"
+ " \"params\": {\n"
+ " \"processDefinitionId\": \"222\",\n"
+ " \"resourceList\": []\n"
+ " },\n"
+ " \"preTasks\": [],\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"timeout\": {\n"
+ " \"enable\": false,\n"
+ " \"interval\": null,\n"
+ " \"strategy\": \"\"\n"
+ " },\n"
+ " \"type\": \"SUB_PROCESS\",\n"
+ " \"workerGroup\": \"default\"\n"
+ "}");
//task instance exist
taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskTimeoutStrategy.java

@ -57,4 +57,5 @@ public enum TaskTimeoutStrategy {
}
throw new IllegalArgumentException("invalid status : " + status);
}
}

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
@ -75,7 +76,7 @@ public class TaskInstance implements Serializable {
private long taskCode;
/**
* process definition code
* process definition code
*/
private long processDefinitionCode;
@ -184,7 +185,7 @@ public class TaskInstance implements Serializable {
* dependency
*/
@TableField(exist = false)
private String dependency;
private DependentParameters dependency;
/**
* duration
@ -235,7 +236,7 @@ public class TaskInstance implements Serializable {
* varPool string
*/
private String varPool;
/**
* executor name
*/
@ -251,6 +252,11 @@ public class TaskInstance implements Serializable {
*/
private int delayTime;
/**
* task params
*/
private String taskParams;
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
@ -264,7 +270,7 @@ public class TaskInstance implements Serializable {
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public ProcessInstance getProcessInstance() {
return processInstance;
}
@ -429,15 +435,14 @@ public class TaskInstance implements Serializable {
this.appLink = appLink;
}
public String getDependency() {
if (this.dependency != null) {
return this.dependency;
public DependentParameters getDependency() {
if (this.dependency == null) {
this.dependency = JSONUtils.parseObject(this.getTaskParams(), DependentParameters.class);
}
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
return taskNode == null ? null : taskNode.getDependence();
return this.dependency;
}
public void setDependency(String dependency) {
public void setDependency(DependentParameters dependency) {
this.dependency = dependency;
}
@ -644,4 +649,12 @@ public class TaskInstance implements Serializable {
public void setTaskDefinitionVersion(int taskDefinitionVersion) {
this.taskDefinitionVersion = taskDefinitionVersion;
}
public String getTaskParams() {
return taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java

@ -39,4 +39,9 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRela
*/
List<ProcessTaskRelationLog> queryByProcessCodeAndVersion(@Param("processCode") long processCode,
@Param("processVersion") int processVersion);
List<ProcessTaskRelationLog> queryByTaskRelationList(@Param("processCode") long processCode,
@Param("processVersion") int processVersion,
@Param("taskCode") long taskCode,
@Param("taskVersion") long taskVersion);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -59,11 +59,12 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long taskDefinitionCode,
@Param("version") int version);
/**
* query task definition log
*
* @param taskDefinitions taskDefinition collection
* @return task definition log
* @param taskDefinitions
* @return
*/
List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions") Collection<TaskDefinition> taskDefinitions);
}

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -29,4 +29,14 @@
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
</select>
<select id="queryByTaskRelationList"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select
<include refid="baseSql"/>
from t_ds_process_task_relation_log
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
and post_task_code = #{taskCode}
and post_task_version = #{taskVersion}
</select>
</mapper>

37
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java

@ -16,9 +16,16 @@
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
@ -57,25 +64,33 @@ public class TaskInstanceTest {
TaskNode taskNode;
taskInstance = new TaskInstance();
taskInstance.setTaskJson(null);
Assert.assertNull(taskInstance.getDependency());
taskInstance = new TaskInstance();
taskNode = new TaskNode();
taskNode.setDependence(null);
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
Assert.assertNull(taskInstance.getDependency());
taskInstance = new TaskInstance();
taskNode = new TaskNode();
// expect a JSON here, and will be unwrap when toJsonString
taskNode.setDependence("\"A\"");
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
Assert.assertEquals("A", taskInstance.getDependency());
taskInstance.setTaskParams(JSONUtils.toJsonString(getDependentParameters()));
taskInstance.getDependency();
}
taskInstance = new TaskInstance();
taskInstance.setTaskJson(null);
taskInstance.setDependency("{}");
Assert.assertEquals("{}", taskInstance.getDependency());
/**
*
* @return
*/
private DependentParameters getDependentParameters() {
DependentParameters dependentParameters = new DependentParameters();
List<DependentTaskModel> dependTaskList = new ArrayList<>();
List<DependentItem> dependentItems = new ArrayList<>();
DependentItem dependentItem = new DependentItem();
dependentItem.setDepTasks("A");
dependentItem.setDefinitionCode(222L);
dependentItem.setCycle("today");
dependentItems.add(dependentItem);
dependentParameters.setDependTaskList(dependTaskList);
dependentParameters.setRelation(DependentRelation.AND);
return dependentParameters;
}
}

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -107,7 +107,6 @@ public class TaskInstanceMapperTest {
taskInstance.setState(state);
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskJson("{}");
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstance.setProcessDefinitionCode(1L);

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.builder;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.*;
@ -44,7 +46,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setResources(taskInstance.getResources());
@ -52,6 +53,16 @@ public class TaskExecutionContextBuilder {
return this;
}
public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) {
int timeoutSeconds = taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT;
if (timeoutSeconds >= Integer.MAX_VALUE) {
timeoutSeconds = Integer.MAX_VALUE;
}
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode());
taskExecutionContext.setTaskTimeout(timeoutSeconds);
return this;
}
/**
* build processInstance related info

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java vendored

@ -76,7 +76,6 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
taskInstance.setStartTime(taskExecutionContext.getStartTime());
taskInstance.setTaskType(taskInstance.getTaskType());
taskInstance.setExecutePath(taskInstance.getExecutePath());
taskInstance.setTaskJson(taskInstance.getTaskJson());
taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance);
}

41
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -145,7 +145,6 @@ public class TaskPriorityQueueConsumer extends Thread {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
} catch (Exception e) {
logger.error("dispatcher task error", e);
}
@ -201,8 +200,6 @@ public class TaskPriorityQueueConsumer extends Thread {
// task type
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
// task node
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
@ -221,7 +218,7 @@ public class TaskPriorityQueueConsumer extends Thread {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setResources(getResourceFullNames(taskNode));
taskInstance.setResources(getResourceFullNames(taskInstance));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
@ -230,21 +227,21 @@ public class TaskPriorityQueueConsumer extends Thread {
// SQL task
if (taskType == TaskType.SQL) {
setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
}
// DATAX task
if (taskType == TaskType.DATAX) {
setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
}
// procedure task
if (taskType == TaskType.PROCEDURE) {
setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance);
}
if (taskType == TaskType.SQOOP) {
setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
@ -262,10 +259,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskNode taskNode
* @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskNode.getParams(), ProcedureParameters.class);
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
int datasourceId = procedureParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@ -275,10 +272,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode
* @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
@ -300,10 +297,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskNode taskNode
* @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskNode.getParams(), SqoopParameters.class);
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
// sqoop job type is template set task relation
if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
@ -331,10 +328,10 @@ public class TaskPriorityQueueConsumer extends Thread {
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskNode taskNode
* @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskNode.getParams(), SqlParameters.class);
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
@ -381,9 +378,9 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* get resource map key is full name and value is tenantCode
*/
protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
protected Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
Map<String, String> resourcesMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskInstance.getTaskType(), taskInstance.getTaskParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -130,7 +130,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
this.dependentParameters = taskInstance.getDependency();
}
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -102,8 +102,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
*/
private void initDependParameters() {
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(),
DependentParameters.class);
this.dependentParameters = taskInstance.getDependency();
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){
this.dependentTaskList.add(new DependentExecute(

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -121,10 +122,11 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* init task timeout parameters
*/
private void initTimeoutParams() {
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
TaskDefinition taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
boolean timeoutEnable = taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN;
taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable,
taskDefinition.getTimeoutNotifyStrategy(),
taskDefinition.getTimeout());
if (taskTimeoutParameter.getEnable()) {
checkTimeoutFlag = true;
}

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -380,7 +380,8 @@ public class MasterExecThread implements Runnable {
*/
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>());
List<TaskNode> taskNodeList =
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
if (taskNode.isForbidden()) {
@ -485,8 +486,6 @@ public class MasterExecThread implements Runnable {
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
// task instance type
taskInstance.setTaskType(taskNode.getType());
// task instance whether alert
@ -556,9 +555,7 @@ public class MasterExecThread implements Runnable {
}
}
result.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(result));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setTaskParams(JSONUtils.toJsonString(result));
}
}

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -132,8 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
return;
}
// task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@ -148,13 +146,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getResources(),
logger);
taskExecutionContext.setTaskParams(taskNode.getParams());
taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams());
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
// set task timeout
setTaskTimeout(taskExecutionContext, taskNode);
taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
@ -238,38 +233,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
return globalParamsMap;
}
/**
* set task timeout
* @param taskExecutionContext TaskExecutionContext
* @param taskNode
*/
private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) {
// the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()) {
// get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
switch (taskTimeoutParameter.getStrategy()) {
case WARN:
break;
case FAILED:
if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
}
break;
case WARNFAILED:
if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
}
break;
default:
logger.error("not support task timeout strategy: {}", taskTimeoutParameter.getStrategy());
throw new IllegalArgumentException("not support task timeout strategy");
}
}
}
/**
* kill task

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -171,7 +171,6 @@ public class ConditionsTaskTest {
private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());

22
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -60,6 +61,10 @@ public class DependentTaskTest {
*/
public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
public static final Long TASK_CODE = 1111L;
public static final int TASK_VERSION = 1;
private ProcessService processService;
/**
@ -113,6 +118,9 @@ public class DependentTaskTest {
Mockito.when(processService
.findTaskInstanceById(1000))
.thenAnswer(i -> taskInstance);
Mockito.when(processService.findTaskDefinition(TASK_CODE, TASK_VERSION))
.thenReturn(getTaskDefinition());
}
private void testBasicInit() {
@ -359,14 +367,25 @@ public class DependentTaskTest {
return taskNode;
}
private TaskDefinition getTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setCode(TASK_CODE);
taskDefinition.setVersion(TASK_VERSION);
taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
taskDefinition.setTimeout(0);
return taskDefinition;
}
private void setupTaskInstance(TaskNode taskNode) {
taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setTaskCode(TASK_CODE);
taskInstance.setTaskDefinitionVersion(TASK_VERSION);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setTaskType(taskNode.getType());
taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class));
taskInstance.setName(taskNode.getName());
}
@ -405,7 +424,6 @@ public class DependentTaskTest {
taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setTaskJson("{}");
taskInstance.setState(state);
return taskInstance;
}

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -148,7 +148,8 @@ public class SubProcessTaskTest {
private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setName("S");
taskInstance.setTaskType(TaskType.SUB_PROCESS.toString());
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());

182
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -110,25 +110,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,"
+ "\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -161,13 +142,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+ "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+ "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],"
+ "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+ "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -212,26 +186,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-97625\","
+ "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+ "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+ " \\\"postStatements\\\":[],"
+ " \\\"jobSpeedRecord\\\":1000,"
+ " \\\"customConfig\\\":0,"
+ " \\\"dtType\\\":\\\"MYSQL\\\","
+ " \\\"dsType\\\":\\\"MYSQL\\\","
+ " \\\"jobSpeedByte\\\":0,"
+ " \\\"dataSource\\\":80,"
+ " \\\"dataTarget\\\":80,"
+ " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+ " \\\"preStatements\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"DATAX\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -274,32 +228,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-63634\","
+ "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+ "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+ " \\\"targetType\\\":\\\"HDFS\\\","
+ " \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+ " \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+ " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+ " \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+ " \\\"modelType\\\":\\\"import\\\","
+ " \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+ " \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+ " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+ " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+ " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+ " \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SQOOP\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -342,16 +270,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+ "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -370,24 +288,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@ -424,24 +324,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@ -477,24 +359,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
@ -527,32 +391,12 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskNode);
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
@ -598,15 +442,15 @@ public class TaskPriorityQueueConsumerTest {
public void testSetDataxTaskRelation() throws Exception {
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
TaskNode taskNode = new TaskNode();
taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setConnectionParams("");
dataSource.setType(DbType.MYSQL);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
@ -620,24 +464,6 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -114,7 +114,6 @@ public class MasterTaskExecThreadTest {
taskInstance.setTaskType("SHELL");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setTaskJson("{}");
taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java

@ -87,7 +87,6 @@ public class ShellCommandExecutorTest {
TaskInstance taskInstance = processService.findTaskInstanceById(7657);
String taskJson = taskInstance.getTaskJson();
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java

@ -110,9 +110,7 @@ public class SqlExecutorTest {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
taskProps.setTaskParams(taskNode.getParams());
taskProps.setTaskParams(taskInstance.getTaskParams());
// custom logger

93
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -110,6 +110,8 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.collections.map.HashedMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@ -1160,9 +1162,12 @@ public class ProcessService {
ProcessInstanceMap instanceMap,
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, Object> subProcessParam = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Integer childDefineId = Integer.parseInt(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)));
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
task.getTaskCode(), task.getTaskDefinitionVersion()
);
Map<String, String> subProcessParam = JSONUtils.toMap(taskDefinition.getTaskParams());
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID));
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams());
@ -1663,8 +1668,7 @@ public class ProcessService {
if (row == null || row.size() == 0) {
return;
}
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Map<String, Object> taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
@ -1689,9 +1693,7 @@ public class ProcessService {
}
}
taskParams.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
String params4ProcessString = JSONUtils.toJsonString(params4Property);
int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
@ -2392,7 +2394,7 @@ public class ProcessService {
taskDefinitionMap.get(preTaskName).getVersion(),
taskDefinitionMap.get(taskNode.getName()).getCode(),
taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.of("none"),
ConditionType.NONE,
taskNode.getConditionResult(),
now,
now));
@ -2541,6 +2543,77 @@ public class ProcessService {
return new ArrayList<>(taskNodeMap.values());
}
/**
* getTaskNodeFromTaskInstance
* return null if task definition do not exists
*
* @param taskInstance
* @return
*/
public TaskNode getTaskNodeFromTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode = new TaskNode();
ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
if (taskDefinition == null) {
return null;
}
List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
taskInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()
);
Map<Long, Integer> taskCodeMap = new HashedMap();
taskRelationList.forEach(relation -> taskCodeMap.putIfAbsent(relation.getPostTaskCode(), relation.getPostTaskVersion()));
taskNode.setCode(String.valueOf(taskDefinition.getCode()));
taskNode.setVersion(taskDefinition.getVersion());
taskNode.setName(taskDefinition.getName());
taskNode.setName(taskDefinition.getName());
taskNode.setDesc(taskDefinition.getDescription());
taskNode.setType(taskDefinition.getTaskType().getDescp());
taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());
taskNode.setParams(taskDefinition.getTaskParams());
taskNode.setTaskInstancePriority(taskDefinition.getTaskPriority());
taskNode.setWorkerGroup(taskDefinition.getWorkerGroup());
return taskNode;
}
/**
* find task definition by code and verision
*
* @param taskCode
* @param taskDefinitionVersion
* @return
*/
public TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion) {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
/**
* query taks definition list by process code and process version
*
* @param processCode
* @param processVersion
* @return
*/
public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogs =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Map<Long, TaskDefinition> postTaskDefinitionMap = new HashedMap();
processTaskRelationLogs.forEach(processTaskRelationLog -> {
Long code = processTaskRelationLog.getPostTaskCode();
int version = processTaskRelationLog.getPostTaskVersion();
if (postTaskDefinitionMap.containsKey(code)) {
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
postTaskDefinitionMap.putIfAbsent(code, taskDefinition);
}
});
return new ArrayList(postTaskDefinitionMap.values());
}
/**
* parse locations
*
@ -2565,7 +2638,7 @@ public class ProcessService {
* add authorized resources
*
* @param ownResources own resources
* @param userId userId
* @param userId userId
*/
private void addAuthorizedResources(List<Resource> ownResources, int userId) {
List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -114,7 +114,7 @@ public class ProcessServiceTest {
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
task.setTaskParams("{\"processDefinitionId\":100}}");
task.setId(10);
ProcessInstance childInstance = null;

1
sql/dolphinscheduler_mysql.sql

@ -830,6 +830,7 @@ CREATE TABLE `t_ds_task_instance` (
`retry_times` int(4) DEFAULT '0' COMMENT 'task retry times',
`pid` int(4) DEFAULT NULL COMMENT 'pid of task',
`app_link` text COMMENT 'yarn app id',
`task_params` text COMMENT 'job custom parameters',
`flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ',
`max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',

1
sql/dolphinscheduler_postgre.sql

@ -689,6 +689,7 @@ CREATE TABLE t_ds_task_instance (
retry_times int DEFAULT '0' ,
pid int DEFAULT NULL ,
app_link text ,
task_params text ,
flag int DEFAULT '1' ,
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,

Loading…
Cancel
Save