Browse Source

Fix insertOrUpdate plugin may failed due to concurrent operation (#11471)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
864a90820d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  2. 34
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -68,10 +68,7 @@ public class ApiApplicationServer {
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
}
pluginDao.addOrUpdatePluginDefine(pluginDefine);
}
}
}

34
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java

@ -21,10 +21,17 @@ import static java.util.Objects.requireNonNull;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import java.util.Objects;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PluginDao {
@ -44,21 +51,36 @@ public class PluginDao {
* add or update plugin define
*
* @param pluginDefine new pluginDefine
* @return plugin id
*/
public int addOrUpdatePluginDefine(PluginDefine pluginDefine) {
requireNonNull(pluginDefine, "pluginDefine is null");
public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) {
requireNonNull(pluginDefine.getPluginName(), "pluginName is null");
requireNonNull(pluginDefine.getPluginType(), "pluginType is null");
PluginDefine currPluginDefine =
pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
if (currPluginDefine == null) {
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) {
return pluginDefine.getId();
try {
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) {
return pluginDefine.getId();
}
throw new TaskPluginException(
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
} catch (TaskPluginException ex) {
throw ex;
} catch (Exception ex) {
log.error("Insert plugin definition error, there may already exist a plugin", ex);
currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(),
pluginDefine.getPluginType());
if (currPluginDefine == null) {
throw new TaskPluginException(
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
}
}
throw new IllegalStateException("Failed to insert plugin definition");
}
if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) {
currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
currPluginDefine.setPluginParams(pluginDefine.getPluginParams());
pluginDefineMapper.updateById(currPluginDefine);

Loading…
Cancel
Save