Browse Source

[improve-#13201] update pid during running (#13206)

3.2.0-release
fuchanghai 2 years ago committed by GitHub
parent
commit
7bf3e3cdd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
  2. 103
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java
  3. 66
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java
  4. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  5. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  6. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  7. 55
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java
  8. 83
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java
  9. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  10. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  11. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
  12. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  13. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  14. 23
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
  15. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  16. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
  17. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
  18. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
  19. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  20. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
  21. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  22. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
  23. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  24. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  25. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  26. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  27. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  28. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
  29. 65
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java
  30. 71
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java
  31. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
  32. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
  33. 14
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java

@ -22,6 +22,7 @@ public enum TaskEventType {
DELAY, DELAY,
RUNNING, RUNNING,
RESULT, RESULT,
UPDATE_PID,
WORKER_REJECT, WORKER_REJECT,
CACHE, CACHE,
} }

103
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskUpdatePidEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private TaskInstanceDao taskInstanceDao;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task running event error, cannot find related workflow instance from cache, will discard this event");
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle running event error, cannot find the taskInstance from cache, will discord this event");
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().isFinished()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task running event error, this task instance is already finished, this event is delay, will discard this event");
}
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
try {
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setPid(taskEvent.getProcessId());
if (!taskInstanceDao.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
if (ex instanceof TaskEventHandleError) {
throw ex;
}
throw new TaskEventHandleError("Handle task update pid event error, update taskInstance to db failed", ex);
}
}
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskUpdatePidAckMessage taskUpdatePidAckMessage =
new TaskUpdatePidAckMessage(true, taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskUpdatePidAckMessage.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.UPDATE_PID;
}
}

