diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java index fa3bfec0ca..7351f972b4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java @@ -72,8 +72,6 @@ public class TaskInstanceUtils { target.setDelayTime(source.getDelayTime()); target.setDryRun(source.getDryRun()); target.setTaskGroupId(source.getTaskGroupId()); - target.setCpuQuota(source.getCpuQuota()); - target.setMemoryMax(source.getMemoryMax()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java deleted file mode 100644 index ac4dab50f2..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.event; - -import org.apache.dolphinscheduler.common.enums.TaskEventType; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class TaskRejectByWorkerEventHandler implements TaskEventHandler { - - @Autowired - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - - @Autowired - private MasterConfig masterConfig; - - @Override - public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { - int taskInstanceId = taskEvent.getTaskInstanceId(); - int processInstanceId = taskEvent.getProcessInstanceId(); - - WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId( - processInstanceId); - if (workflowExecuteRunnable == null) { - sendAckToWorker(taskEvent); - throw new TaskEventHandleError( - "Handle task reject event error, cannot find related workflow instance from cache, will discard this event"); - } - TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> { - sendAckToWorker(taskEvent); - return new TaskEventHandleError( - "Handle task reject event error, cannot find the taskInstance from cache, will discord this event"); - }); - try { - // todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple, - // we need to control the worker overload by master rather than worker - // if the task resubmit and the worker failover, this task may be dispatch twice? - // todo: we need to clear the taskInstance host and rollback the status to submit. - workflowExecuteRunnable.resubmit(taskInstance.getTaskCode()); - sendAckToWorker(taskEvent); - } catch (Exception ex) { - throw new TaskEventHandleError("Handle task reject event error", ex); - } - - } - - public void sendAckToWorker(TaskEvent taskEvent) { - TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(), - taskEvent.getTaskInstanceId(), - masterConfig.getMasterAddress(), - taskEvent.getWorkerAddress(), - System.currentTimeMillis()); - taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command()); - } - - @Override - public TaskEventType getHandleEventType() { - return TaskEventType.WORKER_REJECT; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java deleted file mode 100644 index aed647bb4c..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.command; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.ToString; - -@Data -@NoArgsConstructor -@ToString(callSuper = true) -@EqualsAndHashCode(callSuper = true) -public class TaskRejectAckCommand extends BaseCommand { - - private int taskInstanceId; - private int status; - - public TaskRejectAckCommand(int status, - int taskInstanceId, - String messageSenderAddress, - String messageReceiverAddress, - long messageSendTime) { - super(messageSenderAddress, messageReceiverAddress, messageSendTime); - this.status = status; - this.taskInstanceId = taskInstanceId; - } - - /** - * package response command - * - * @return command - */ - public Command convert2Command() { - Command command = new Command(); - command.setType(CommandType.TASK_REJECT_ACK); - byte[] body = JSONUtils.toJsonByteArray(this); - command.setBody(body); - return command; - } - -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java deleted file mode 100644 index d50c5d8997..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.message; - -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; -import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class TaskRejectMessageSender implements MessageSender { - - @Autowired - private WorkerRpcClient workerRpcClient; - - @Autowired - private WorkerConfig workerConfig; - - @Override - public void sendMessage(TaskRejectCommand message) throws RemotingException { - workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command()); - } - - public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) { - TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(), - masterAddress, - System.currentTimeMillis()); - taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); - taskRejectMessage.setHost(taskExecutionContext.getHost()); - return taskRejectMessage; - } - - @Override - public CommandType getMessageType() { - return CommandType.TASK_REJECT; - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index e599046c57..8fcf6966f8 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; -import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; import java.io.Closeable; @@ -47,9 +46,6 @@ public class WorkerRpcServer implements Closeable { @Autowired private TaskKillProcessor taskKillProcessor; - @Autowired - private TaskRejectAckProcessor taskRejectAckProcessor; - @Autowired private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; @@ -76,7 +72,6 @@ public class WorkerRpcServer implements Closeable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); // logger server this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);