Browse Source

Fix insertOrUpdate plugin may failed due to concurrent operation (#11471) (#11996)

(cherry picked from commit 864a90820d)
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
8eadf5e5aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  2. 34
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
@ -68,10 +67,7 @@ public class ApiApplicationServer {
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
}
pluginDao.addOrUpdatePluginDefine(pluginDefine);
}
}
}

34
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java

@ -21,12 +21,20 @@ import static java.util.Objects.requireNonNull;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import java.util.Objects;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class PluginDao {
@Autowired
private PluginDefineMapper pluginDefineMapper;
@ -43,20 +51,36 @@ public class PluginDao {
* add or update plugin define
*
* @param pluginDefine new pluginDefine
* @return plugin id
*/
public int addOrUpdatePluginDefine(PluginDefine pluginDefine) {
requireNonNull(pluginDefine, "pluginDefine is null");
public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) {
requireNonNull(pluginDefine.getPluginName(), "pluginName is null");
requireNonNull(pluginDefine.getPluginType(), "pluginType is null");
PluginDefine currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
PluginDefine currPluginDefine =
pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
if (currPluginDefine == null) {
try {
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() > 0) {
return pluginDefine.getId();
}
throw new IllegalStateException("Failed to insert plugin definition");
throw new TaskPluginException(
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
} catch (TaskPluginException ex) {
throw ex;
} catch (Exception ex) {
log.error("Insert plugin definition error, there may already exist a plugin", ex);
currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(),
pluginDefine.getPluginType());
if (currPluginDefine == null) {
throw new TaskPluginException(
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
}
}
}
if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) {
currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
currPluginDefine.setPluginParams(pluginDefine.getPluginParams());
pluginDefineMapper.updateById(currPluginDefine);

Loading…
Cancel
Save