diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 1ee79156dc..56fdd078d7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -65,4 +65,13 @@ public enum CommandType { public String getDescp() { return descp; } + + public static CommandType of(Integer status){ + for(CommandType cmdType : values()){ + if(cmdType.getCode() == status){ + return cmdType; + } + } + throw new IllegalArgumentException("invalid status : " + status); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 2db1eda8f4..c9481baf94 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -27,13 +27,14 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import java.io.Serializable; import java.util.Date; /** * task instance */ @TableName("t_ds_task_instance") -public class TaskInstance { +public class TaskInstance implements Serializable { /** * id @@ -198,7 +199,7 @@ public class TaskInstance { - public void init(String host,Date startTime,String executePath){ + public void init(String host,Date startTime,String executePath){ this.host = host; this.startTime = startTime; this.executePath = executePath; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java index f95fbc7a4d..0c3238a5c5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java @@ -79,8 +79,10 @@ public interface DataSourceMapper extends BaseMapper { /** * list authorized UDF function + * * @param userId userId * @param dataSourceIds data source id array + * @param T * @return UDF function list */ List listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index 4d01142c97..e1556f32f1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInfoJson; public String getTaskInfoJson() { return taskInfoJson; } public void setTaskInfoJson(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInfoJson='" + taskInfoJson + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java new file mode 100644 index 0000000000..3fb58fe3da --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import java.io.Serializable; +import java.util.Date; + +/** + * master/worker task transport + */ +public class TaskInfo implements Serializable{ + + /** + * task instance id + */ + private Integer taskId; + + + /** + * taks name + */ + private String taskName; + + /** + * task start time + */ + private Date startTime; + + /** + * task type + */ + private String taskType; + + /** + * task execute path + */ + private String executePath; + + /** + * task json + */ + private String taskJson; + + + /** + * process instance id + */ + private Integer processInstanceId; + + + /** + * process instance schedule time + */ + private Date scheduleTime; + + /** + * process instance global parameters + */ + private String globalParams; + + + /** + * execute user id + */ + private Integer executorId; + + + /** + * command type if complement + */ + private Integer cmdTypeIfComplement; + + + /** + * tenant code + */ + private String tenantCode; + + /** + * task queue + */ + private String queue; + + + /** + * process define id + */ + private Integer processDefineId; + + /** + * project id + */ + private Integer projectId; + + public Integer getTaskId() { + return taskId; + } + + public void setTaskId(Integer taskId) { + this.taskId = taskId; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public String getTaskType() { + return taskType; + } + + public void setTaskType(String taskType) { + this.taskType = taskType; + } + + public String getExecutePath() { + return executePath; + } + + public void setExecutePath(String executePath) { + this.executePath = executePath; + } + + public String getTaskJson() { + return taskJson; + } + + public void setTaskJson(String taskJson) { + this.taskJson = taskJson; + } + + public Integer getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(Integer processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public Date getScheduleTime() { + return scheduleTime; + } + + public void setScheduleTime(Date scheduleTime) { + this.scheduleTime = scheduleTime; + } + + public String getGlobalParams() { + return globalParams; + } + + public void setGlobalParams(String globalParams) { + this.globalParams = globalParams; + } + + public String getTenantCode() { + return tenantCode; + } + + public void setTenantCode(String tenantCode) { + this.tenantCode = tenantCode; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public Integer getProcessDefineId() { + return processDefineId; + } + + public void setProcessDefineId(Integer processDefineId) { + this.processDefineId = processDefineId; + } + + public Integer getProjectId() { + return projectId; + } + + public void setProjectId(Integer projectId) { + this.projectId = projectId; + } + + public Integer getExecutorId() { + return executorId; + } + + public void setExecutorId(Integer executorId) { + this.executorId = executorId; + } + + public Integer getCmdTypeIfComplement() { + return cmdTypeIfComplement; + } + + public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) { + this.cmdTypeIfComplement = cmdTypeIfComplement; + } + + @Override + public String toString() { + return "TaskInfo{" + + "taskId=" + taskId + + ", taskName='" + taskName + '\'' + + ", startTime=" + startTime + + ", taskType='" + taskType + '\'' + + ", executePath='" + executePath + '\'' + + ", taskJson='" + taskJson + '\'' + + ", processInstanceId=" + processInstanceId + + ", scheduleTime=" + scheduleTime + + ", globalParams='" + globalParams + '\'' + + ", executorId=" + executorId + + ", cmdTypeIfComplement=" + cmdTypeIfComplement + + ", tenantCode='" + tenantCode + '\'' + + ", queue='" + queue + '\'' + + ", processDefineId=" + processDefineId + + ", projectId=" + projectId + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 0a153e7041..d0c7bc2350 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -118,6 +118,7 @@ public class MasterServer implements IStoppable { // //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(45678); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService)); this.nettyRemotingServer.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 0dd45f091e..c3b6a05676 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -46,6 +46,13 @@ public class TaskResponseProcessor implements NettyRequestProcessor { this.processService = processService; } + /** + * task final result response + * need master process , state persistence + * + * @param channel channel + * @param command command + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index a261b3430a..4ae057a681 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -17,14 +17,18 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.utils.BeanContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; @@ -124,11 +128,23 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(TaskInstance taskInstance){ final Address address = new Address("127.0.0.1", 12346); - ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance)); + + /** + * set taskInstance relation + */ + TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance); + + ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand( + FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); try { - Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE); - ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class); + Command responseCommand = nettyRemotingClient.sendSync(address, + taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + + ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskAckCommand.class); + logger.info("taskAckCommand : {}",taskAckCommand); + processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), taskAckCommand.getHost(), @@ -141,6 +157,88 @@ public class MasterBaseTaskExecThread implements Callable { } } + + /** + * set task instance relation + * + * @param taskInstance taskInstance + */ + private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){ + taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); + + int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); + Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); + // verify tenant is null + if (verifyTenantIsNull(tenant, taskInstance)) { + processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId()); + return null; + } + // set queue for process instance, user-specified queue takes precedence over tenant queue + String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); + taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); + taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); + + return taskInstance; + } + + + /** + * whehter tenant is null + * @param tenant tenant + * @param taskInstance taskInstance + * @return result + */ + private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { + if(tenant == null){ + logger.error("tenant not exists,process instance id : {},task instance id : {}", + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + return true; + } + return false; + } + + + /** + * taskInstance convert to taskInfo + * + * @param taskInstance taskInstance + * @return taskInfo + */ + private TaskInfo convertToTaskInfo(TaskInstance taskInstance){ + TaskInfo taskInfo = new TaskInfo(); + taskInfo.setTaskId(taskInstance.getId()); + taskInfo.setTaskName(taskInstance.getName()); + taskInfo.setStartTime(taskInstance.getStartTime()); + taskInfo.setTaskType(taskInstance.getTaskType()); + taskInfo.setExecutePath(getExecLocalPath(taskInstance)); + taskInfo.setTaskJson(taskInstance.getTaskJson()); + taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId()); + taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); + taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); + taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); + taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); + taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + taskInfo.setQueue(taskInstance.getProcessInstance().getQueue()); + taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId()); + taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId()); + + return taskInfo; + } + + + /** + * get execute local path + * + * @return execute local path + */ + private String getExecLocalPath(TaskInstance taskInstance){ + return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + } + /** * submit master base task exec thread * @return TaskInstance diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 8ea9ccbe5b..ba2149492b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -85,71 +85,39 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( command.getBody(), ExecuteTaskRequestCommand.class); - String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); + String taskInstanceJson = taskRequestCommand.getTaskInfoJson(); - TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class); - - taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); - - - //TODO this logic need add to master - int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); - Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); - // verify tenant is null - if (verifyTenantIsNull(tenant, taskInstance)) { - processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId()); - return; - } - // set queue for process instance, user-specified queue takes precedence over tenant queue - String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); - taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); - taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - //TODO end + TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class); // local execute path - String execLocalPath = getExecLocalPath(taskInstance); + String execLocalPath = getExecLocalPath(taskInfo); logger.info("task instance local execute path : {} ", execLocalPath); - // init task - taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath); + try { - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode()); + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode()); } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } - taskCallbackService.addCallbackChannel(taskInstance.getId(), + taskCallbackService.addCallbackChannel(taskInfo.getTaskId(), new CallbackChannel(channel, command.getOpaque())); // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, + workerExecService.submit(new TaskScheduleThread(taskInfo, processService, taskCallbackService)); } - /** - * whehter tenant is null - * @param tenant tenant - * @param taskInstance taskInstance - * @return result - */ - private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { - if(tenant == null){ - logger.error("tenant not exists,process instance id : {},task instance id : {}", - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); - return true; - } - return false; - } /** - * get execute local path - * @param taskInstance taskInstance + * get execute local path + * + * @param taskInfo taskInfo * @return execute local path */ - private String getExecLocalPath(TaskInstance taskInstance){ - return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); + private String getExecLocalPath(TaskInfo taskInfo){ + return FileUtils.getProcessExecDir(taskInfo.getProjectId(), + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 349e762616..0de8ea3921 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; @@ -29,14 +30,12 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; @@ -64,7 +63,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance */ - private TaskInstance taskInstance; + private TaskInfo taskInfo; /** * process service @@ -82,72 +81,69 @@ public class TaskScheduleThread implements Runnable { private TaskCallbackService taskInstanceCallbackService; /** - * constructor + * constructor * - * @param taskInstance task instance - * @param processService process dao + * @param taskInfo taskInfo + * @param processService processService + * @param taskInstanceCallbackService taskInstanceCallbackService */ - public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; - this.taskInstance = taskInstance; + this.taskInfo = taskInfo; this.taskInstanceCallbackService = taskInstanceCallbackService; } @Override public void run() { - ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInfo.getTaskId()); try { // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); - taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType()); + taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand); - logger.info("script path : {}", taskInstance.getExecutePath()); + logger.info("script path : {}", taskInfo.getExecutePath()); // task node - TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class); // get resource files List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource( - taskInstance.getExecutePath(), + taskInfo.getExecutePath(), resourceFiles, logger); - - // get process instance according to tak instance - ProcessInstance processInstance = taskInstance.getProcessInstance(); - // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), - taskInstance.getExecutePath(), - processInstance.getScheduleTime(), - taskInstance.getName(), - taskInstance.getTaskType(), - taskInstance.getId(), + taskInfo.getExecutePath(), + taskInfo.getScheduleTime(), + taskInfo.getTaskName(), + taskInfo.getTaskType(), + taskInfo.getTaskId(), CommonUtils.getSystemEnvPath(), - processInstance.getTenantCode(), - processInstance.getQueue(), - taskInstance.getStartTime(), + taskInfo.getTenantCode(), + taskInfo.getQueue(), + taskInfo.getStartTime(), getGlobalParamsMap(), - taskInstance.getDependency(), - processInstance.getCmdTypeIfComplement()); + null, + CommandType.of(taskInfo.getCmdTypeIfComplement())); // set task timeout setTaskTimeout(taskProps, taskNode); taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId())); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId())); - task = TaskManager.newTask(taskInstance.getTaskType(), + task = TaskManager.newTask(taskInfo.getTaskType(), taskProps, taskLogger); @@ -163,14 +159,14 @@ public class TaskScheduleThread implements Runnable { // responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); - logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus()); + logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus()); }catch (Exception e){ logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); + taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand); } } @@ -182,7 +178,7 @@ public class TaskScheduleThread implements Runnable { Map globalParamsMap = new HashMap<>(16); // global params string - String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams(); + String globalParamsStr = taskInfo.getGlobalParams(); if (globalParamsStr != null) { List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); @@ -192,18 +188,18 @@ public class TaskScheduleThread implements Runnable { } /** * build ack command - * @param taskType + * @param taskType taskType */ private ExecuteTaskAckCommand buildAckCommand(String taskType) { ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setLogPath(getTaskLogPath()); - ackCommand.setHost("localhost"); + ackCommand.setHost(OSUtils.getHost()); ackCommand.setStartTime(new Date()); if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ - ackCommand.setExecutePath(taskInstance.getExecutePath()); + ackCommand.setExecutePath(taskInfo.getExecutePath()); } return ackCommand; } @@ -219,15 +215,15 @@ public class TaskScheduleThread implements Runnable { .getDiscriminator()).getLogBase(); if (baseLog.startsWith(Constants.SINGLE_SLASH)){ return baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + + taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInfo.getTaskId() + ".log"; } return System.getProperty("user.dir") + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + + taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInfo.getTaskId() + ".log"; } /** @@ -333,33 +329,9 @@ public class TaskScheduleThread implements Runnable { * @throws Exception exception */ private void checkDownloadPermission(List projectRes) throws Exception { - int userId = taskInstance.getProcessInstance().getExecutorId(); + int userId = taskInfo.getExecutorId(); String[] resNames = projectRes.toArray(new String[projectRes.size()]); PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); } - - /** - * update task state according to task type - * @param taskType - */ - private void updateTaskState(String taskType) { - // update task status is running - if(taskType.equals(TaskType.SQL.name()) || - taskType.equals(TaskType.PROCEDURE.name())){ - processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - getTaskLogPath(), - taskInstance.getId()); - }else{ - processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, - taskInstance.getStartTime(), - taskInstance.getHost(), - taskInstance.getExecutePath(), - getTaskLogPath(), - taskInstance.getId()); - } - } } \ No newline at end of file