From 73f5a8e0a7f99209d4f6998fc6b83f90ab82462c Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Mon, 12 Apr 2021 10:53:22 +0800 Subject: [PATCH] [Feature][JsonSplit] Fix master/worker server (#5253) * Fix dependency failure * Fix taskInstance * task instance list page * code review * fix logger path * Fix master/worker server Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../common/utils/FileUtils.java | 11 ++++---- .../common/utils/FileUtilsTest.java | 4 +-- .../builder/TaskExecutionContextBuilder.java | 5 ++-- .../server/entity/TaskExecutionContext.java | 28 +++++-------------- .../consumer/TaskPriorityQueueConsumer.java | 1 - .../runner/MasterBaseTaskExecThread.java | 2 +- .../processor/TaskExecuteProcessor.java | 5 ++-- .../worker/runner/TaskExecuteThread.java | 3 +- .../processor/TaskExecuteProcessorTest.java | 6 ++-- .../worker/runner/TaskExecuteThreadTest.java | 1 - .../runner/WorkerManagerThreadTest.java | 1 - .../server/worker/task/TaskManagerTest.java | 1 - 12 files changed, 26 insertions(+), 42 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index ae6291a854..8c45c21c01 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -114,15 +114,16 @@ public class FileUtils { /** * directory of process execution * - * @param projectId project id - * @param processDefineId process definition id + * @param projectCode project code + * @param processDefineCode process definition Code + * @param processDefineVersion process definition version * @param processInstanceId process instance id * @param taskInstanceId task instance id * @return directory of process execution */ - public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { - String fileName = String.format("%s/exec/process/%d/%d/%d/%d", DATA_BASEDIR, - projectId, processDefineId, processInstanceId, taskInstanceId); + public static String getProcessExecDir(long projectCode, long processDefineCode, int processDefineVersion, int processInstanceId, int taskInstanceId) { + String fileName = String.format("%s/exec/process/%d/%s/%d/%d", DATA_BASEDIR, + projectCode, processDefineCode + "_" + processDefineVersion, processInstanceId, taskInstanceId); File file = new File(fileName); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java index a1ddef158f..4cbd4ae682 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java @@ -60,8 +60,8 @@ public class FileUtilsTest { @Test public void testGetProcessExecDir() { - String dir = FileUtils.getProcessExecDir(1, 2, 3, 4); - Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir); + String dir = FileUtils.getProcessExecDir(1L, 2L, 1, 3, 4); + Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2_1/3/4", dir); } @Test diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 620100f0ad..dc00335c91 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -88,8 +88,9 @@ public class TaskExecutionContextBuilder { * @return TaskExecutionContextBuilder */ public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){ - taskExecutionContext.setProcessDefineId(processDefinition.getId()); - taskExecutionContext.setProjectId(processDefinition.getProjectId()); + taskExecutionContext.setProcessDefineCode(processDefinition.getCode()); + taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion()); + taskExecutionContext.setProjectCode(processDefinition.getProjectCode()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index af5755cfd8..2a4c089d76 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -141,14 +141,9 @@ public class TaskExecutionContext implements Serializable { private String queue; /** - * process define id + * project code */ - private int processDefineId; - - /** - * project id - */ - private int projectId; + private long projectCode; /** * taskParams @@ -385,20 +380,12 @@ public class TaskExecutionContext implements Serializable { this.queue = queue; } - public int getProcessDefineId() { - return processDefineId; - } - - public void setProcessDefineId(int processDefineId) { - this.processDefineId = processDefineId; - } - - public int getProjectId() { - return projectId; + public long getProjectCode() { + return projectCode; } - public void setProjectId(int projectId) { - this.projectId = projectId; + public void setProjectCode(long projectCode) { + this.projectCode = projectCode; } public String getTaskParams() { @@ -551,8 +538,7 @@ public class TaskExecutionContext implements Serializable { + ", cmdTypeIfComplement=" + cmdTypeIfComplement + ", tenantCode='" + tenantCode + '\'' + ", queue='" + queue + '\'' - + ", processDefineId=" + processDefineId - + ", projectId=" + projectId + + ", projectCode=" + projectCode + ", taskParams='" + taskParams + '\'' + ", envFile='" + envFile + '\'' + ", definedParams=" + definedParams diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 89050ed4d9..b51764a644 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; -import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index b3b6e389be..4266907e34 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -284,7 +284,7 @@ public class MasterBaseTaskExecThread implements Callable { logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); // send warn mail - ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + //ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); return true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 76b6fe1c70..7589e8567e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -216,8 +216,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { * @return execute local path */ private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { - return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), - taskExecutionContext.getProcessDefineId(), + return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 3263554e32..ee43d36cfe 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -150,8 +150,7 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); - taskExecutionContext.setTaskAppId(String.format("%s_%s_%s", - taskExecutionContext.getProcessDefineId(), + taskExecutionContext.setTaskAppId(String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java index 8275b36bd8..8fef8aba0b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -136,8 +136,9 @@ public class TaskExecuteProcessorTest { .thenReturn(taskExecutionContext); PowerMockito.mockStatic(FileUtils.class); - PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), - taskExecutionContext.getProcessDefineId(), + PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())) .thenReturn(taskExecutionContext.getExecutePath()); @@ -168,7 +169,6 @@ public class TaskExecuteProcessorTest { public TaskExecutionContext getTaskExecutionContext() { TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setProcessId(12345); - taskExecutionContext.setProcessDefineId(1); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskType("sql"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index 394f187053..72c63f6cee 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -75,7 +75,6 @@ public class TaskExecuteThreadTest { // init task execution context, logger taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setProcessId(12345); - taskExecutionContext.setProcessDefineId(1); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskType(""); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java index 7a73b40b43..99a1143fd9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java @@ -79,7 +79,6 @@ public class WorkerManagerThreadTest { // init task execution context, logger taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setProcessId(12345); - taskExecutionContext.setProcessDefineId(1); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTenantCode("test"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index b40977b9e7..76539ae361 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -63,7 +63,6 @@ public class TaskManagerTest { // init task execution context, logger taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setProcessId(12345); - taskExecutionContext.setProcessDefineId(1); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskType("");