Browse Source

[Bug] [dolphinscheduler-server] workflow is always running when worker sendResult success but master not received (#7610)

* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 05aef27 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 8ebe060 and handle conflicts

* cherry-pick 1f18444 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>

* add default constructor (#6780)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* to #7108 (#7109)

* to #7609

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>
2.0.7-release
zwZjut 3 years ago committed by GitHub
parent
commit
6c819ee16d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
  2. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  3. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java

@ -47,13 +47,15 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor {
DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject( DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
command.getBody(), DBTaskResponseCommand.class); command.getBody(), DBTaskResponseCommand.class);
if (taskResponseCommand == null){ if (taskResponseCommand == null) {
return; return;
} }
if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
logger.debug("removeResponseCache: taskinstance id:{}", taskResponseCommand.getTaskInstanceId()); logger.debug("removeResponseCache: taskinstance id:{}", taskResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
} }
} }

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -125,7 +124,7 @@ public class TaskCallbackService {
* *
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
*/ */
public void remove(int taskInstanceId) { public static void remove(int taskInstanceId) {
REMOTE_CHANNELS.remove(taskInstanceId); REMOTE_CHANNELS.remove(taskInstanceId);
} }
@ -156,7 +155,7 @@ public class TaskCallbackService {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
remove(taskInstanceId); // remove(taskInstanceId);
return; return;
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -99,6 +99,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(killCommand.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
} }
/** /**

Loading…
Cancel
Save