Browse Source

Fix compile error

3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
5a2ea0b76b
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
  2. 84
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
  3. 59
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
  4. 0
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
  5. 59
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
  6. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java

@ -72,8 +72,6 @@ public class TaskInstanceUtils {
target.setDelayTime(source.getDelayTime());
target.setDryRun(source.getDryRun());
target.setTaskGroupId(source.getTaskGroupId());
target.setCpuQuota(source.getCpuQuota());
target.setMemoryMax(source.getMemoryMax());
}
}

84
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private MasterConfig masterConfig;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
}
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
sendAckToWorker(taskEvent);
return new TaskEventHandleError(
"Handle task reject event error, cannot find the taskInstance from cache, will discord this event");
});
try {
// todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple,
// we need to control the worker overload by master rather than worker
// if the task resubmit and the worker failover, this task may be dispatch twice?
// todo: we need to clear the taskInstance host and rollback the status to submit.
workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
sendAckToWorker(taskEvent);
} catch (Exception ex) {
throw new TaskEventHandleError("Handle task reject event error", ex);
}
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.WORKER_REJECT;
}
}

59
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskRejectAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
public TaskRejectAckCommand(int status,
int taskInstanceId,
String messageSenderAddress,
String messageReceiverAddress,
long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
this.status = status;
this.taskInstanceId = taskInstanceId;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_REJECT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

0
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java

59
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private WorkerConfig workerConfig;
@Override
public void sendMessage(TaskRejectCommand message) throws RemotingException {
workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
}
public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
masterAddress,
System.currentTimeMillis());
taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskRejectMessage.setHost(taskExecutionContext.getHost());
return taskRejectMessage;
}
@Override
public CommandType getMessageType() {
return CommandType.TASK_REJECT;
}
}

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import java.io.Closeable;
@ -47,9 +46,6 @@ public class WorkerRpcServer implements Closeable {
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskRejectAckProcessor taskRejectAckProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@ -76,7 +72,6 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);

Loading…
Cancel
Save