66
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskUpdatePidProcessor.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task execute running processor
*/
@Component
@Slf4j
public class TaskUpdatePidProcessor implements NettyRequestProcessor {
@Autowired
private TaskEventService taskEventService;
/**
* task ack process
*
* @param channel channel channel
* @param command command TaskExecuteAckCommand
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_UPDATE_PID == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskUpdatePidCommand taskUpdatePidCommand =
JSONUtils.parseObject(command.getBody(), TaskUpdatePidCommand.class);
log.info("taskUpdatePidCommand: {}", taskUpdatePidCommand);
TaskEvent taskEvent = TaskEvent.newUpdatePidEvent(taskUpdatePidCommand,
channel,
taskUpdatePidCommand.getMessageSenderAddress());
taskEventService.addEvent(taskEvent);
}
}

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
import java.util.Date; import java.util.Date;
@ -161,4 +162,16 @@ public class TaskEvent {
event.setEvent(TaskEventType.CACHE); event.setEvent(TaskEventType.CACHE);
return event; return event;
} }
public static TaskEvent newUpdatePidEvent(TaskUpdatePidCommand command, Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
event.setLogPath(command.getLogPath());
event.setChannel(channel);
event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.UPDATE_PID);
return event;
}
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningPro
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor;
import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -67,6 +68,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired @Autowired
private TaskKillResponseProcessor taskKillResponseProcessor; private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskUpdatePidProcessor updatePidProcessor;
@Autowired @Autowired
private TaskRecallProcessor taskRecallProcessor; private TaskRecallProcessor taskRecallProcessor;
@ -86,6 +90,7 @@ public class MasterRPCServer implements AutoCloseable {
serverConfig.setListenPort(masterConfig.getListenPort()); serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID, updatePidProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -131,6 +131,16 @@ public enum CommandType {
*/ */
WORKFLOW_EXECUTING_DATA_REQUEST, WORKFLOW_EXECUTING_DATA_REQUEST,
/**
* update taskInstance's PID request
*/
TASK_UPDATE_PID,
/**
* update taskInstance's PID response ack, from master to worker
*/
TASK_UPDATE_PID_ACK,
/** /**
* workflow executing data response, from master to api * workflow executing data response, from master to api
*/ */

55
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidAckMessage.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* task execute running ack command
* from master to worker
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TaskUpdatePidAckMessage implements Serializable {
private boolean success;
private int taskInstanceId;
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_UPDATE_PID_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

83
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskUpdatePidCommand.java

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* Task running message, means the task is running in worker.
*/
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskUpdatePidCommand extends BaseCommand {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
/**
* startTime
*/
private long startTime;
/**
* host
*/
private String host;
/**
* logPath
*/
private String logPath;
/**
* processId
*/
private int processId;
public TaskUpdatePidCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_UPDATE_PID);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

7
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -186,7 +186,7 @@ public abstract class AbstractCommandExecutor {
command.add(String.format("--uid=%s", taskRequest.getTenantCode())); command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
} }
public TaskResponse run(String execCommand) throws IOException, InterruptedException { public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws IOException, InterruptedException {
TaskResponse result = new TaskResponse(); TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId(); int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
@ -228,6 +228,11 @@ public abstract class AbstractCommandExecutor {
// if timeout occurs, exit directly // if timeout occurs, exit directly
long remainTime = getRemainTime(); long remainTime = getRemainTime();
// update pid before waiting for the run to finish
if (null != taskCallBack) {
taskCallBack.updateTaskInstanceInfo(taskInstanceId);
}
// waiting for the run to finish // waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -60,7 +60,7 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
public void handle(TaskCallBack taskCallBack) throws TaskException { public void handle(TaskCallBack taskCallBack) throws TaskException {
try { try {
// SHELL task exit code // SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand()); TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
setExitStatusCode(response.getExitStatusCode()); setExitStatusCode(response.getExitStatusCode());
// set appIds // set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java

@ -22,4 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
public interface TaskCallBack { public interface TaskCallBack {
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo); public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo);
public void updateTaskInstanceInfo(int taskInstanceId);
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -115,7 +115,7 @@ public class ChunJunTask extends AbstractTask {
String jsonFilePath = buildChunJunJsonFile(paramsMap); String jsonFilePath = buildChunJunJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());

2
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -159,7 +159,7 @@ public class DataxTask extends AbstractTask {
// run datax processDataSourceService // run datax processDataSourceService
String jsonFilePath = buildDataxJsonFile(paramsMap); String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());

23
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
@ -67,7 +68,17 @@ public class DataxTaskTest {
private DataxTask dataxTask; private DataxTask dataxTask;
private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { private final TaskCallBack taskCallBack = new TaskCallBack() {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
}
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
}
}; };
@BeforeEach @BeforeEach
@ -99,7 +110,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0); taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1); taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack); dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode()); Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@ -140,7 +151,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0); taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1); taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack); dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode()); Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@ -176,7 +187,8 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString())).thenThrow(new InterruptedException("Command execution failed")); when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
.thenThrow(new InterruptedException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
} }
@ -194,7 +206,8 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString())).thenThrow(new IOException("Command execution failed")); when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
.thenThrow(new IOException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
} }

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -79,7 +79,7 @@ public class DvcTask extends AbstractTask {
try { try {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool()); parameters.dealOutParam(shellCommandExecutor.getVarPool());

12
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -74,8 +75,17 @@ public class EmrAddStepsTaskTest {
private EmrAddStepsTask emrAddStepsTask; private EmrAddStepsTask emrAddStepsTask;
private AmazonElasticMapReduce emrClient; private AmazonElasticMapReduce emrClient;
private Step step; private Step step;
private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { private TaskCallBack taskCallBack = new TaskCallBack() {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
}
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
}
}; };
@BeforeEach @BeforeEach

12
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -96,8 +97,17 @@ public class EmrJobFlowTaskTest {
private EmrJobFlowTask emrJobFlowTask; private EmrJobFlowTask emrJobFlowTask;
private AmazonElasticMapReduce emrClient; private AmazonElasticMapReduce emrClient;
private Cluster cluster; private Cluster cluster;
private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { private TaskCallBack taskCallBack = new TaskCallBack() {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
}
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
}
}; };
@BeforeEach @BeforeEach

2
dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

