From bc629f8f58ba9ab5b90c9026ecadfdbbd0cbdead Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 18 Sep 2022 09:37:57 +0800 Subject: [PATCH] [3.0.1-prepare]cherry-pick Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469) (#12013) * Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469) (cherry picked from commit 4d13a5104b9a4ab16e16253ad2936107031e59e4) --- dolphinscheduler-bom/pom.xml | 2 +- .../pom.xml | 2 -- dolphinscheduler-datasource-plugin/pom.xml | 7 +++++ .../runner/task/CommonTaskProcessor.java | 3 +-- .../service/process/ProcessServiceImpl.java | 26 +++++++++++++------ pom.xml | 1 + tools/dependencies/known-dependencies.txt | 1 - 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index a35d3722a1..d0d2dff4fd 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -22,7 +22,7 @@ org.apache.dolphinscheduler dolphinscheduler - dev-SNAPSHOT + 3.0.1-SNAPSHOT dolphinscheduler-bom ${project.artifactId} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/pom.xml b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/pom.xml index 840fe726c9..189be954b8 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/pom.xml +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/pom.xml @@ -65,8 +65,6 @@ ${clickhouse.jdbc.version} - - org.lz4 lz4-java diff --git a/dolphinscheduler-datasource-plugin/pom.xml b/dolphinscheduler-datasource-plugin/pom.xml index 4e172e6e5a..e43a3a9d84 100644 --- a/dolphinscheduler-datasource-plugin/pom.xml +++ b/dolphinscheduler-datasource-plugin/pom.xml @@ -48,6 +48,13 @@ dolphinscheduler-datasource-hive ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-bom + ${project.version} + pom + import + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 679ffc2920..1e1506a9fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task; +import com.google.auto.service.AutoService; import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -34,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import java.util.Date; -import com.google.auto.service.AutoService; - /** * common task processor */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index d91f630633..8d210ac33b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -136,7 +136,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; -import java.sql.SQLIntegrityConstraintViolationException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -2903,8 +2902,10 @@ public class ProcessServiceImpl implements ProcessService { //try to get taskGroup int count = taskGroupMapper.selectAvailableCountById(groupId); if (count == 1 && robTaskGroupResource(taskGroupQueue)) { + logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); return true; } + logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return false; } @@ -2919,11 +2920,13 @@ public class ProcessServiceImpl implements ProcessService { taskGroupQueue.getId(), TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { + logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId()); taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); this.taskGroupQueueMapper.updateById(taskGroupQueue); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return true; } + logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId()); return false; } @@ -2945,26 +2948,31 @@ public class ProcessServiceImpl implements ProcessService { TaskGroup taskGroup; TaskGroupQueue thisTaskGroupQueue; + logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId()); try { do { taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); if (taskGroup == null) { + logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId()); return null; } thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { + logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); return null; } } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() - && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), - taskGroup.getUseSize(), - thisTaskGroupQueue.getId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); + && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), + taskGroup.getUseSize(), + thisTaskGroupQueue.getId(), + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); } catch (Exception e) { logger.error("release the task group error", e); return null; } - logger.info("updateTask:{}", taskInstance.getName()); + logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId()); + + logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); TaskGroupQueue taskGroupQueue; do { @@ -2973,11 +2981,13 @@ public class ProcessServiceImpl implements ProcessService { Flag.NO.getCode(), Flag.NO.getCode()); if (taskGroupQueue == null) { + logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId()); return null; } } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), - Flag.YES.getCode(), - taskGroupQueue.getId()) != 1); + Flag.YES.getCode(), + taskGroupQueue.getId()) != 1); + logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId()); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); } diff --git a/pom.xml b/pom.xml index 0d647b74ba..02e2261aca 100644 --- a/pom.xml +++ b/pom.xml @@ -1240,6 +1240,7 @@ + dolphinscheduler-bom dolphinscheduler-alert dolphinscheduler-spi dolphinscheduler-registry diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 9e2e07043b..1e35f78b75 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -32,7 +32,6 @@ commons-compiler-3.1.6.jar commons-compress-1.21.jar commons-configuration-1.10.jar commons-daemon-1.0.13.jar -commons-beanutils-1.9.4.jar commons-dbcp-1.4.jar commons-httpclient-3.0.1.jar commons-io-2.11.0.jar