diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java new file mode 100644 index 0000000000..cafd894484 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.builder; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; + +/** + * TaskExecutionContext builder + */ +public class TaskExecutionContextBuilder { + + public static TaskExecutionContextBuilder get(){ + return new TaskExecutionContextBuilder(); + } + + private TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + + /** + * build taskInstance related info + * + * @param taskInstance taskInstance + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){ + taskExecutionContext.setTaskId(taskInstance.getId()); + taskExecutionContext.setTaskName(taskInstance.getName()); + taskExecutionContext.setStartTime(taskInstance.getStartTime()); + taskExecutionContext.setTaskType(taskInstance.getTaskType()); + taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); + taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); + return this; + } + + + /** + * build processInstance related info + * + * @param processInstance + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){ + taskExecutionContext.setProcessInstanceId(processInstance.getId()); + taskExecutionContext.setScheduleTime(processInstance.getScheduleTime()); + taskExecutionContext.setGlobalParams(processInstance.getGlobalParams()); + taskExecutionContext.setExecutorId(processInstance.getExecutorId()); + taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode()); + taskExecutionContext.setTenantCode(processInstance.getTenantCode()); + taskExecutionContext.setQueue(processInstance.getQueue()); + return this; + } + + /** + * build processDefinition related info + * + * @param processDefinition processDefinition + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){ + taskExecutionContext.setProcessDefineId(processDefinition.getId()); + taskExecutionContext.setProjectId(processDefinition.getProjectId()); + return this; + } + + /** + * create + * + * @return taskExecutionContext + */ + public TaskExecutionContext create(){ + return taskExecutionContext; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index d704629f1e..016783977f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -128,13 +130,9 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(TaskInstance taskInstance){ final Address address = new Address("127.0.0.1", 12346); - /** - * set taskInstance relation - */ - TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance); ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand( - FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); + FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance))); try { Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), 2000); @@ -156,18 +154,25 @@ public class MasterBaseTaskExecThread implements Callable { } /** - * set task instance relation + * get TaskExecutionContext * * @param taskInstance taskInstance + * @return TaskExecutionContext */ - private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){ + private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){ taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); - int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); + Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); + // verify tenant is null if (verifyTenantIsNull(tenant, taskInstance)) { - processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId()); + processService.changeTaskState(ExecutionStatus.FAILURE, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + null, + taskInstance.getId()); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue @@ -175,7 +180,11 @@ public class MasterBaseTaskExecThread implements Callable { taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - return taskInstance; + return TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) + .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) + .create(); } @@ -196,34 +205,6 @@ public class MasterBaseTaskExecThread implements Callable { } - /** - * taskInstance convert to taskInfo - * - * @param taskInstance taskInstance - * @return taskInfo - */ - private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){ - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setTaskId(taskInstance.getId()); - taskExecutionContext.setTaskName(taskInstance.getName()); - taskExecutionContext.setStartTime(taskInstance.getStartTime()); - taskExecutionContext.setTaskType(taskInstance.getTaskType()); - taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance)); - taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); - taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId()); - taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); - taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); - taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); - taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); - taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); - taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue()); - taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId()); - taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId()); - - return taskExecutionContext; - } - - /** * get execute local path *