diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java index f24f168679..371694a051 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java @@ -22,6 +22,7 @@ public enum TaskEventType { DELAY, RUNNING, RESULT, + UPDATE_PID, WORKER_REJECT, CACHE, } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java new file mode 100644 index 0000000000..1f81139ae2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; +import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; + +import java.util.Optional; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskUpdatePidEventHandler implements TaskEventHandler { + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task running event error, cannot find related workflow instance from cache, will discard this event"); + } + Optional taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); + if (!taskInstanceOptional.isPresent()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle running event error, cannot find the taskInstance from cache, will discord this event"); + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().isFinished()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task running event error, this task instance is already finished, this event is delay, will discard this event"); + } + + TaskInstance oldTaskInstance = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); + try { + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setPid(taskEvent.getProcessId()); + if (!taskInstanceDao.updateTaskInstance(taskInstance)) { + throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); + } + sendAckToWorker(taskEvent); + } catch (Exception ex) { + TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); + if (ex instanceof TaskEventHandleError) { + throw ex; + } + throw new TaskEventHandleError("Handle task update pid event error, update taskInstance to db failed", ex); + } + + } + + private void sendAckToWorker(TaskEvent taskEvent) { + // If event handle success, send ack to worker to otherwise the worker will retry this event + TaskUpdatePidAckMessage taskUpdatePidAckMessage = + new TaskUpdatePidAckMessage(true, taskEvent.getTaskInstanceId()); + taskEvent.getChannel().writeAndFlush(taskUpdatePidAckMessage.convert2Command()); + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.UPDATE_PID; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java new file mode 100644 index 0000000000..bca59a079f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; + +/** + * task execute running processor + */ +@Component +@Slf4j +public class TaskUpdatePidProcessor implements NettyRequestProcessor { + + @Autowired + private TaskEventService taskEventService; + + /** + * task ack process + * + * @param channel channel channel + * @param command command TaskExecuteAckCommand + */ + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_UPDATE_PID == command.getType(), + String.format("invalid command type : %s", command.getType())); + TaskUpdatePidCommand taskUpdatePidCommand = + JSONUtils.parseObject(command.getBody(), TaskUpdatePidCommand.class); + log.info("taskUpdatePidCommand: {}", taskUpdatePidCommand); + + TaskEvent taskEvent = TaskEvent.newUpdatePidEvent(taskUpdatePidCommand, + channel, + taskUpdatePidCommand.getMessageSenderAddress()); + taskEventService.addEvent(taskEvent); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 2573707ec0..1d2827223c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; +import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand; import java.util.Date; @@ -161,4 +162,16 @@ public class TaskEvent { event.setEvent(TaskEventType.CACHE); return event; } + + public static TaskEvent newUpdatePidEvent(TaskUpdatePidCommand command, Channel channel, String workerAddress) { + TaskEvent event = new TaskEvent(); + event.setProcessInstanceId(command.getProcessInstanceId()); + event.setTaskInstanceId(command.getTaskInstanceId()); + event.setStartTime(DateUtils.timeStampToDate(command.getStartTime())); + event.setLogPath(command.getLogPath()); + event.setChannel(channel); + event.setWorkerAddress(workerAddress); + event.setEvent(TaskEventType.UPDATE_PID); + return event; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index 1a070c10af..a72d73b849 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor; import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; import lombok.extern.slf4j.Slf4j; @@ -67,6 +68,9 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private TaskKillResponseProcessor taskKillResponseProcessor; + @Autowired + private TaskUpdatePidProcessor updatePidProcessor; + @Autowired private TaskRecallProcessor taskRecallProcessor; @@ -86,6 +90,7 @@ public class MasterRPCServer implements AutoCloseable { serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID, updatePidProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 920332f4a5..0ee631ae28 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -131,6 +131,16 @@ public enum CommandType { */ WORKFLOW_EXECUTING_DATA_REQUEST, + /** + * update taskInstance's PID request + */ + TASK_UPDATE_PID, + + /** + * update taskInstance's PID response ack, from master to worker + */ + TASK_UPDATE_PID_ACK, + /** * workflow executing data response, from master to api */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java new file mode 100644 index 0000000000..4f878c5c55 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * task execute running ack command + * from master to worker + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class TaskUpdatePidAckMessage implements Serializable { + + private boolean success; + private int taskInstanceId; + + /** + * package response command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.TASK_UPDATE_PID_ACK); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java new file mode 100644 index 0000000000..91e5787bd5 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * Task running message, means the task is running in worker. + */ +@Data +@NoArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public class TaskUpdatePidCommand extends BaseCommand { + + /** + * taskInstanceId + */ + private int taskInstanceId; + + /** + * process instance id + */ + private int processInstanceId; + + /** + * startTime + */ + private long startTime; + + /** + * host + */ + private String host; + + /** + * logPath + */ + private String logPath; + + /** + * processId + */ + private int processId; + + public TaskUpdatePidCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { + super(messageSenderAddress, messageReceiverAddress, messageSendTime); + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.TASK_UPDATE_PID); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index e2e0ce9338..c3e9b952ad 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -186,7 +186,7 @@ public abstract class AbstractCommandExecutor { command.add(String.format("--uid=%s", taskRequest.getTenantCode())); } - public TaskResponse run(String execCommand) throws IOException, InterruptedException { + public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws IOException, InterruptedException { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { @@ -228,6 +228,11 @@ public abstract class AbstractCommandExecutor { // if timeout occurs, exit directly long remainTime = getRemainTime(); + // update pid before waiting for the run to finish + if (null != taskCallBack) { + taskCallBack.updateTaskInstanceInfo(taskInstanceId); + } + // waiting for the run to finish boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 68993dc9dc..85ab9a610f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -60,7 +60,7 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask { public void handle(TaskCallBack taskCallBack) throws TaskException { try { // SHELL task exit code - TaskResponse response = shellCommandExecutor.run(buildCommand()); + TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack); setExitStatusCode(response.getExitStatusCode()); // set appIds setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java index 71643d56cd..b1a85852dc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java @@ -22,4 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; public interface TaskCallBack { public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo); + + public void updateTaskInstanceInfo(int taskInstanceId); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java index 6a832488c4..7f09243104 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java @@ -115,7 +115,7 @@ public class ChunJunTask extends AbstractTask { String jsonFilePath = buildChunJunJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); - TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 53bd2a1871..dfa45ff5d2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -159,7 +159,7 @@ public class DataxTask extends AbstractTask { // run datax processDataSourceService String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); - TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java index 08c8d5466e..7368b6208f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; @@ -67,7 +68,17 @@ public class DataxTaskTest { private DataxTask dataxTask; - private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { + private final TaskCallBack taskCallBack = new TaskCallBack() { + + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + + } }; @BeforeEach @@ -99,7 +110,7 @@ public class DataxTaskTest { taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setExitStatusCode(0); taskResponse.setProcessId(1); - when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); + when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse); dataxTask.handle(taskCallBack); Assertions.assertEquals(0, dataxTask.getExitStatusCode()); @@ -140,7 +151,7 @@ public class DataxTaskTest { taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setExitStatusCode(0); taskResponse.setProcessId(1); - when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); + when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse); dataxTask.handle(taskCallBack); Assertions.assertEquals(0, dataxTask.getExitStatusCode()); @@ -176,7 +187,8 @@ public class DataxTaskTest { shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); - when(shellCommandExecutor.run(anyString())).thenThrow(new InterruptedException("Command execution failed")); + when(shellCommandExecutor.run(anyString(), eq(taskCallBack))) + .thenThrow(new InterruptedException("Command execution failed")); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); } @@ -194,7 +206,8 @@ public class DataxTaskTest { shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); - when(shellCommandExecutor.run(anyString())).thenThrow(new IOException("Command execution failed")); + when(shellCommandExecutor.run(anyString(), eq(taskCallBack))) + .thenThrow(new IOException("Command execution failed")); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java index 32d1b48f75..37d2d2f8f9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -79,7 +79,7 @@ public class DvcTask extends AbstractTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); parameters.dealOutParam(shellCommandExecutor.getVarPool()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java index 2dabd12622..b1fab4d436 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.commons.io.IOUtils; @@ -74,8 +75,17 @@ public class EmrAddStepsTaskTest { private EmrAddStepsTask emrAddStepsTask; private AmazonElasticMapReduce emrClient; private Step step; - private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { + private TaskCallBack taskCallBack = new TaskCallBack() { + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java index 1d5c90ba16..fc50f7ed97 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.commons.io.IOUtils; @@ -96,8 +97,17 @@ public class EmrJobFlowTaskTest { private EmrJobFlowTask emrJobFlowTask; private AmazonElasticMapReduce emrClient; private Cluster cluster; - private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { + private TaskCallBack taskCallBack = new TaskCallBack() { + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + + } }; @BeforeEach diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java index efd12b682a..f76e046c17 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java @@ -76,7 +76,7 @@ public class HiveCliTask extends AbstractRemoteTask { @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand()); + final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index 19083de50f..8628c01ea6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -142,7 +142,7 @@ public class JavaTask extends AbstractTask { throw new RunTypeNotFoundException("run type is required, but it is null now."); } Preconditions.checkNotNull(command, "command not be null."); - TaskResponse taskResponse = shellCommandExecutor.run(command); + TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); log.info("java task run result: {}", taskResponse); setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java index 7fdc6b2917..c9193c397a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java @@ -25,6 +25,7 @@ import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.RUN_TYP import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException; @@ -44,8 +45,17 @@ import org.junit.jupiter.api.Test; public class JavaTaskTest { - private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { + private TaskCallBack taskCallBack = new TaskCallBack() { + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + + } }; @Test diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 61fc07094b..f8d13b77b3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -83,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask { @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - TaskResponse response = shellCommandExecutor.run(buildCommand()); + TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java index d6a1b8243d..acb54c9ebd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java @@ -102,7 +102,7 @@ public class LinkisTask extends AbstractRemoteTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(findTaskId(commandExecuteResult.getResultString())); setProcessId(commandExecuteResult.getProcessId()); @@ -128,7 +128,7 @@ public class LinkisTask extends AbstractRemoteTask { args.add(Constants.STATUS_OPTIONS); args.add(taskId); String command = String.join(Constants.SPACE, args); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null); String status = findStatus(commandExecuteResult.getResultString()); LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status); switch (jobStatus) { @@ -161,7 +161,7 @@ public class LinkisTask extends AbstractRemoteTask { args.add(Constants.KILL_OPTIONS); args.add(taskId); String command = String.join(Constants.SPACE, args); - shellCommandExecutor.run(command); + shellCommandExecutor.run(command, null); setExitStatusCode(EXIT_CODE_KILL); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index 7b252c8fd5..cd2487920a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -116,7 +116,7 @@ public class MlflowTask extends AbstractTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); int exitCode; if (mlflowParameters.getIsDeployDocker()) { exitCode = checkDockerHealth(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index c3b33d064d..cb2cdfdcdc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -110,7 +110,7 @@ public class PythonTask extends AbstractTask { createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile); String command = buildPythonExecuteCommand(pythonScriptFile); - TaskResponse taskResponse = shellCommandExecutor.run(command); + TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); setVarPool(shellCommandExecutor.getVarPool()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java index d48ae11473..f4f6d43a68 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java @@ -69,7 +69,7 @@ public class PytorchTask extends AbstractTask { public void handle(TaskCallBack taskCallBack) throws TaskException { try { String command = buildPythonExecuteCommand(); - TaskResponse taskResponse = shellCommandExecutor.run(command); + TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); setVarPool(shellCommandExecutor.getVarPool()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 5b9e1ca2fc..975f8d8106 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -99,7 +99,7 @@ public class SeatunnelTask extends AbstractRemoteTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index b5dd987547..5c1ea5991d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -90,7 +90,7 @@ public class ShellTask extends AbstractTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 8d9e585dc3..8c5cb11377 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; @@ -64,8 +65,17 @@ public class ZeppelinTaskTest { private ZeppelinTask zeppelinTask; private ParagraphResult paragraphResult; private NoteResult noteResult; - private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { + private TaskCallBack taskCallBack = new TaskCallBack() { + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + + } }; @BeforeEach diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java new file mode 100644 index 0000000000..e50c00df7a --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.message; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; + +import lombok.NonNull; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskUpdatePidMessageSender implements MessageSender { + + @Autowired + private WorkerRpcClient workerRpcClient; + + @Autowired + private WorkerConfig workerConfig; + + @Override + public void sendMessage(TaskUpdatePidCommand message) throws RemotingException { + workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command()); + } + + @Override + public TaskUpdatePidCommand buildMessage(@NonNull TaskExecutionContext taskExecutionContext, + @NonNull String messageReceiverAddress) { + TaskUpdatePidCommand taskUpdatePidCommand = + new TaskUpdatePidCommand(workerConfig.getWorkerAddress(), + messageReceiverAddress, + System.currentTimeMillis()); + taskUpdatePidCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskUpdatePidCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); + taskUpdatePidCommand.setHost(taskExecutionContext.getHost()); + taskUpdatePidCommand.setStartTime(taskExecutionContext.getStartTime()); + return taskUpdatePidCommand; + } + + @Override + public CommandType getMessageType() { + return CommandType.TASK_UPDATE_PID; + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java new file mode 100644 index 0000000000..af3680d0eb --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; + +import javax.annotation.Resource; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; + +/** + * task execute running ack processor + */ +@Component +@Slf4j +public class TaskUpdatePidAckProcessor implements NettyRequestProcessor { + + @Resource + private MessageRetryRunner messageRetryRunner; + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_UPDATE_PID_ACK == command.getType(), + String.format("invalid command type : %s", command.getType())); + + TaskUpdatePidAckMessage updatePidAckCommand = JSONUtils.parseObject(command.getBody(), + TaskUpdatePidAckMessage.class); + if (updatePidAckCommand == null) { + log.error("task execute update pid ack command is null"); + return; + } + try { + LogUtils.setTaskInstanceIdMDC(updatePidAckCommand.getTaskInstanceId()); + log.info("task execute update pid ack command : {}", updatePidAckCommand); + + if (updatePidAckCommand.isSuccess()) { + messageRetryRunner.removeRetryMessage(updatePidAckCommand.getTaskInstanceId(), + CommandType.TASK_UPDATE_PID); + } + } finally { + LogUtils.removeTaskInstanceIdMDC(); + } + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java index 4380f85485..a6c7063622 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java @@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor; + +import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -45,6 +48,9 @@ public class WorkerRpcClient implements AutoCloseable { @Autowired private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor; + @Resource + private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor; + @Autowired private TaskRejectAckProcessor taskRejectAckProcessor; @@ -57,6 +63,8 @@ public class WorkerRpcClient implements AutoCloseable { // we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer. this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_UPDATE_PID_ACK, + taskUpdatePidAckProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); log.info("Worker rpc client started"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index 929e253553..6af593460b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -29,9 +29,12 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor; import java.io.Closeable; +import javax.annotation.Resource; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -56,6 +59,9 @@ public class WorkerRpcServer implements Closeable { @Autowired private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; + @Resource + private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor; + @Autowired private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor; @@ -79,6 +85,8 @@ public class WorkerRpcServer implements Closeable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID_ACK, + taskUpdatePidAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java index 9ee176b3f9..fac29f79e0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java @@ -59,4 +59,18 @@ public class TaskCallbackImpl implements TaskCallBack { taskExecutionContext.setAppIds(applicationInfo.getAppIds()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING); } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + TaskExecutionContext taskExecutionContext = + TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + if (taskExecutionContext == null) { + log.error("task execution context is empty, taskInstanceId: {}", taskInstanceId); + return; + } + + log.info("send remote taskExecutionContext info {}", taskExecutionContext); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_UPDATE_PID); + } + }