@ -76,7 +76,7 @@ public class HiveCliTask extends AbstractRemoteTask {
@Override @Override
public void handle(TaskCallBack taskCallBack) throws TaskException { public void handle(TaskCallBack taskCallBack) throws TaskException {
try { try {
final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand()); final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode()); setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds()); setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId()); setProcessId(taskResponse.getProcessId());

2
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -142,7 +142,7 @@ public class JavaTask extends AbstractTask {
throw new RunTypeNotFoundException("run type is required, but it is null now."); throw new RunTypeNotFoundException("run type is required, but it is null now.");
} }
Preconditions.checkNotNull(command, "command not be null."); Preconditions.checkNotNull(command, "command not be null.");
TaskResponse taskResponse = shellCommandExecutor.run(command); TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
log.info("java task run result: {}", taskResponse); log.info("java task run result: {}", taskResponse);
setExitStatusCode(taskResponse.getExitStatusCode()); setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds()); setAppIds(taskResponse.getAppIds());

12
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java

@ -25,6 +25,7 @@ import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.RUN_TYP
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException; import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@ -44,8 +45,17 @@ import org.junit.jupiter.api.Test;
public class JavaTaskTest { public class JavaTaskTest {
private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { private TaskCallBack taskCallBack = new TaskCallBack() {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
}
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
}
}; };
@Test @Test

