Browse Source

[Feature][JsonSplit] Fix master/worker server (#5253)

* Fix dependency failure

* Fix taskInstance

* task instance list page

* code review

* fix logger path

* Fix master/worker server

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
73f5a8e0a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  2. 4
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
  3. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  4. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  5. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  6. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  7. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  8. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  9. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  10. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
  11. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
  12. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -114,15 +114,16 @@ public class FileUtils {
/** /**
* directory of process execution * directory of process execution
* *
* @param projectId project id * @param projectCode project code
* @param processDefineId process definition id * @param processDefineCode process definition Code
* @param processDefineVersion process definition version
* @param processInstanceId process instance id * @param processInstanceId process instance id
* @param taskInstanceId task instance id * @param taskInstanceId task instance id
* @return directory of process execution * @return directory of process execution
*/ */
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { public static String getProcessExecDir(long projectCode, long processDefineCode, int processDefineVersion, int processInstanceId, int taskInstanceId) {
String fileName = String.format("%s/exec/process/%d/%d/%d/%d", DATA_BASEDIR, String fileName = String.format("%s/exec/process/%d/%s/%d/%d", DATA_BASEDIR,
projectId, processDefineId, processInstanceId, taskInstanceId); projectCode, processDefineCode + "_" + processDefineVersion, processInstanceId, taskInstanceId);
File file = new File(fileName); File file = new File(fileName);
if (!file.getParentFile().exists()) { if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs(); file.getParentFile().mkdirs();

4
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java

@ -60,8 +60,8 @@ public class FileUtilsTest {
@Test @Test
public void testGetProcessExecDir() { public void testGetProcessExecDir() {
String dir = FileUtils.getProcessExecDir(1, 2, 3, 4); String dir = FileUtils.getProcessExecDir(1L, 2L, 1, 3, 4);
Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir); Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2_1/3/4", dir);
} }
@Test @Test

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -88,8 +88,9 @@ public class TaskExecutionContextBuilder {
* @return TaskExecutionContextBuilder * @return TaskExecutionContextBuilder
*/ */
public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){ public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){
taskExecutionContext.setProcessDefineId(processDefinition.getId()); taskExecutionContext.setProcessDefineCode(processDefinition.getCode());
taskExecutionContext.setProjectId(processDefinition.getProjectId()); taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion());
taskExecutionContext.setProjectCode(processDefinition.getProjectCode());
return this; return this;
} }

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -141,14 +141,9 @@ public class TaskExecutionContext implements Serializable {
private String queue; private String queue;
/** /**
* process define id * project code
*/ */
private int processDefineId; private long projectCode;
/**
* project id
*/
private int projectId;
/** /**
* taskParams * taskParams
@ -385,20 +380,12 @@ public class TaskExecutionContext implements Serializable {
this.queue = queue; this.queue = queue;
} }
public int getProcessDefineId() { public long getProjectCode() {
return processDefineId; return projectCode;
}
public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
public int getProjectId() {
return projectId;
} }
public void setProjectId(int projectId) { public void setProjectCode(long projectCode) {
this.projectId = projectId; this.projectCode = projectCode;
} }
public String getTaskParams() { public String getTaskParams() {
@ -551,8 +538,7 @@ public class TaskExecutionContext implements Serializable {
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement + ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\'' + ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\'' + ", queue='" + queue + '\''
+ ", processDefineId=" + processDefineId + ", projectCode=" + projectCode
+ ", projectId=" + projectId
+ ", taskParams='" + taskParams + '\'' + ", taskParams='" + taskParams + '\''
+ ", envFile='" + envFile + '\'' + ", envFile='" + envFile + '\''
+ ", definedParams=" + definedParams + ", definedParams=" + definedParams

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -284,7 +284,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", logger.warn("process id:{} process name:{} task id: {},name:{} execution time out",
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
// send warn mail // send warn mail
ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); //ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(), alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName()); taskInstance.getId(), taskInstance.getName());
return true; return true;

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -216,8 +216,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* @return execute local path * @return execute local path
*/ */
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -150,8 +150,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap()); taskExecutionContext.setDefinedParams(getGlobalParamsMap());
taskExecutionContext.setTaskAppId(String.format("%s_%s_%s", taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())); taskExecutionContext.getTaskInstanceId()));

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -136,8 +136,9 @@ public class TaskExecuteProcessorTest {
.thenReturn(taskExecutionContext); .thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class); PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())) taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath()); .thenReturn(taskExecutionContext.getExecutePath());
@ -168,7 +169,6 @@ public class TaskExecuteProcessorTest {
public TaskExecutionContext getTaskExecutionContext() { public TaskExecutionContext getTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("sql"); taskExecutionContext.setTaskType("sql");

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -75,7 +75,6 @@ public class TaskExecuteThreadTest {
// init task execution context, logger // init task execution context, logger
taskExecutionContext = new TaskExecutionContext(); taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType(""); taskExecutionContext.setTaskType("");

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java

@ -79,7 +79,6 @@ public class WorkerManagerThreadTest {
// init task execution context, logger // init task execution context, logger
taskExecutionContext = new TaskExecutionContext(); taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTenantCode("test"); taskExecutionContext.setTenantCode("test");

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -63,7 +63,6 @@ public class TaskManagerTest {
// init task execution context, logger // init task execution context, logger
taskExecutionContext = new TaskExecutionContext(); taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType(""); taskExecutionContext.setTaskType("");

Loading…
Cancel
Save