Browse Source

[2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)

* to #8046

* to #8046

* to #8046

* to #8046

* to #8046

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
2.0.7-release
zwZjut 3 years ago committed by GitHub
parent
commit
d2bd95d453
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  2. 71
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java
  3. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
  6. 60
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java
  7. 24
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  8. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -98,6 +98,11 @@ public enum CommandType {
*/
TASK_KILL_RESPONSE,
/**
* kill task response ack
*/
TASK_KILL_RESPONSE_ACK,
/**
* HEART_BEAT
*/

71
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
public class TaskKillAckCommand implements Serializable {
private int taskInstanceId;
private int status;
public TaskKillAckCommand() {
super();
}
public TaskKillAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_KILL_RESPONSE_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "KillTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@ -157,6 +158,8 @@ public class TaskResponsePersistThread implements Runnable {
logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
}
}
TaskKillAckCommand taskKillAckCommand = new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskKillAckCommand.convert2Command());
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java vendored

@ -38,6 +38,7 @@ public class ResponceCache {
private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
private final Map<Integer,Command> killResponseCache = new ConcurrentHashMap<>();
/**
@ -54,6 +55,9 @@ public class ResponceCache {
case RESULT:
responseCache.put(taskInstanceId,command);
break;
case ACTION_STOP:
killResponseCache.put(taskInstanceId,command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
@ -68,6 +72,19 @@ public class ResponceCache {
ackCache.remove(taskInstanceId);
}
/**
* remove kill response cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeKillResponseCache(Integer taskInstanceId) {
killResponseCache.remove(taskInstanceId);
}
public Map<Integer, Command> getKillResponseCache() {
return killResponseCache;
}
/**
* remove reponse cache
* @param taskInstanceId taskInstanceId

60
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
public class TaskKillAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskKillAckCommand taskKillAckCommand = JSONUtils.parseObject(
command.getBody(), TaskKillAckCommand.class);
if (taskKillAckCommand == null) {
return;
}
if (taskKillAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponceCache.get().removeKillResponseCache(taskKillAckCommand.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillAckCommand.getTaskInstanceId());
logger.debug("removeKillResponseCache: taskinstance id:{}", taskKillAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskKillAckCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskKillAckCommand.getTaskInstanceId());
}
}
}

24
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@ -93,16 +95,19 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
logger.info("received kill command : {}", killCommand);
Pair<Boolean, List<String>> result = doKill(killCommand);
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
Pair<Boolean, List<String>> result = doKill(killCommand);
TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
if (taskRequest == null) {
return;
}
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskRequest, result);
ResponceCache.get().cache(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command(), Event.ACTION_STOP);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(killCommand.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
/**
@ -116,7 +121,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
int taskInstanceId = killCommand.getTaskInstanceId();
TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
try {
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
@ -161,15 +165,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param result exe result
* @return build TaskKillResponseCommand
*/
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskRequest taskRequest,
Pair<Boolean, List<String>> result) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setAppIds(result.getRight());
TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
if (taskRequest == null) {
return taskKillResponseCommand;
}
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
if (taskExecutionContext != null) {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -88,6 +88,14 @@ public class RetryReportTaskStatusThread implements Runnable {
taskCallbackService.sendResult(taskInstanceId,responseCommand);
}
}
if (!responceCache.getKillResponseCache().isEmpty()) {
Map<Integer, Command> killResponseCache = responceCache.getKillResponseCache();
for (Map.Entry<Integer, Command> entry : killResponseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command killResponseCommand = entry.getValue();
taskCallbackService.sendResult(taskInstanceId, killResponseCommand);
}
}
}catch (Exception e){
logger.warn("retry report task status error", e);
}

Loading…
Cancel
Save