Browse Source

Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed (#14162)

* Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
deca0a7ea1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
  2. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java
  3. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
  4. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
  5. 89
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java
  6. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
  7. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
  8. 45
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
  9. 51
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
  10. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
  11. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java

@ -99,6 +99,7 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor {
}
} catch (Exception ex) {
log.error("Handle task dispatch request error, command: {}", taskDispatchRequest, ex);
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
sendDispatchFailedResult(channel, message, taskExecutionContext, ex);
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java

@ -26,8 +26,8 @@ import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import lombok.extern.slf4j.Slf4j;
@ -41,9 +41,6 @@ import io.netty.channel.Channel;
@Component
public class MasterTaskKillProcessor implements MasterRpcProcessor {
@Autowired
private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool;
@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
@ -60,7 +57,7 @@ public class MasterTaskKillProcessor implements MasterRpcProcessor {
return;
}
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskInstanceId);
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
if (masterTaskExecuteRunnable == null) {
log.error("Cannot find the MasterTaskExecuteRunnable, this task may already been killed");
return;
@ -71,6 +68,9 @@ public class MasterTaskKillProcessor implements MasterRpcProcessor {
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
} finally {
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
}
}
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java

@ -25,12 +25,11 @@ import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.task.TaskPauseRequest;
import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
@ -39,14 +38,11 @@ import io.netty.channel.Channel;
@Component
public class MasterTaskPauseProcessor implements MasterRpcProcessor {
@Autowired
private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool;
@Override
public void process(Channel channel, Message message) {
TaskPauseRequest taskPauseRequest = JSONUtils.parseObject(message.getBody(), TaskPauseRequest.class);
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
if (masterTaskExecuteRunnable == null) {
log.info("Cannot find the MasterTaskExecuteRunnable");
return;

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
import java.util.concurrent.atomic.AtomicBoolean;
@ -62,6 +64,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonTh
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop");

89
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java

@ -1,89 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@Slf4j
@Component
public class MasterTaskExecuteRunnableThreadPool {
@Autowired
private MasterConfig masterConfig;
private static final Map<Integer, MasterTaskExecuteRunnable> SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
private ListeningExecutorService listeningExecutorService;
public synchronized void start() {
log.info("MasterTaskExecuteRunnableThreadPool starting...");
this.listeningExecutorService = MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
"MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize()));
log.info("MasterTaskExecuteRunnableThreadPool started...");
}
public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
ListenableFuture<?> future = listeningExecutorService.submit(masterTaskExecuteRunnable);
Futures.addCallback(future, new MasterTaskExecuteCallback(masterTaskExecuteRunnable),
this.listeningExecutorService);
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
masterTaskExecuteRunnable);
}
public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer taskInstanceId) {
return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
}
private static class MasterTaskExecuteCallback implements FutureCallback {
private MasterTaskExecuteRunnable masterTaskExecuteRunnable;
public MasterTaskExecuteCallback(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
this.masterTaskExecuteRunnable = masterTaskExecuteRunnable;
}
@Override
public void onSuccess(Object result) {
log.info("MasterTaskExecuteRunnable execute success, will remove this task");
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
}
@Override
public void onFailure(Throwable t) {
log.info("MasterTaskExecuteRunnable execute failed, will remove this task");
SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
}
}
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
@ -54,8 +53,9 @@ public class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction
private void executeFinished() {
TaskInstanceLogHeader.printFinalizeTaskHeader();
TaskExecutionContextCacheManager.removeByTaskInstanceId(
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
int taskInstanceId = asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
log.info("Task execute finished, removed the TaskExecutionContext");
asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
log.info(

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java

@ -22,7 +22,6 @@ import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@ -59,7 +58,7 @@ public abstract class MasterTaskExecuteRunnable implements Runnable {
protected void afterThrowing(Throwable throwable) {
try {
cancelTask();
log.info("Get a exception when execute the task, canceled the task");
log.error("Get a exception when execute the task, canceled the task", throwable);
} catch (Exception e) {
log.error("Cancel task failed,", e);
}
@ -68,7 +67,8 @@ public abstract class MasterTaskExecuteRunnable implements Runnable {
log.info(
"Get a exception when execute the task, sent the task execute result to master, the current task execute result is {}",
taskExecutionContext.getCurrentExecutionStatus());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
log.info("Get a exception when execute the task, removed the TaskExecutionContext");
}

45
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@UtilityClass
public class MasterTaskExecuteRunnableHolder {
private static final Map<Integer, MasterTaskExecuteRunnable> SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
masterTaskExecuteRunnable);
}
public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer taskInstanceId) {
return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
}
public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) {
SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId);
}
}

51
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@Slf4j
@Component
public class MasterTaskExecuteRunnableThreadPool {
@Autowired
private MasterConfig masterConfig;
private ListeningExecutorService listeningExecutorService;
public synchronized void start() {
log.info("MasterTaskExecuteRunnableThreadPool starting...");
this.listeningExecutorService = MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
"MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize()));
log.info("MasterTaskExecuteRunnableThreadPool started...");
}
public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) {
listeningExecutorService.submit(masterTaskExecuteRunnable);
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java

@ -55,6 +55,8 @@ public class SyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRu
"Execute task finished, will send the task execute result to master, the current task execute result is {}",
taskExecutionContext.getCurrentExecutionStatus().name());
closeLogAppender();
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
}
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java

@ -19,12 +19,9 @@ package org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -51,9 +48,8 @@ public class TaskTimeoutOperator implements TaskOperator {
taskInstance.getName(), taskTimeoutStrategy.name());
return;
}
log.info("TaskInstance: {} timeout, will kill the task instance", taskInstance.getName());
taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstance.setEndTime(new Date());
taskInstanceDao.upsertTaskInstance(taskInstance);
taskExecuteRunnable.kill();
log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName());
}
}

Loading…
Cancel
Save