Browse Source

[Feature][JsonSplit] Fix taskInstance (#5246)

* Fix dependency failure

* Fix taskInstance

* task instance list page

* code review

* fix logger path

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
31188ffca0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  2. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java
  3. 47
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  4. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  5. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  6. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  7. 61
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  8. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  9. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  10. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  11. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  12. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  13. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
  14. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  15. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  16. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  17. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  18. 11
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  19. 15
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
  20. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  21. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
  22. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
  23. 25
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
  24. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  25. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  26. 3
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  27. 2
      sql/dolphinscheduler_mysql.sql
  28. 1
      sql/dolphinscheduler_postgre.sql

7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -67,20 +67,15 @@ public class LoggerUtils {
* build job id
*
* @param affix Task Logger's prefix
* @param processDefId process define id
* @param processInstId process instance id
* @param taskId task id
* @return task id format
*/
public static String buildTaskId(String affix,
int processDefId,
int processInstId,
int taskId) {
// - [taskAppId=TASK_79_4084_15210]
return String.format(" - %s%s-%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix,
processDefId,
processInstId,
taskId);
return String.format(" - %s%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix, processInstId, taskId);
}
/**

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java

@ -42,7 +42,7 @@ public class LoggerUtilsTest {
@Test
public void buildTaskId() {
String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 79, 4084, 15210);
String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 4084, 15210);
Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
}

47
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -58,13 +57,6 @@ public class TaskInstance implements Serializable {
*/
private String taskType;
/**
* process definition id
* TODO delete
*/
@TableField(exist = false)
private int processDefinitionId;
/**
* process instance id
*/
@ -75,11 +67,6 @@ public class TaskInstance implements Serializable {
*/
private long taskCode;
/**
* process definition code
*/
private long processDefinitionCode;
/**
* task defintion version
*/
@ -91,13 +78,6 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private String processInstanceName;
/**
* task json
* TODO delete
*/
@TableField(exist = false)
private String taskJson;
/**
* state
*/
@ -255,6 +235,7 @@ public class TaskInstance implements Serializable {
/**
* task params
*/
@TableField(exist = false)
private String taskParams;
public void init(String host, Date startTime, String executePath) {
@ -311,14 +292,6 @@ public class TaskInstance implements Serializable {
this.taskType = taskType;
}
public int getProcessDefinitionId() {
return processDefinitionId;
}
public void setProcessDefinitionId(int processDefinitionId) {
this.processDefinitionId = processDefinitionId;
}
public int getProcessInstanceId() {
return processInstanceId;
}
@ -327,14 +300,6 @@ public class TaskInstance implements Serializable {
this.processInstanceId = processInstanceId;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public ExecutionStatus getState() {
return state;
}
@ -593,10 +558,8 @@ public class TaskInstance implements Serializable {
+ "id=" + id
+ ", name='" + name + '\''
+ ", taskType='" + taskType + '\''
+ ", processDefinitionId=" + processDefinitionId
+ ", processInstanceId=" + processInstanceId
+ ", processInstanceName='" + processInstanceName + '\''
+ ", taskJson='" + taskJson + '\''
+ ", state=" + state
+ ", firstSubmitTime=" + firstSubmitTime
+ ", submitTime=" + submitTime
@ -634,14 +597,6 @@ public class TaskInstance implements Serializable {
this.taskCode = taskCode;
}
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getTaskDefinitionVersion() {
return taskDefinitionVersion;
}

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -51,12 +51,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
Integer countTask(
@Param("projectCodes") Long[] projectCodes,
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
List<ExecuteStatusCount> countTaskInstanceStateByUser(
@Param("startTime") Date startTime,
List<ExecuteStatusCount> countTaskInstanceStateByUser(@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("projectCodes") Long[] projectCodes);

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -19,13 +19,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
<sql id="baseSql">
id, name, task_type, process_instance_id, task_code, task_definition_version, process_definition_code, state, submit_time,
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id,
first_submit_time, delay_time, var_pool
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_definition_code, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
@ -72,7 +72,7 @@
<select id="countTaskInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select state, count(0) as count
from t_ds_task_instance t
left join t_ds_process_definition d on d.code=t.process_definition_code
left join t_ds_task_definition_log d on d.code=t.task_code and d.version=t.task_definition_version
left join t_ds_project p on p.code=d.project_code
where 1=1
<if test="projectCodes != null and projectCodes.length != 0">
@ -97,8 +97,9 @@
</select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_process_definition process
where task.process_definition_code=process.code
from t_ds_task_instance task,t_ds_task_definition_log define
where task.task_code=define.code
and task.task_definition_version=define.version
<if test="projectCodes != null and projectCodes.length != 0">
and process.project_code in
<foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")">
@ -120,7 +121,7 @@
,
process.name as process_instance_name
from t_ds_task_instance instance
left join t_ds_process_definition define on instance.process_definition_code = define.code
left join t_ds_task_definition_log define on define.code=instance.task_code and define.version=instance.task_definition_version
left join t_ds_process_instance process on process.id=instance.process_instance_id
where define.project_code = #{projectCode}
<if test="startTime != null">

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -109,8 +109,6 @@ public class TaskInstanceMapperTest {
taskInstance.setEndTime(new Date());
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstance.setProcessDefinitionCode(1L);
// taskInstance.setProcessDefinitionId(processDefinitionId);
taskInstanceMapper.insert(taskInstance);
return taskInstance;
}
@ -293,7 +291,7 @@ public class TaskInstanceMapperTest {
definition.setCreateTime(new Date());
definition.setUpdateTime(new Date());
processDefinitionMapper.insert(definition);
task.setProcessDefinitionId(definition.getId());
//task.setProcessDefinitionId(definition.getId());
taskInstanceMapper.updateById(task);
int countTask = taskInstanceMapper.countTask(
@ -330,7 +328,7 @@ public class TaskInstanceMapperTest {
definition.setCreateTime(new Date());
definition.setUpdateTime(new Date());
processDefinitionMapper.insert(definition);
task.setProcessDefinitionId(definition.getId());
//task.setProcessDefinitionId(definition.getId());
taskInstanceMapper.updateById(task);

61
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -85,6 +85,16 @@ public class TaskExecutionContext implements Serializable {
*/
private int processId;
/**
* processCode
*/
private Long processDefineCode;
/**
* processVersion
*/
private int processDefineVersion;
/**
* appIds
*/
@ -130,7 +140,6 @@ public class TaskExecutionContext implements Serializable {
*/
private String queue;
/**
* process define id
*/
@ -296,6 +305,22 @@ public class TaskExecutionContext implements Serializable {
this.processId = processId;
}
public Long getProcessDefineCode() {
return processDefineCode;
}
public void setProcessDefineCode(Long processDefineCode) {
this.processDefineCode = processDefineCode;
}
public int getProcessDefineVersion() {
return processDefineVersion;
}
public void setProcessDefineVersion(int processDefineVersion) {
this.processDefineVersion = processDefineVersion;
}
public String getAppIds() {
return appIds;
}
@ -505,22 +530,24 @@ public class TaskExecutionContext implements Serializable {
@Override
public String toString() {
return "TaskExecutionContext{"
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", processDefineCode=" + processDefineCode
+ ", processDefineVersion=" + processDefineVersion
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\''

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -201,7 +201,7 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
@ -396,7 +396,7 @@ public class TaskPriorityQueueConsumer extends Thread {
}
// get the resource id in order to get the resource names in batch
Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId());
Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(ResourceInfo::getId);
Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(resourceIdsSet)) {

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -23,20 +23,19 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
@ -69,7 +68,6 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
try{
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
@ -124,12 +122,14 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
}
private void initTaskParameters() {
this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
}

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -82,7 +82,6 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
logger.info("dependent task start");
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
@ -183,7 +182,10 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
}
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -267,10 +267,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* call
*
* @return boolean
* @throws Exception exception
*/
@Override
public Boolean call() throws Exception {
public Boolean call() {
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
return submitWaitComplete();
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -482,8 +482,6 @@ public class MasterExecThread implements Runnable {
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(nodeName);
// process instance define id
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
// task instance state
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
@ -40,8 +39,7 @@ public class LogUtils {
/**
* get task log path
*/
@SuppressWarnings("unchecked")
private static String getTaskLogPath(int processDefinitionId, int processInstanceId, int taskInstanceId) {
public static String getTaskLogPath(Long processDefineCode, int processDefineVersion, int processInstanceId, int taskInstanceId) {
// Optional.map will be skipped if null
return Optional.of(LoggerFactory.getILoggerFactory())
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
@ -50,25 +48,21 @@ public class LogUtils {
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e)
.toAbsolutePath()
.resolve(String.valueOf(processDefinitionId))
.resolve(processDefineCode + "_" + processDefineVersion)
.resolve(String.valueOf(processInstanceId))
.resolve(taskInstanceId + ".log"))
.map(Path::toString)
.orElse("");
}
/**
* get task log path by TaskInstance
*/
public static String getTaskLogPath(TaskInstance taskInstance) {
return getTaskLogPath(taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId());
}
/**
* get task log path by TaskExecutionContext
*/
public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
return getTaskLogPath(taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return getTaskLogPath(taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -133,7 +133,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
setTaskCache(taskExecutionContext);
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -174,7 +174,6 @@ public class ConditionsTaskTest {
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
return taskInstance;
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -382,7 +382,6 @@ public class DependentTaskTest {
taskInstance.setTaskCode(TASK_CODE);
taskInstance.setTaskDefinitionVersion(TASK_VERSION);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setTaskType(taskNode.getType());
taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class));
@ -423,7 +422,6 @@ public class DependentTaskTest {
taskInstance.setId(taskInstanceId);
taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(state);
return taskInstance;
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -153,7 +153,6 @@ public class SubProcessTaskTest {
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;
}

11
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -107,7 +107,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -139,7 +138,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SQL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -183,7 +181,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("DATAX");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -225,7 +222,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SQOOP");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -267,7 +263,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -285,7 +280,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -321,7 +315,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -356,7 +349,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -388,7 +380,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -416,7 +407,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
@ -461,7 +451,6 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);

15
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import java.nio.file.Path;
@ -38,10 +38,11 @@ public class LogUtilsTest {
@Test
public void testGetTaskLogPath() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(100);
taskInstance.setId(1000);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessInstanceId(100);
taskExecutionContext.setTaskInstanceId(1000);
taskExecutionContext.setProcessDefineCode(1L);
taskExecutionContext.setProcessDefineVersion(1);
Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT");
Assert.assertNotNull(rootLogger);
@ -59,8 +60,8 @@ public class LogUtilsTest {
Path logPath = Paths.get(".").toAbsolutePath().getParent()
.resolve(logBase)
.resolve("1").resolve("100").resolve("1000.log");
Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskInstance));
.resolve("1_1").resolve("100").resolve("1000.log");
Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskExecutionContext));
}
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -112,7 +112,6 @@ public class TaskExecuteProcessorTest {
.thenReturn(taskExecutionContextCacheManager);
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -90,7 +90,6 @@ public class TaskExecuteThreadTest {
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java

@ -95,7 +95,6 @@ public class WorkerManagerThreadTest {
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));

25
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java

@ -19,24 +19,23 @@ package org.apache.dolphinscheduler.server.worker.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.common.utils.*;
import java.util.Date;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* python shell command executor test
* python shell command executor test
*/
@Ignore
public class SqlExecutorTest {
@ -46,7 +45,7 @@ public class SqlExecutorTest {
private ProcessService processService = null;
@Before
public void before(){
public void before() {
processService = SpringApplicationContext.getBean(ProcessService.class);
}
@ -88,11 +87,11 @@ public class SqlExecutorTest {
/**
* Basic test template for SQLTasks, mainly test different types of DBMS types
*
* @param nodeName node name for selected task
* @param taskAppId task app id
* @param tenantCode tenant code
* @param taskInstId task instance id
* @throws Exception
*/
private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception {
TaskProps taskProps = new TaskProps();
@ -115,15 +114,13 @@ public class SqlExecutorTest {
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
//AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractTask task = null;
logger.info("task info : {}", task);
logger.info("task info : {}", task);
// job init
task.init();
@ -132,11 +129,11 @@ public class SqlExecutorTest {
task.handle();
ExecutionStatus status = ExecutionStatus.SUCCESS;
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
status = ExecutionStatus.SUCCESS;
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
} else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL) {
status = ExecutionStatus.KILL;
}else {
} else {
status = ExecutionStatus.FAILURE;
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -75,7 +75,6 @@ public class TaskManagerTest {
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1459,7 +1459,8 @@ public class ProcessService {
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefineById(taskInstance.getProcessDefinitionId());
ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
@ -2168,16 +2169,15 @@ public class ProcessService {
* format task app id in task instance
*/
public String formatTaskAppId(TaskInstance taskInstance) {
ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId());
ProcessInstance processInstanceById = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
if (definition == null || processInstanceById == null) {
ProcessInstance processInstance = findProcessInstanceById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
return "";
}
ProcessDefinition definition = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
if (definition == null) {
return "";
}
return String.format("%s_%s_%s",
definition.getId(),
processInstanceById.getId(),
taskInstance.getId());
return String.format("%s_%s_%s", definition.getId(), processInstance.getId(), taskInstance.getId());
}
@ -2563,7 +2563,7 @@ public class ProcessService {
return null;
}
List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
taskInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()
);
Map<Long, Integer> taskCodeMap = new HashedMap();

3
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -311,9 +311,7 @@ public class ProcessServiceTest {
public void testFormatTaskAppId() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(333);
taskInstance.setProcessDefinitionId(111);
taskInstance.setProcessInstanceId(222);
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(null);
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(null);
Assert.assertEquals("", processService.formatTaskAppId(taskInstance));
@ -321,7 +319,6 @@ public class ProcessServiceTest {
processDefinition.setId(111);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222);
Mockito.when(processService.findProcessDefineById(taskInstance.getProcessDefinitionId())).thenReturn(processDefinition);
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Assert.assertEquals("111_222_333", processService.formatTaskAppId(taskInstance));

2
sql/dolphinscheduler_mysql.sql

@ -819,7 +819,6 @@ CREATE TABLE `t_ds_task_instance` (
`task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
`task_code` bigint(20) NOT NULL COMMENT 'task definition code',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
`state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
@ -844,7 +843,6 @@ CREATE TABLE `t_ds_task_instance` (
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_code`,`process_instance_id`) USING BTREE,
CONSTRAINT `foreign_key_instance_id` FOREIGN KEY (`process_instance_id`) REFERENCES `t_ds_process_instance` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

1
sql/dolphinscheduler_postgre.sql

@ -676,7 +676,6 @@ CREATE TABLE t_ds_task_instance (
task_type varchar(64) DEFAULT NULL ,
task_code bigint NOT NULL,
task_definition_version int DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,

Loading…
Cancel
Save