From 864a90820de8b8470714574dd506f757a73db874 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 16 Sep 2022 15:39:04 +0800 Subject: [PATCH] Fix insertOrUpdate plugin may failed due to concurrent operation (#11471) --- .../api/ApiApplicationServer.java | 5 +-- .../dolphinscheduler/dao/PluginDao.java | 34 +++++++++++++++---- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 078506898a..08a73e69e3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -68,10 +68,7 @@ public class ApiApplicationServer { String paramsJson = PluginParamsTransfer.transferParamsToJson(params); PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson); - int count = pluginDao.addOrUpdatePluginDefine(pluginDefine); - if (count <= 0) { - throw new TaskPluginException("Failed to update task plugin: " + taskPluginName); - } + pluginDao.addOrUpdatePluginDefine(pluginDefine); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java index 0527cfd705..71e3be70c4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java @@ -21,10 +21,17 @@ import static java.util.Objects.requireNonNull; import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; + +import java.util.Objects; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +@Slf4j @Component public class PluginDao { @@ -44,21 +51,36 @@ public class PluginDao { * add or update plugin define * * @param pluginDefine new pluginDefine + * @return plugin id */ - public int addOrUpdatePluginDefine(PluginDefine pluginDefine) { - requireNonNull(pluginDefine, "pluginDefine is null"); + public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) { requireNonNull(pluginDefine.getPluginName(), "pluginName is null"); requireNonNull(pluginDefine.getPluginType(), "pluginType is null"); PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType()); if (currPluginDefine == null) { - if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) { - return pluginDefine.getId(); + try { + if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) { + return pluginDefine.getId(); + } + throw new TaskPluginException( + String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", + pluginDefine.getPluginName(), pluginDefine.getPluginType())); + } catch (TaskPluginException ex) { + throw ex; + } catch (Exception ex) { + log.error("Insert plugin definition error, there may already exist a plugin", ex); + currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), + pluginDefine.getPluginType()); + if (currPluginDefine == null) { + throw new TaskPluginException( + String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s", + pluginDefine.getPluginName(), pluginDefine.getPluginType())); + } } - throw new IllegalStateException("Failed to insert plugin definition"); } - if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) { + if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) { currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime()); currPluginDefine.setPluginParams(pluginDefine.getPluginParams()); pluginDefineMapper.updateById(currPluginDefine);