2
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -83,7 +83,7 @@ public class JupyterTask extends AbstractRemoteTask {
@Override @Override
public void handle(TaskCallBack taskCallBack) throws TaskException { public void handle(TaskCallBack taskCallBack) throws TaskException {
try { try {
TaskResponse response = shellCommandExecutor.run(buildCommand()); TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
setExitStatusCode(response.getExitStatusCode()); setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId()); setProcessId(response.getProcessId());

6
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java

@ -102,7 +102,7 @@ public class LinkisTask extends AbstractRemoteTask {
try { try {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString())); setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
@ -128,7 +128,7 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.STATUS_OPTIONS); args.add(Constants.STATUS_OPTIONS);
args.add(taskId); args.add(taskId);
String command = String.join(Constants.SPACE, args); String command = String.join(Constants.SPACE, args);
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
String status = findStatus(commandExecuteResult.getResultString()); String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status); LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) { switch (jobStatus) {
@ -161,7 +161,7 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.KILL_OPTIONS); args.add(Constants.KILL_OPTIONS);
args.add(taskId); args.add(taskId);
String command = String.join(Constants.SPACE, args); String command = String.join(Constants.SPACE, args);
shellCommandExecutor.run(command); shellCommandExecutor.run(command, null);
setExitStatusCode(EXIT_CODE_KILL); setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();

2
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -116,7 +116,7 @@ public class MlflowTask extends AbstractTask {
try { try {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
int exitCode; int exitCode;
if (mlflowParameters.getIsDeployDocker()) { if (mlflowParameters.getIsDeployDocker()) {
exitCode = checkDockerHealth(); exitCode = checkDockerHealth();

2
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -110,7 +110,7 @@ public class PythonTask extends AbstractTask {
createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile); createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile);
String command = buildPythonExecuteCommand(pythonScriptFile); String command = buildPythonExecuteCommand(pythonScriptFile);
TaskResponse taskResponse = shellCommandExecutor.run(command); TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode()); setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId()); setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool()); setVarPool(shellCommandExecutor.getVarPool());

2
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -69,7 +69,7 @@ public class PytorchTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException { public void handle(TaskCallBack taskCallBack) throws TaskException {
try { try {
String command = buildPythonExecuteCommand(); String command = buildPythonExecuteCommand();
TaskResponse taskResponse = shellCommandExecutor.run(command); TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode()); setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId()); setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool()); setVarPool(shellCommandExecutor.getVarPool());

2
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -99,7 +99,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
try { try {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());

2
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -90,7 +90,7 @@ public class ShellTask extends AbstractTask {
try { try {
// construct process // construct process
String command = buildCommand(); String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command); TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); shellParameters.dealOutParam(shellCommandExecutor.getVarPool());

12
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult; import org.apache.zeppelin.client.ParagraphResult;
@ -64,8 +65,17 @@ public class ZeppelinTaskTest {
private ZeppelinTask zeppelinTask; private ZeppelinTask zeppelinTask;
private ParagraphResult paragraphResult; private ParagraphResult paragraphResult;
private NoteResult noteResult; private NoteResult noteResult;
private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> { private TaskCallBack taskCallBack = new TaskCallBack() {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
}
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
}
}; };
@BeforeEach @BeforeEach

65
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskUpdatePidMessageSender.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskUpdatePidCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskUpdatePidMessageSender implements MessageSender<TaskUpdatePidCommand> {
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private WorkerConfig workerConfig;
@Override
public void sendMessage(TaskUpdatePidCommand message) throws RemotingException {
workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
}
@Override
public TaskUpdatePidCommand buildMessage(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String messageReceiverAddress) {
TaskUpdatePidCommand taskUpdatePidCommand =
new TaskUpdatePidCommand(workerConfig.getWorkerAddress(),
messageReceiverAddress,
System.currentTimeMillis());
taskUpdatePidCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskUpdatePidCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskUpdatePidCommand.setHost(taskExecutionContext.getHost());
taskUpdatePidCommand.setStartTime(taskExecutionContext.getStartTime());
return taskUpdatePidCommand;
}
@Override
public CommandType getMessageType() {
return CommandType.TASK_UPDATE_PID;
}
}

71
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskUpdatePidAckProcessor.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskUpdatePidAckMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task execute running ack processor
*/
@Component
@Slf4j
public class TaskUpdatePidAckProcessor implements NettyRequestProcessor {
@Resource
private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_UPDATE_PID_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskUpdatePidAckMessage updatePidAckCommand = JSONUtils.parseObject(command.getBody(),
TaskUpdatePidAckMessage.class);
if (updatePidAckCommand == null) {
log.error("task execute update pid ack command is null");
return;
}
try {
LogUtils.setTaskInstanceIdMDC(updatePidAckCommand.getTaskInstanceId());
log.info("task execute update pid ack command : {}", updatePidAckCommand);
if (updatePidAckCommand.isSuccess()) {
messageRetryRunner.removeRetryMessage(updatePidAckCommand.getTaskInstanceId(),
CommandType.TASK_UPDATE_PID);
}
} finally {
LogUtils.removeTaskInstanceIdMDC();
}
}
}

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java

@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -45,6 +48,9 @@ public class WorkerRpcClient implements AutoCloseable {
@Autowired @Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor; private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@Resource
private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
@Autowired @Autowired
private TaskRejectAckProcessor taskRejectAckProcessor; private TaskRejectAckProcessor taskRejectAckProcessor;
@ -57,6 +63,8 @@ public class WorkerRpcClient implements AutoCloseable {
// we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer. // we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer.
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor); taskExecuteRunningAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
taskUpdatePidAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
log.info("Worker rpc client started"); log.info("Worker rpc client started");

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -29,9 +29,12 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskUpdatePidAckProcessor;
import java.io.Closeable; import java.io.Closeable;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -56,6 +59,9 @@ public class WorkerRpcServer implements Closeable {
@Autowired @Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Resource
private TaskUpdatePidAckProcessor taskUpdatePidAckProcessor;
@Autowired @Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor; private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@ -79,6 +85,8 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor); taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_UPDATE_PID_ACK,
taskUpdatePidAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java

@ -59,4 +59,18 @@ public class TaskCallbackImpl implements TaskCallBack {
taskExecutionContext.setAppIds(applicationInfo.getAppIds()); taskExecutionContext.setAppIds(applicationInfo.getAppIds());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING); workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
} }
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("task execution context is empty, taskInstanceId: {}", taskInstanceId);
return;
}
log.info("send remote taskExecutionContext info {}", taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_UPDATE_PID);
}
} }

Loading…
Cancel
Save