From 57414c4df70d8931157d4b4608170a4784193c2a Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 29 Jun 2021 22:55:01 +0800 Subject: [PATCH] [Feature][JsonSplit-api] merging from dev to json_split_two (#5712) * [BUG-#5678][Registry]fix registry init node miss (#5686) * [Improvement][UI] Update the update time after the user information is successfully modified (#5684) * improve edit the userinfo success, but the updatetime is not the latest. * Improved shell task execution result log information, adding process.waitFor() and process.exitValue() information to the original log (#5691) Co-authored-by: shenglm * [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603) * add globalParams new plan with varPool * add unit test * add python task varPoolParams Co-authored-by: wangxj * Issue robot translation judgment changed to Chinese (#5694) Co-authored-by: chenxingchun <438044805@qq.com> * the update function should use post instead of get (#5703) * enhance form verify (#5696) * checkState only supports %s not {} (#5711) * [Fix-5701]When deleting a user, the accessToken associated with the user should also be deleted (#5697) * update * fix the codestyle error * fix the compile error * support rollback Co-authored-by: Kirs Co-authored-by: kyoty Co-authored-by: ji04xiaogang Co-authored-by: shenglm Co-authored-by: wangxj3 <857234426@qq.com> Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Co-authored-by: chenxingchun <438044805@qq.com> Co-authored-by: JinyLeeChina <297062848@qq.com> --- .github/actions/translate-on-issue | 1 - .gitmodules | 3 - .../alert/plugin/AlertPluginManager.java | 2 +- .../AlertPluginInstanceController.java | 2 +- .../api/service/impl/UsersServiceImpl.java | 7 + .../api/service/UsersServiceTest.java | 18 +- .../common/enums/DataType.java | 3 +- .../common/task/AbstractParameters.java | 182 ++++++++++++++---- .../common/task/shell/ShellParameters.java | 60 +++--- .../common/task/sql/SqlParameters.java | 65 +++++++ .../common/utils/VarPoolUtils.java | 3 +- .../common/task/EntityTestUtils.java | 2 +- .../common/task/SqlParametersTest.java | 30 ++- .../dao/mapper/AccessTokenMapper.java | 14 +- .../dao/mapper/AccessTokenMapper.xml | 4 + .../dao/mapper/AccessTokenMapperTest.java | 47 +++-- .../command/TaskExecuteResponseCommand.java | 12 -- .../builder/TaskExecutionContextBuilder.java | 14 +- .../server/entity/TaskExecutionContext.java | 13 ++ .../processor/TaskResponseProcessor.java | 3 +- .../processor/queue/TaskResponseEvent.java | 15 +- .../processor/queue/TaskResponseService.java | 3 +- .../master/registry/MasterRegistryClient.java | 16 +- .../master/registry/ServerNodeManager.java | 7 +- .../master/runner/MasterExecThread.java | 90 +++++---- .../server/utils/ParamUtils.java | 8 +- .../worker/registry/WorkerRegistryClient.java | 3 +- .../worker/runner/TaskExecuteThread.java | 7 +- .../worker/task/AbstractCommandExecutor.java | 20 +- .../server/worker/task/AbstractTask.java | 28 +-- .../server/worker/task/datax/DataxTask.java | 1 + .../server/worker/task/flink/FlinkTask.java | 1 + .../server/worker/task/http/HttpTask.java | 1 + .../server/worker/task/mr/MapReduceTask.java | 1 + .../worker/task/procedure/ProcedureTask.java | 1 + .../server/worker/task/python/PythonTask.java | 3 +- .../server/worker/task/shell/ShellTask.java | 17 +- .../server/worker/task/spark/SparkTask.java | 1 + .../server/worker/task/sql/SqlTask.java | 47 +++-- .../server/worker/task/sqoop/SqoopTask.java | 1 + .../server/master/MasterExecThreadTest.java | 48 +++++ .../server/master/ParamsTest.java | 33 ++-- .../queue/TaskResponseServiceTest.java | 3 +- .../server/utils/ParamUtilsTest.java | 42 ++-- .../processor/TaskCallbackServiceTest.java | 2 - .../processor/TaskKillProcessorTest.java | 6 +- .../registry/WorkerRegistryClientTest.java | 2 - .../worker/runner/TaskExecuteThreadTest.java | 3 +- .../task/AbstractCommandExecutorTest.java | 53 ----- .../worker/task/ShellTaskReturnTest.java | 11 -- .../server/worker/task/TaskManagerTest.java | 5 - .../server/worker/task/TaskParamsTest.java | 77 ++++++++ .../worker/task/shell/ShellTaskTest.java | 12 +- .../server/worker/task/sql/SqlTaskTest.java | 2 + .../service/process/ProcessService.java | 55 ++---- .../service/registry/RegistryCenter.java | 40 +--- .../service/registry/RegistryClient.java | 8 +- .../service/process/ProcessServiceTest.java | 15 ++ .../spi/plugin/DolphinPluginLoader.java | 2 +- .../warningGroups/_source/createWarning.vue | 4 + .../_source/createWarningInstance.vue | 4 + .../pages/user/pages/account/_source/info.vue | 7 +- .../js/conf/home/store/security/actions.js | 2 +- .../src/js/module/i18n/locale/en_US.js | 2 + .../src/js/module/i18n/locale/zh_CN.js | 2 + pom.xml | 2 +- 66 files changed, 738 insertions(+), 460 deletions(-) delete mode 160000 .github/actions/translate-on-issue delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java diff --git a/.github/actions/translate-on-issue b/.github/actions/translate-on-issue deleted file mode 160000 index 959b66feb4..0000000000 --- a/.github/actions/translate-on-issue +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 959b66feb4231b08e8251422ac6d469cdc03d140 diff --git a/.gitmodules b/.gitmodules index d5c455f6da..11414db08c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -21,6 +21,3 @@ [submodule ".github/actions/lable-on-issue"] path = .github/actions/lable-on-issue url = https://github.com/xingchun-chen/labeler -[submodule ".github/actions/translate-on-issue"] - path = .github/actions/translate-on-issue - url = https://github.com/xingchun-chen/translation-helper.git diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java index 5788cf9809..4fbe2bd91a 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java @@ -76,7 +76,7 @@ public class AlertPluginManager extends AbstractDolphinPluginManager { requireNonNull(name, "name is null"); AlertChannelFactory alertChannelFactory = alertChannelFactoryMap.get(name); - checkState(alertChannelFactory != null, "Alert Plugin {} is not registered", name); + checkState(alertChannelFactory != null, "Alert Plugin %s is not registered", name); try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(alertChannelFactory.getClass().getClassLoader())) { AlertChannel alertChannel = alertChannelFactory.create(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java index 42c5536042..346e0418aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java @@ -108,7 +108,7 @@ public class AlertPluginInstanceController extends BaseController { @ApiImplicitParam(name = "instanceName", value = "ALERT_PLUGIN_INSTANCE_NAME", required = true, dataType = "String", example = "DING TALK"), @ApiImplicitParam(name = "pluginInstanceParams", value = "ALERT_PLUGIN_INSTANCE_PARAMS", required = true, dataType = "String", example = "ALERT_PLUGIN_INSTANCE_PARAMS") }) - @GetMapping(value = "/update") + @PostMapping(value = "/update") @ResponseStatus(HttpStatus.OK) @ApiException(UPDATE_ALERT_PLUGIN_INSTANCE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index 23a6e895fa..6c6e4d5f9b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.ResourcesUser; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UDFUser; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper; import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; @@ -83,6 +84,9 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { private static final Logger logger = LoggerFactory.getLogger(UsersServiceImpl.class); + @Autowired + private AccessTokenMapper accessTokenMapper; + @Autowired private UserMapper userMapper; @@ -482,6 +486,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { * @throws Exception exception when operate hdfs */ @Override + @Transactional(rollbackFor = RuntimeException.class) public Map deleteUserById(User loginUser, int id) throws IOException { Map result = new HashMap<>(); //only admin can operate @@ -514,6 +519,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { } } + accessTokenMapper.deleteAccessTokenByUserId(id); + userMapper.deleteById(id); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index f2df3c1862..bc0a922493 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper; import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; @@ -79,6 +80,9 @@ public class UsersServiceTest { @Mock private UserMapper userMapper; + @Mock + private AccessTokenMapper accessTokenMapper; + @Mock private TenantMapper tenantMapper; @@ -221,7 +225,6 @@ public class UsersServiceTest { Assert.assertEquals(user.getId(), userExistId); } - @Test public void testQueryUserList() { User user = new User(); @@ -265,13 +268,13 @@ public class UsersServiceTest { String userPassword = "userTest0001"; try { //user not exist - Map result = usersService.updateUser(getLoginUser(), 0,userName,userPassword,"3443@qq.com",1,"13457864543","queue", 1); + Map result = usersService.updateUser(getLoginUser(), 0, userName, userPassword, "3443@qq.com", 1, "13457864543", "queue", 1); Assert.assertEquals(Status.USER_NOT_EXIST, result.get(Constants.STATUS)); logger.info(result.toString()); //success when(userMapper.selectById(1)).thenReturn(getUser()); - result = usersService.updateUser(getLoginUser(), 1,userName,userPassword,"32222s@qq.com",1,"13457864543","queue", 1); + result = usersService.updateUser(getLoginUser(), 1, userName, userPassword, "32222s@qq.com", 1, "13457864543", "queue", 1); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } catch (Exception e) { @@ -286,7 +289,7 @@ public class UsersServiceTest { try { when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser()); when(userMapper.selectById(1)).thenReturn(getUser()); - + when(accessTokenMapper.deleteAccessTokenByUserId(1)).thenReturn(0); //no operate Map result = usersService.deleteUserById(loginUser, 3); logger.info(result.toString()); @@ -356,7 +359,6 @@ public class UsersServiceTest { } - @Test public void testGrantUDFFunction() { String udfIds = "100000,120000"; @@ -398,7 +400,7 @@ public class UsersServiceTest { } - private User getLoginUser(){ + private User getLoginUser() { User loginUser = new User(); loginUser.setId(1); loginUser.setUserType(UserType.ADMIN_USER); @@ -431,7 +433,6 @@ public class UsersServiceTest { Assert.assertEquals("userTest0001", tempUser.getUserName()); } - @Test public void testQueryAllGeneralUsers() { User loginUser = new User(); @@ -478,7 +479,6 @@ public class UsersServiceTest { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } - @Test public void testAuthorizedUser() { User loginUser = new User(); @@ -535,7 +535,6 @@ public class UsersServiceTest { } } - @Test public void testActivateUser() { User user = new User(); @@ -618,7 +617,6 @@ public class UsersServiceTest { return user; } - /** * get user */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java index eda00d819a..2b0930d32d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java @@ -30,6 +30,7 @@ public enum DataType { * 6 time, "HH:MM:SS" * 7 time stamp * 8 Boolean + * 9 list */ - VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN + VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN,LIST } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java index 929516c86b..686642dbdd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java @@ -16,55 +16,163 @@ */ package org.apache.dolphinscheduler.common.task; +import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; + /** * job params related class */ public abstract class AbstractParameters implements IParameters { - @Override - public abstract boolean checkParameters(); - - @Override - public abstract List getResourceFilesList(); - - /** - * local parameters - */ - public List localParams; - - /** - * get local parameters list - * @return Property list - */ - public List getLocalParams() { - return localParams; - } - - public void setLocalParams(List localParams) { - this.localParams = localParams; - } - - /** - * get local parameters map - * @return parameters map - */ - public Map getLocalParametersMap() { - if (localParams != null) { - Map localParametersMaps = new LinkedHashMap<>(); - - for (Property property : localParams) { - localParametersMaps.put(property.getProp(),property); + @Override + public abstract boolean checkParameters(); + + @Override + public abstract List getResourceFilesList(); + + /** + * local parameters + */ + public List localParams; + + /** + * var pool + */ + public List varPool; + + /** + * get local parameters list + * + * @return Property list + */ + public List getLocalParams() { + return localParams; + } + + public void setLocalParams(List localParams) { + this.localParams = localParams; + } + + /** + * get local parameters map + * + * @return parameters map + */ + public Map getLocalParametersMap() { + if (localParams != null) { + Map localParametersMaps = new LinkedHashMap<>(); + + for (Property property : localParams) { + localParametersMaps.put(property.getProp(), property); + } + return localParametersMaps; + } + return null; + } + + /** + * get varPool map + * + * @return parameters map + */ + public Map getVarPoolMap() { + if (varPool != null) { + Map varPoolMap = new LinkedHashMap<>(); + for (Property property : varPool) { + varPoolMap.put(property.getProp(), property); + } + return varPoolMap; + } + return null; + } + + public List getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + if (StringUtils.isEmpty(varPool)) { + this.varPool = new ArrayList<>(); + } else { + this.varPool = JSONUtils.toList(varPool, Property.class); + } + } + + public void dealOutParam(String result) { + if (CollectionUtils.isEmpty(localParams)) { + return; + } + List outProperty = getOutProperty(localParams); + if (CollectionUtils.isEmpty(outProperty)) { + return; + } + if (StringUtils.isEmpty(result)) { + varPool.addAll(outProperty); + return; + } + Map taskResult = getMapByString(result); + if (taskResult == null || taskResult.size() == 0) { + return; + } + for (Property info : outProperty) { + info.setValue(taskResult.get(info.getProp())); + varPool.add(info); + } + } + + public List getOutProperty(List params) { + if (CollectionUtils.isEmpty(params)) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Property info : params) { + if (info.getDirect() == Direct.OUT) { + result.add(info); + } + } + return result; + } + + public List> getListMapByString(String json) { + List> allParams = new ArrayList<>(); + ArrayNode paramsByJson = JSONUtils.parseArray(json); + Iterator listIterator = paramsByJson.iterator(); + while (listIterator.hasNext()) { + Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); + allParams.add(param); + } + return allParams; + } + + /** + * shell's result format is key=value$VarPool$key=value$VarPool$ + * @param result + * @return + */ + public static Map getMapByString(String result) { + String[] formatResult = result.split("\\$VarPool\\$"); + Map format = new HashMap<>(); + for (String info : formatResult) { + if (StringUtils.isNotEmpty(info) && info.contains("=")) { + String[] keyValue = info.split("="); + format.put(keyValue[0], keyValue[1]); + } } - return localParametersMaps; - } - return null; - } + return format; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java index e11e59600b..7388cd35da 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java @@ -14,52 +14,52 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.common.task.shell; +package org.apache.dolphinscheduler.common.task.shell; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import java.util.List; -import java.util.stream.Collectors; /** * shell parameters */ public class ShellParameters extends AbstractParameters { - /** - * shell script - */ - private String rawScript; + /** + * shell script + */ + private String rawScript; + + /** + * resource list + */ + private List resourceList; - /** - * resource list - */ - private List resourceList; + public String getRawScript() { + return rawScript; + } - public String getRawScript() { - return rawScript; - } + public void setRawScript(String rawScript) { + this.rawScript = rawScript; + } - public void setRawScript(String rawScript) { - this.rawScript = rawScript; - } + public List getResourceList() { + return resourceList; + } - public List getResourceList() { - return resourceList; - } + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } - public void setResourceList(List resourceList) { - this.resourceList = resourceList; - } + @Override + public boolean checkParameters() { + return rawScript != null && !rawScript.isEmpty(); + } - @Override - public boolean checkParameters() { - return rawScript != null && !rawScript.isEmpty(); - } + @Override + public List getResourceFilesList() { + return resourceList; + } - @Override - public List getResourceFilesList() { - return resourceList; - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java index a83cd64cca..59259a53ef 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java @@ -17,12 +17,19 @@ package org.apache.dolphinscheduler.common.task.sql; +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Sql/Hql parameter @@ -94,6 +101,16 @@ public class SqlParameters extends AbstractParameters { */ private String title; + private int limit; + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + public String getType() { return type; } @@ -208,6 +225,53 @@ public class SqlParameters extends AbstractParameters { return new ArrayList<>(); } + @Override + public void dealOutParam(String result) { + if (CollectionUtils.isEmpty(localParams)) { + return; + } + List outProperty = getOutProperty(localParams); + if (CollectionUtils.isEmpty(outProperty)) { + return; + } + if (StringUtils.isEmpty(result)) { + varPool.addAll(outProperty); + return; + } + List> sqlResult = getListMapByString(result); + if (CollectionUtils.isEmpty(sqlResult)) { + return; + } + //if sql return more than one line + if (sqlResult.size() > 1) { + Map> sqlResultFormat = new HashMap<>(); + //init sqlResultFormat + Set keySet = sqlResult.get(0).keySet(); + for (String key : keySet) { + sqlResultFormat.put(key, new ArrayList<>()); + } + for (Map info : sqlResult) { + for (String key : info.keySet()) { + sqlResultFormat.get(key).add(String.valueOf(info.get(key))); + } + } + for (Property info : outProperty) { + if (info.getType() == DataType.LIST) { + info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp()))); + varPool.add(info); + } + } + } else { + //result only one line + Map firstRow = sqlResult.get(0); + for (Property info : outProperty) { + info.setValue(String.valueOf(firstRow.get(info.getProp()))); + varPool.add(info); + } + } + + } + @Override public String toString() { return "SqlParameters{" @@ -217,6 +281,7 @@ public class SqlParameters extends AbstractParameters { + ", sqlType=" + sqlType + ", sendEmail=" + sendEmail + ", displayRows=" + displayRows + + ", limit=" + limit + ", udfs='" + udfs + '\'' + ", showType='" + showType + '\'' + ", connParams='" + connParams + '\'' diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java index cd300e3f03..f286300d0d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import java.text.ParseException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; public class VarPoolUtils { @@ -71,7 +70,7 @@ public class VarPoolUtils { if (kvs.length == 2) { propToValue.put(kvs[0], kvs[1]); } else { - throw new ParseException(kv, 2); + return; } } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java index 5d867bc4d9..8e9b451141 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java @@ -32,7 +32,7 @@ public class EntityTestUtils { static { OBJECT_MAP.put("java.lang.Long", 1L); - OBJECT_MAP.put("java.lang.String", "test"); + OBJECT_MAP.put("java.lang.String", "[{\"direct\":\"OUT\",\"prop\":\"percentage5\",\"type\":\"VARCHAR\",\"value\":\"qwe\"}]"); OBJECT_MAP.put("java.lang.Integer", 1); OBJECT_MAP.put("int", 1); OBJECT_MAP.put("long", 1L); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java index 6fc4d6c9c9..17e95cf9d6 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java @@ -17,9 +17,17 @@ package org.apache.dolphinscheduler.common.task; +import static org.junit.Assert.assertNotNull; + +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import java.util.ArrayList; +import java.util.List; + import org.junit.Assert; import org.junit.Test; @@ -38,6 +46,14 @@ public class SqlParametersTest { @Test public void testSqlParameters() { + List properties = new ArrayList<>(); + Property property = new Property(); + property.setProp("test1"); + property.setDirect(Direct.OUT); + property.setType(DataType.VARCHAR); + property.setValue("test1"); + properties.add(property); + SqlParameters sqlParameters = new SqlParameters(); Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList())); @@ -63,6 +79,18 @@ public class SqlParametersTest { Assert.assertEquals(title, sqlParameters.getTitle()); Assert.assertEquals(groupId, sqlParameters.getGroupId()); - Assert.assertTrue(sqlParameters.checkParameters()); + String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]"; + String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]"; + sqlParameters.setLocalParams(properties); + sqlParameters.varPool = new ArrayList<>(); + sqlParameters.dealOutParam(sqlResult1); + assertNotNull(sqlParameters.getVarPool().get(0)); + + property.setType(DataType.LIST); + properties.clear(); + properties.add(property); + sqlParameters.setLocalParams(properties); + sqlParameters.dealOutParam(sqlResult); + assertNotNull(sqlParameters.getVarPool().get(0)); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java index 2e82744025..472ba35e3b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java @@ -14,13 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.AccessToken; + +import org.apache.ibatis.annotations.Param; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.apache.ibatis.annotations.Param; /** * accesstoken mapper interface @@ -30,6 +33,7 @@ public interface AccessTokenMapper extends BaseMapper { /** * access token page + * * @param page page * @param userName userName * @param userId userId @@ -39,4 +43,12 @@ public interface AccessTokenMapper extends BaseMapper { @Param("userName") String userName, @Param("userId") int userId ); + + /** + * delete by userId + * + * @param userId userId + * @return delete result + */ + int deleteAccessTokenByUserId(@Param("userId") int userId); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml index 02fc9526b3..35312fb88a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml @@ -31,4 +31,8 @@ order by t.update_time desc + + delete from t_ds_access_token + where user_id = #{userId} + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java index 30c8cdc7b9..31958a73e8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java @@ -16,12 +16,22 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.AccessToken; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,14 +41,8 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import javax.annotation.Resource; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.*; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * AccessToken mapper test @@ -57,8 +61,6 @@ public class AccessTokenMapperTest { /** * test insert - * - * @throws Exception */ @Test public void testInsert() throws Exception { @@ -68,6 +70,27 @@ public class AccessTokenMapperTest { assertThat(accessToken.getId(), greaterThan(0)); } + /** + * test delete AccessToken By UserId + */ + @Test + public void testDeleteAccessTokenByUserId() throws Exception { + Integer userId = 1; + int insertCount = 0; + + for (int i = 0; i < 10; i++) { + try { + createAccessToken(userId); + insertCount++; + } catch (Exception e) { + e.printStackTrace(); + } + } + + int deleteCount = accessTokenMapper.deleteAccessTokenByUserId(userId); + Assert.assertEquals(insertCount, deleteCount); + } + /** * test select by id diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index 93cc3eab12..de5b82c729 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -68,10 +68,6 @@ public class TaskExecuteResponseCommand implements Serializable { * varPool string */ private String varPool; - /** - * task return result - */ - private String result; public void setVarPool(String varPool) { this.varPool = varPool; @@ -143,12 +139,4 @@ public class TaskExecuteResponseCommand implements Serializable { + ", appIds='" + appIds + '\'' + '}'; } - - public String getResult() { - return result; - } - - public void setResult(String result) { - this.result = result; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index da46e4dce3..c1cca3a1bd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -21,8 +21,15 @@ import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UN import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.server.entity.*; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * TaskExecutionContext builder @@ -41,7 +48,7 @@ public class TaskExecutionContextBuilder { * @param taskInstance taskInstance * @return TaskExecutionContextBuilder */ - public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){ + public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance) { taskExecutionContext.setTaskInstanceId(taskInstance.getId()); taskExecutionContext.setTaskName(taskInstance.getName()); taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime()); @@ -52,6 +59,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setResources(taskInstance.getResources()); taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); + taskExecutionContext.setVarPool(taskInstance.getVarPool()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 84908496d2..7a47107249 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -216,6 +216,11 @@ public class TaskExecutionContext implements Serializable { */ private SqoopTaskExecutionContext sqoopTaskExecutionContext; + /** + * taskInstance varPool + */ + private String varPool; + /** * procedure TaskExecutionContext */ @@ -556,4 +561,12 @@ public class TaskExecutionContext implements Serializable { + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + '}'; } + + public String getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + this.varPool = varPool; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 186c4f35ba..c307b2ce83 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -80,8 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getAppIds(), responseCommand.getTaskInstanceId(), responseCommand.getVarPool(), - channel, - responseCommand.getResult() + channel ); taskResponseService.addResponse(taskResponseEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 9789bccb3c..05466e8747 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -92,10 +92,6 @@ public class TaskResponseEvent { * channel */ private Channel channel; - /** - * task return result - */ - private String result; public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, @@ -122,8 +118,7 @@ public class TaskResponseEvent { String appIds, int taskInstanceId, String varPool, - Channel channel, - String result) { + Channel channel) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -133,7 +128,6 @@ public class TaskResponseEvent { event.setEvent(Event.RESULT); event.setVarPool(varPool); event.setChannel(channel); - event.setResult(result); return event; } @@ -233,11 +227,4 @@ public class TaskResponseEvent { this.channel = channel; } - public String getResult() { - return result; - } - - public void setResult(String result) { - this.result = result; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index f3f2e7f15b..1b5eddbd6f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -165,8 +165,7 @@ public class TaskResponseService { taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool(), - taskResponseEvent.getResult() + taskResponseEvent.getVarPool() ); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 3a2e3044ec..1286818d8b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.registry; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; @@ -134,18 +135,6 @@ public class MasterRegistryClient { unRegistry(); } - /** - * init system node - */ - private void initMasterSystemNode() { - try { - registryClient.persist(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, ""); - logger.info("initialize master server nodes success."); - } catch (Exception e) { - logger.error("init system node failed", e); - } - } - /** * remove zookeeper node path * @@ -346,7 +335,6 @@ public class MasterRegistryClient { * registry */ public void registry() { - initMasterSystemNode(); String address = NetUtils.getAddr(masterConfig.getListenPort()); localNodePath = getMasterPath(); registryClient.persistEphemeral(localNodePath, ""); @@ -395,7 +383,7 @@ public class MasterRegistryClient { */ public String getMasterPath() { String address = getLocalAddress(); - return registryClient.getMasterPath() + "/" + address; + return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 0162af6bac..6a9167e751 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.server.master.registry; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -131,11 +134,11 @@ public class ServerNodeManager implements InitializingBean { /** * init MasterNodeListener listener */ - registryClient.subscribe(registryClient.getMasterPath(), new MasterDataListener()); + registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); /** * init WorkerNodeListener listener */ - registryClient.subscribe(registryClient.getWorkerPath(), new MasterDataListener()); + registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new MasterDataListener()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 0720a9b77a..111262e8ff 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; import org.apache.dolphinscheduler.common.Constants; @@ -47,7 +46,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.VarPoolUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; @@ -60,7 +58,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; -import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -470,8 +467,6 @@ public class MasterExecThread implements Runnable { * @return TaskInstance */ private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { - //update processInstance for update the globalParams - this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId()); TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion()); if (taskInstance == null) { taskInstance = new TaskInstance(); @@ -503,6 +498,9 @@ public class MasterExecThread implements Runnable { // retry task instance interval taskInstance.setRetryInterval(taskNode.getRetryInterval()); + //set task param + taskInstance.setTaskParams(taskNode.getTaskParams()); + // task instance priority if (taskNode.getTaskInstancePriority() == null) { taskInstance.setTaskInstancePriority(Priority.MEDIUM); @@ -518,54 +516,74 @@ public class MasterExecThread implements Runnable { } else { taskInstance.setWorkerGroup(taskWorkerGroup); } - taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams())); // delay execution time taskInstance.setDelayTime(taskNode.getDelayTime()); } + + //get pre task ,get all the task varPool to this task + Set preTask = dag.getPreviousNodes(taskInstance.getName()); + getPreVarPool(taskInstance, preTask); return taskInstance; } - private String globalParamToTaskParams(String params) { - String globalParams = this.processInstance.getGlobalParams(); - if (StringUtils.isBlank(globalParams)) { - return params; - } - Map globalMap = processService.getGlobalParamMap(globalParams); - if (globalMap == null || globalMap.size() == 0) { - return params; - } - // the process global param save in localParams - Map result = JSONUtils.toMap(params, String.class, Object.class); - Object localParams = result.get(LOCAL_PARAMS); - if (localParams != null) { - List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); - for (Property info : allParam) { - String paramName = info.getProp(); - if (StringUtils.isNotEmpty(paramName) && propToValue.containsKey(paramName)) { - info.setValue((String) propToValue.get(paramName)); + public void getPreVarPool(TaskInstance taskInstance, Set preTask) { + Map allProperty = new HashMap<>(); + Map allTaskInstance = new HashMap<>(); + if (CollectionUtils.isNotEmpty(preTask)) { + for (String preTaskName : preTask) { + TaskInstance preTaskInstance = completeTaskList.get(preTaskName); + if (preTaskInstance == null) { + continue; } - if (info.getDirect().equals(Direct.IN)) { - String value = globalMap.get(paramName); - if (StringUtils.isNotEmpty(value)) { - info.setValue(value); + String preVarPool = preTaskInstance.getVarPool(); + if (StringUtils.isNotEmpty(preVarPool)) { + List properties = JSONUtils.toList(preVarPool, Property.class); + for (Property info : properties) { + setVarPoolValue(allProperty, allTaskInstance, preTaskInstance, info); } } } - result.put(LOCAL_PARAMS, allParam); + if (allProperty.size() > 0) { + taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); + } + } + } + + private void setVarPoolValue(Map allProperty, Map allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { + //for this taskInstance all the param in this part is IN. + thisProperty.setDirect(Direct.IN); + //get the pre taskInstance Property's name + String proName = thisProperty.getProp(); + //if the Previous nodes have the Property of same name + if (allProperty.containsKey(proName)) { + //comparison the value of two Property + Property otherPro = allProperty.get(proName); + //if this property'value of loop is empty,use the other,whether the other's value is empty or not + if (StringUtils.isEmpty(thisProperty.getValue())) { + allProperty.put(proName, otherPro); + //if property'value of loop is not empty,and the other's value is not empty too, use the earlier value + } else if (StringUtils.isNotEmpty(otherPro.getValue())) { + TaskInstance otherTask = allTaskInstance.get(proName); + if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) { + allProperty.put(proName, thisProperty); + allTaskInstance.put(proName,preTaskInstance); + } else { + allProperty.put(proName, otherPro); + } + } else { + allProperty.put(proName, thisProperty); + allTaskInstance.put(proName,preTaskInstance); + } + } else { + allProperty.put(proName, thisProperty); + allTaskInstance.put(proName,preTaskInstance); } - return JSONUtils.toJsonString(result); } private void submitPostNode(String parentNodeName) { Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList); List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { - try { - VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool()); - } catch (ParseException e) { - logger.error("parse {} exception", processInstance.getVarPool(), e); - throw new RuntimeException(); - } TaskNode taskNodeObject = dag.getNode(taskNode); taskInstances.add(createTaskInstance(processInstance, taskNodeObject)); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index 875c69cb82..a49d915ff9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -47,6 +47,7 @@ public class ParamUtils { public static Map convert(Map globalParams, Map globalParamsMap, Map localParams, + Map varParams, CommandType commandType, Date scheduleTime) { if (globalParams == null && localParams == null) { @@ -64,10 +65,15 @@ public class ParamUtils { } if (globalParams != null && localParams != null) { - globalParams.putAll(localParams); + localParams.putAll(globalParams); + globalParams = localParams; } else if (globalParams == null && localParams != null) { globalParams = localParams; } + if (varParams != null) { + varParams.putAll(globalParams); + globalParams = varParams; + } Iterator> iter = globalParams.entrySet().iterator(); while (iter.hasNext()) { Map.Entry en = iter.next(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 4db4d17533..3b0dedb99d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.SLASH; import org.apache.dolphinscheduler.common.Constants; @@ -130,7 +131,7 @@ public class WorkerRegistryClient { public Set getWorkerZkPaths() { Set workerPaths = Sets.newHashSet(); String address = getLocalAddress(); - String workerZkPathPrefix = registryClient.getWorkerPath(); + String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS; for (String workGroup : this.workerGroups) { StringJoiner workerPathJoiner = new StringJoiner(SLASH); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 6fd4f34b2f..50847f7e13 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -151,10 +151,10 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.getTaskInstanceId())); task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); - // task init task.init(); - + //init varPool + task.getParameters().setVarPool(taskExecutionContext.getVarPool()); // task handle task.handle(); @@ -165,8 +165,7 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setEndTime(new Date()); responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); - responseCommand.setVarPool(task.getVarPool()); - responseCommand.setResult(task.getResultString()); + responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool())); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index e408f11738..3ea7bd21d4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -88,11 +88,6 @@ public abstract class AbstractCommandExecutor { protected boolean logOutputIsScuccess = false; - /** - * SHELL result string - */ - protected String taskResultString; - /** * taskExecutionContext */ @@ -207,8 +202,8 @@ public abstract class AbstractCommandExecutor { // waiting for the run to finish boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); - logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", - taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode()); + logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", + taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode(), status, process.exitValue()); // if SHELL task exit if (status) { @@ -224,7 +219,8 @@ public abstract class AbstractCommandExecutor { result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE); } } else { - logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode()); + logger.error("process has failure , exitStatusCode:{}, processExitValue:{}, ready to kill ...", + result.getExitStatusCode(), process.exitValue()); ProcessUtils.kill(taskExecutionContext); result.setExitStatusCode(EXIT_CODE_FAILURE); } @@ -364,7 +360,6 @@ public abstract class AbstractCommandExecutor { varPool.append("$VarPool$"); } else { logBuffer.add(line); - taskResultString = line; } } } catch (Exception e) { @@ -592,11 +587,4 @@ public abstract class AbstractCommandExecutor { protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; - public String getTaskResultString() { - return taskResultString; - } - - public void setTaskResultString(String taskResultString) { - this.taskResultString = taskResultString; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 45b94d2628..81b80974b6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -35,11 +35,6 @@ import org.slf4j.Logger; */ public abstract class AbstractTask { - /** - * varPool string - */ - protected String varPool; - /** * taskExecutionContext **/ @@ -56,11 +51,6 @@ public abstract class AbstractTask { */ protected int processId; - /** - * SHELL result string - */ - protected String resultString; - /** * other resource manager appId , for example : YARN etc */ @@ -81,7 +71,7 @@ public abstract class AbstractTask { * constructor * * @param taskExecutionContext taskExecutionContext - * @param logger logger + * @param logger logger */ protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) { this.taskExecutionContext = taskExecutionContext; @@ -139,14 +129,6 @@ public abstract class AbstractTask { } } - public void setVarPool(String varPool) { - this.varPool = varPool; - } - - public String getVarPool() { - return varPool; - } - /** * get exit status code * @@ -176,14 +158,6 @@ public abstract class AbstractTask { this.processId = processId; } - public String getResultString() { - return resultString; - } - - public void setResultString(String resultString) { - this.resultString = resultString; - } - /** * get task parameters * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index a8aa132dc1..b785cb5d49 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -158,6 +158,7 @@ public class DataxTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), dataXParameters.getLocalParametersMap(), + dataXParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 4d34190052..27e5b42f4c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -84,6 +84,7 @@ public class FlinkTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), flinkParameters.getLocalParametersMap(), + flinkParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 87adaab9af..7c68bc1c96 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -135,6 +135,7 @@ public class HttpTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), httpParameters.getLocalParametersMap(), + httpParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); List httpPropertyList = new ArrayList<>(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index f60b1cb426..ce908df596 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -88,6 +88,7 @@ public class MapReduceTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), mapreduceParameters.getLocalParametersMap(), + mapreduceParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java index 2166b1f068..3748c7a492 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java @@ -122,6 +122,7 @@ public class ProcedureTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), procedureParameters.getLocalParametersMap(), + procedureParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 6e561c1cab..e784a79b24 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -92,7 +92,7 @@ public class PythonTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); - setVarPool(pythonCommandExecutor.getVarPool()); + pythonParameters.dealOutParam(pythonCommandExecutor.getVarPool()); } catch (Exception e) { logger.error("python task failure", e); @@ -119,6 +119,7 @@ public class PythonTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), pythonParameters.getLocalParametersMap(), + pythonParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 85f8ea094b..e193571ce0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -105,8 +105,7 @@ public class ShellTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); - setResult(shellCommandExecutor.getTaskResultString()); - setVarPool(shellCommandExecutor.getVarPool()); + shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); } catch (Exception e) { logger.error("shell task error", e); setExitStatusCode(Constants.EXIT_CODE_FAILURE); @@ -169,6 +168,7 @@ public class ShellTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), shellParameters.getLocalParametersMap(), + shellParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job @@ -188,17 +188,4 @@ public class ShellTask extends AbstractTask { } return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } - - public void setResult(String result) { - Map localParams = shellParameters.getLocalParametersMap(); - List> outProperties = new ArrayList<>(); - Map p = new HashMap<>(); - localParams.forEach((k,v) -> { - if (v.getDirect() == Direct.OUT) { - p.put(k, result); - } - }); - outProperties.add(p); - resultString = JSONUtils.toJsonString(outProperties); - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index f6fec0fe48..a5a641cca9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -113,6 +113,7 @@ public class SparkTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), sparkParameters.getLocalParametersMap(), + sparkParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 100f344614..b174734e01 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -86,12 +86,6 @@ public class SqlTask extends AbstractTask { */ private TaskExecutionContext taskExecutionContext; - /** - * default query sql limit - */ - private static final int LIMIT = 10000; - - private AlertClientService alertClientService; public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) { @@ -117,14 +111,16 @@ public class SqlTask extends AbstractTask { Thread.currentThread().setName(threadLoggerInfoName); logger.info("Full sql parameters: {}", sqlParameters); - logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", + logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(), sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), - sqlParameters.getConnParams()); + sqlParameters.getConnParams(), + sqlParameters.getVarPool(), + sqlParameters.getLimit()); try { SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); @@ -175,6 +171,7 @@ public class SqlTask extends AbstractTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), sqlParameters.getLocalParametersMap(), + sqlParameters.getVarPoolMap(), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); @@ -268,10 +265,9 @@ public class SqlTask extends AbstractTask { String updateResult = String.valueOf(stmt.executeUpdate()); result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams()); } - + //deal out params + sqlParameters.dealOutParam(result); postSql(connection, postStatementsBinds); - this.setResultString(result); - } catch (Exception e) { logger.error("execute sql error: {}", e.getMessage()); throw e; @@ -280,6 +276,7 @@ public class SqlTask extends AbstractTask { } } + public String setNonQuerySqlReturn(String updateResult, List properties) { String result = null; for (Property info :properties) { @@ -309,7 +306,7 @@ public class SqlTask extends AbstractTask { int rowCount = 0; - while (rowCount < LIMIT && resultSet.next()) { + while (rowCount < sqlParameters.getLimit() && resultSet.next()) { ObjectNode mapOfColValues = JSONUtils.createObjectNode(); for (int i = 1; i <= num; i++) { mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i))); @@ -326,12 +323,11 @@ public class SqlTask extends AbstractTask { logger.info("row {} : {}", i + 1, row); } } - String result = JSONUtils.toJsonString(resultJSONArray); if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) { sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) - ? sqlParameters.getTitle() - : taskExecutionContext.getTaskName() + " query result sets", result); + ? sqlParameters.getTitle() + : taskExecutionContext.getTaskName() + " query result sets", result); } logger.debug("execute sql result : {}", result); return result; @@ -478,8 +474,16 @@ public class SqlTask extends AbstractTask { String paramName = m.group(1); Property prop = paramsPropsMap.get(paramName); - sqlParamsMap.put(index, prop); - index++; + if (prop == null) { + logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance" + + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId()); + } + else { + sqlParamsMap.put(index, prop); + index++; + logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content); + } + } } @@ -495,8 +499,13 @@ public class SqlTask extends AbstractTask { //parameter print style logger.info("after replace sql , preparing : {}", formatSql); StringBuilder logPrint = new StringBuilder("replaced sql , parameters:"); - for (int i = 1; i <= sqlParamsMap.size(); i++) { - logPrint.append(sqlParamsMap.get(i).getValue() + "(" + sqlParamsMap.get(i).getType() + ")"); + if (sqlParamsMap == null) { + logger.info("printReplacedSql: sqlParamsMap is null."); + } + else { + for (int i = 1; i <= sqlParamsMap.size(); i++) { + logPrint.append(sqlParamsMap.get(i).getValue() + "(" + sqlParamsMap.get(i).getType() + ")"); + } } logger.info("Sql Params are {}", logPrint); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 00d94f01bf..1d1b32de00 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -76,6 +76,7 @@ public class SqoopTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), sqoopTaskExecutionContext.getDefinedParams(), sqoopParameters.getLocalParametersMap(), + sqoopParameters.getVarPoolMap(), CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()), sqoopTaskExecutionContext.getScheduleTime()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index a42f1877e2..fbc4ed800d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -44,10 +44,14 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.text.ParseException; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.junit.Assert; import org.junit.Before; @@ -207,6 +211,50 @@ public class MasterExecThreadTest { } } + @Test + public void testGetPreVarPool() { + try { + Set preTaskName = new HashSet<>(); + preTaskName.add("test1"); + preTaskName.add("test2"); + Map completeTaskList = new ConcurrentHashMap<>(); + + TaskInstance taskInstance = new TaskInstance(); + + TaskInstance taskInstance1 = new TaskInstance(); + taskInstance1.setId(1); + taskInstance1.setName("test1"); + taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]"); + taskInstance1.setEndTime(new Date()); + + TaskInstance taskInstance2 = new TaskInstance(); + taskInstance2.setId(2); + taskInstance2.setName("test2"); + taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); + taskInstance2.setEndTime(new Date()); + + completeTaskList.put("test1", taskInstance1); + completeTaskList.put("test2", taskInstance2); + + Class masterExecThreadClass = MasterExecThread.class; + + Field field = masterExecThreadClass.getDeclaredField("completeTaskList"); + field.setAccessible(true); + field.set(masterExecThread, completeTaskList); + + masterExecThread.getPreVarPool(taskInstance, preTaskName); + Assert.assertNotNull(taskInstance.getVarPool()); + taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); + completeTaskList.put("test2", taskInstance2); + field.setAccessible(true); + field.set(masterExecThread, completeTaskList); + masterExecThread.getPreVarPool(taskInstance, preTaskName); + Assert.assertNotNull(taskInstance.getVarPool()); + } catch (Exception e) { + Assert.fail(); + } + } + private List zeroSchedulerList() { return Collections.EMPTY_LIST; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java index 48b34d5f88..12613c61c6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java @@ -20,20 +20,20 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; -import org.apache.dolphinscheduler.common.utils.*; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * user define param @@ -73,20 +73,19 @@ public class ParamsTest { } @Test - public void convertTest()throws Exception{ - Map globalParams = new HashMap<>(); + public void convertTest() throws Exception { + Map globalParams = new HashMap<>(); Property property = new Property(); property.setProp("global_param"); property.setDirect(Direct.IN); property.setType(DataType.VARCHAR); property.setValue("${system.biz.date}"); - globalParams.put("global_param",property); + globalParams.put("global_param", property); - Map globalParamsMap = new HashMap<>(); - globalParamsMap.put("global_param","${system.biz.date}"); + Map globalParamsMap = new HashMap<>(); + globalParamsMap.put("global_param", "${system.biz.date}"); - - Map localParams = new HashMap<>(); + Map localParams = new HashMap<>(); Property localProperty = new Property(); localProperty.setProp("local_param"); localProperty.setDirect(Direct.IN); @@ -94,8 +93,16 @@ public class ParamsTest { localProperty.setValue("${global_param}"); localParams.put("local_param", localProperty); + Map varPoolParams = new HashMap<>(); + Property varProperty = new Property(); + varProperty.setProp("local_param"); + varProperty.setDirect(Direct.IN); + varProperty.setType(DataType.VARCHAR); + varProperty.setValue("${global_param}"); + varPoolParams.put("varPool", varProperty); + Map paramsMap = ParamUtils.convert(globalParams, globalParamsMap, - localParams, CommandType.START_PROCESS, new Date()); + localParams,varPoolParams, CommandType.START_PROCESS, new Date()); logger.info(JSONUtils.toJsonString(paramsMap)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index ec0807cbdd..5d10f849c5 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -70,8 +70,7 @@ public class TaskResponseServiceTest { "ids", 22, "varPol", - channel, - "[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]"); + channel); taskInstance = new TaskInstance(); taskInstance.setId(22); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java index 220cce5d18..a9a1b89371 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java @@ -17,23 +17,24 @@ package org.apache.dolphinscheduler.server.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.*; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test ParamUtils @@ -49,8 +50,11 @@ public class ParamUtilsTest { public Map localParams = new HashMap<>(); + public Map varPoolParams = new HashMap<>(); + /** * Init params + * * @throws Exception */ @Before @@ -71,6 +75,14 @@ public class ParamUtilsTest { localProperty.setType(DataType.VARCHAR); localProperty.setValue("${global_param}"); localParams.put("local_param", localProperty); + + Property varProperty = new Property(); + varProperty.setProp("local_param"); + varProperty.setDirect(Direct.IN); + varProperty.setType(DataType.VARCHAR); + varProperty.setValue("${global_param}"); + varPoolParams.put("varPool", varProperty); + } /** @@ -80,16 +92,20 @@ public class ParamUtilsTest { public void testConvert() { //The expected value - String expected = "{\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; //The expected value when globalParams is null but localParams is not null - String expected1 = "{\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; //Define expected date , the month is 0-base Calendar calendar = Calendar.getInstance(); - calendar.set(2019,11,30); + calendar.set(2019, 11, 30); Date date = calendar.getTime(); //Invoke convert - Map paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, CommandType.START_PROCESS, date); + Map paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date); String result = JSONUtils.toJsonString(paramsMap); assertEquals(expected, result); @@ -101,12 +117,12 @@ public class ParamUtilsTest { } //Invoke convert with null globalParams - Map paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams, CommandType.START_PROCESS, date); + Map paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date); String result1 = JSONUtils.toJsonString(paramsMap1); assertEquals(expected1, result1); //Null check, invoke convert with null globalParams and null localParams - Map paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, CommandType.START_PROCESS, date); + Map paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date); assertNull(paramsMap2); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index c3f6478ce7..53c60d7601 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -67,8 +67,6 @@ public class TaskCallbackServiceTest { taskCallbackService.sendAck(1, ackCommand.convert2Command()); TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); - String result = responseCommand.getResult(); - responseCommand.setResult("return string"); taskCallbackService.sendResult(1, responseCommand.convert2Command()); Stopper.stop(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java index 36a758ab1f..25fa22a734 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -53,6 +54,8 @@ public class TaskKillProcessorTest { private TaskKillProcessor taskKillProcessor; + private WorkerManagerThread workerManager; + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; private Channel channel; @@ -85,6 +88,8 @@ public class TaskKillProcessorTest { PowerMockito.mockStatic(LoggerUtils.class); PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService); PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig); + WorkerManagerThread workerManager = PowerMockito.mock(WorkerManagerThread.class); + PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager); PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager); PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any()); PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null); @@ -102,7 +107,6 @@ public class TaskKillProcessorTest { @Test public void testProcess() { - PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext); taskKillProcessor.process(channel, command); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index b3517d3cd4..bbc131dc95 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -71,8 +71,6 @@ public class WorkerRegistryClientTest { @Before public void before() { - - given(registryClient.getWorkerPath()).willReturn("/nodes/worker"); given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); //given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); //scheduleAtFixedRate diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index e1764620da..0c337e0823 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -165,7 +166,7 @@ public class TaskExecuteThreadTest { @Override public AbstractParameters getParameters() { - return null; + return new SqlParameters(); } @Override diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java deleted file mode 100644 index 348775cf67..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.task; - -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class}) -public class AbstractCommandExecutorTest { - - private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class); - - private ShellCommandExecutor shellCommandExecutor; - - @Before - public void before() throws Exception { - System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); - shellCommandExecutor = new ShellCommandExecutor(null); - } - - @Test - public void testSetTaskResultString() { - shellCommandExecutor.setTaskResultString("shellReturn"); - } - - @Test - public void testGetTaskResultString() { - logger.info(shellCommandExecutor.getTaskResultString()); - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java index 892299cab0..574f0e796c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java @@ -110,17 +110,6 @@ public class ShellTaskReturnTest { } catch (Exception e) { e.printStackTrace(); } - shellTask.setResult("shell return string"); - logger.info("shell return string:{}", shellTask.getResultString()); } - @Test - public void testSetTaskResultString() { - shellCommandExecutor.setTaskResultString("shellReturn"); - } - - @Test - public void testGetTaskResultString() { - logger.info(shellCommandExecutor.getTaskResultString()); - } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index 46d2713522..cb8a189396 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -164,11 +164,6 @@ public class TaskManagerTest { definedParams.put("time_gb", "2020-12-16 00:00:00"); taskExecutionContext.setDefinedParams(definedParams); ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); - shellTask.setResultString("shell return"); - String shellReturn = shellTask.getResultString(); - shellTask.init(); - shellTask.setResult(shellReturn); - Assert.assertSame(shellReturn, "shell return"); } @Test diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java new file mode 100644 index 0000000000..f384f83a7d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +import static org.junit.Assert.assertNotNull; + +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.shell.ShellParameters; +import org.apache.dolphinscheduler.common.task.sql.SqlParameters; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * shell task return test. + */ +@RunWith(PowerMockRunner.class) +public class TaskParamsTest { + private static final Logger logger = LoggerFactory.getLogger(TaskParamsTest.class); + + @Test + public void testDealOutParam() { + List properties = new ArrayList<>(); + Property property = new Property(); + property.setProp("test1"); + property.setDirect(Direct.OUT); + property.setType(DataType.VARCHAR); + property.setValue("test1"); + properties.add(property); + + ShellParameters shellParameters = new ShellParameters(); + String resultShell = "key1=value1$VarPoolkey2=value2"; + shellParameters.varPool = new ArrayList<>(); + shellParameters.setLocalParams(properties); + shellParameters.dealOutParam(resultShell); + assertNotNull(shellParameters.getVarPool().get(0)); + + String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]"; + SqlParameters sqlParameters = new SqlParameters(); + String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]"; + sqlParameters.setLocalParams(properties); + sqlParameters.varPool = new ArrayList<>(); + sqlParameters.dealOutParam(sqlResult1); + assertNotNull(sqlParameters.getVarPool().get(0)); + + property.setType(DataType.LIST); + properties.clear(); + properties.add(property); + sqlParameters.setLocalParams(properties); + sqlParameters.dealOutParam(sqlResult); + assertNotNull(sqlParameters.getVarPool().get(0)); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index bd02f61f17..c992a0a610 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -62,7 +62,6 @@ public class ShellTaskTest { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); - shellCommandExecutor.setTaskResultString("shellReturn"); taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); @@ -85,6 +84,7 @@ public class ShellTaskTest { taskExecutionContext.setTenantCode("roo"); taskExecutionContext.setScheduleTime(new Date()); taskExecutionContext.setQueue("default"); + taskExecutionContext.setVarPool("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); taskExecutionContext.setTaskParams( "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":" + @@ -105,6 +105,7 @@ public class ShellTaskTest { public void testComplementData() throws Exception { shellTask = new ShellTask(taskExecutionContext, logger); shellTask.init(); + shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>()); shellCommandExecutor.isSuccessOfYarnState(null); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); @@ -116,16 +117,9 @@ public class ShellTaskTest { taskExecutionContext.setCmdTypeIfComplement(0); shellTask = new ShellTask(taskExecutionContext, logger); shellTask.init(); + shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult); shellTask.handle(); } - @Test - public void testSetResult() { - shellTask = new ShellTask(taskExecutionContext, logger); - shellTask.init(); - String r = "return"; - shellTask.setResult(r); - } - } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java index 4dbcb2b048..1266fd1bcc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java @@ -89,6 +89,7 @@ public class SqlTaskTest { PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date()); PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); + PowerMockito.when(taskExecutionContext.getVarPool()).thenReturn("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); @@ -98,6 +99,7 @@ public class SqlTaskTest { PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao()); alertClientService = PowerMockito.mock(AlertClientService.class); sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); + sqlTask.getParameters().setVarPool(taskExecutionContext.getVarPool()); sqlTask.init(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 69f3c16e08..ddead50852 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -134,7 +134,6 @@ import org.springframework.transaction.annotation.Transactional; import com.cronutils.model.Cron; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -1585,71 +1584,51 @@ public class ProcessService { int processId, String appIds, int taskInstId, - String varPool, - String result) { + String varPool) { taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); taskInstance.setVarPool(varPool); - changeOutParam(result, taskInstance); + changeOutParam(taskInstance); saveTaskInstance(taskInstance); } - public void changeOutParam(String result, TaskInstance taskInstance) { - if (StringUtils.isEmpty(result)) { + /** + * for show in page of taskInstance + * @param taskInstance + */ + public void changeOutParam(TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getVarPool())) { return; } - List> workerResultParam = getListMapByString(result); - if (CollectionUtils.isEmpty(workerResultParam)) { + List properties = JSONUtils.toList(taskInstance.getVarPool(), Property.class); + if (CollectionUtils.isEmpty(properties)) { return; } //if the result more than one line,just get the first . - Map row = workerResultParam.get(0); - if (row == null || row.size() == 0) { - return; - } Map taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class); Object localParams = taskParams.get(LOCAL_PARAMS); if (localParams == null) { return; } - ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId()); - List params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class); - Map allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); - List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + Map outProperty = new HashMap<>(); + for (Property info : properties) { + if (info.getDirect() == Direct.OUT) { + outProperty.put(info.getProp(), info.getValue()); + } + } for (Property info : allParam) { if (info.getDirect() == Direct.OUT) { String paramName = info.getProp(); - Property property = allParamMap.get(paramName); - if (property == null) { - continue; - } - String value = String.valueOf(row.get(paramName)); - if (StringUtils.isNotEmpty(value)) { - property.setValue(value); - info.setValue(value); - } + info.setValue(outProperty.get(paramName)); } } taskParams.put(LOCAL_PARAMS, allParam); taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams)); - String params4ProcessString = JSONUtils.toJsonString(params4Property); - int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId()); - logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId()); } - public List> getListMapByString(String json) { - List> allParams = new ArrayList<>(); - ArrayNode paramsByJson = JSONUtils.parseArray(json); - Iterator listIterator = paramsByJson.iterator(); - while (listIterator.hasNext()) { - Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); - allParams.add(param); - } - return allParams; - } /** * convert integer list to string list diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java index 143821fe41..119a60ad58 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.service.registry; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.utils.PropertyUtils; @@ -57,16 +59,7 @@ public class RegistryCenter { */ protected static String NODES; - /** - * master path - */ - protected static String MASTER_PATH = "/nodes/master"; - private RegistryPluginManager registryPluginManager; - /** - * worker path - */ - protected static String WORKER_PATH = "/nodes/worker"; protected static final String EMPTY = ""; @@ -113,8 +106,9 @@ public class RegistryCenter { * init nodes */ private void initNodes() { - persist(MASTER_PATH, EMPTY); - persist(WORKER_PATH, EMPTY); + persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY); + persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY); + persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY); } /** @@ -205,15 +199,6 @@ public class RegistryCenter { return stoppable; } - /** - * get master path - * - * @return master path - */ - public String getMasterPath() { - return MASTER_PATH; - } - /** * whether master path * @@ -221,16 +206,7 @@ public class RegistryCenter { * @return result */ public boolean isMasterPath(String path) { - return path != null && path.contains(MASTER_PATH); - } - - /** - * get worker path - * - * @return worker path - */ - public String getWorkerPath() { - return WORKER_PATH; + return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS); } /** @@ -240,7 +216,7 @@ public class RegistryCenter { * @return worker group path */ public String getWorkerGroupPath(String workerGroup) { - return WORKER_PATH + "/" + workerGroup; + return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup; } /** @@ -250,7 +226,7 @@ public class RegistryCenter { * @return result */ public boolean isWorkerPath(String path) { - return path != null && path.contains(WORKER_PATH); + return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index d7afcd9000..d9ebf18492 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -22,6 +22,8 @@ import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; @@ -344,7 +346,7 @@ public class RegistryClient extends RegistryCenter { * @return master nodes */ public Set getMasterNodesDirectly() { - List masters = getChildrenKeys(MASTER_PATH); + List masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS); return new HashSet<>(masters); } @@ -354,7 +356,7 @@ public class RegistryClient extends RegistryCenter { * @return master nodes */ public Set getWorkerNodesDirectly() { - List workers = getChildrenKeys(WORKER_PATH); + List workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); return new HashSet<>(workers); } @@ -364,7 +366,7 @@ public class RegistryClient extends RegistryCenter { * @return worker group nodes */ public Set getWorkerGroupDirectly() { - List workers = getChildrenKeys(getWorkerPath()); + List workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); return new HashSet<>(workers); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 79be9ec1ae..e00cf879e2 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -509,4 +509,19 @@ public class ProcessServiceTest { Mockito.verify(commandMapper, Mockito.times(1)).insert(command); } + @Test + public void testChangeOutParam() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setProcessInstanceId(62); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(62); + taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); + taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\"," + + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\"," + + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}]," + + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}"); + processService.changeOutParam(taskInstance); + } + } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java index 5d2ad5642d..66244b2ddd 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java @@ -104,7 +104,7 @@ public class DolphinPluginLoader { private void loadPlugin(URLClassLoader pluginClassLoader) { ServiceLoader serviceLoader = ServiceLoader.load(DolphinSchedulerPlugin.class, pluginClassLoader); List plugins = ImmutableList.copyOf(serviceLoader); - Preconditions.checkState(!plugins.isEmpty(), "No service providers the plugin {}", DolphinSchedulerPlugin.class.getName()); + Preconditions.checkState(!plugins.isEmpty(), "No service providers the plugin %s", DolphinSchedulerPlugin.class.getName()); for (DolphinSchedulerPlugin plugin : plugins) { logger.info("Installing {}", plugin.getClass().getName()); for (AbstractDolphinPluginManager dolphinPluginManager : dolphinPluginManagerList) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue index 28960935d2..a2edec8f32 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue @@ -109,6 +109,10 @@ this.$message.warning(`${i18n.$t('Please enter group name')}`) return false } + if (this.alertInstanceIds) { + this.$message.warning(`${i18n.$t('Select Alarm plugin instance')}`) + return false + } return true }, _submit () { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue index 07c9faa8ae..6e09918eed 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue @@ -111,6 +111,10 @@ this.$message.warning(`${i18n.$t('Please enter group name')}`) return false } + if (!this.pluginDefineId) { + this.$message.warning(`${i18n.$t('Select Alarm plugin')}`) + return false + } return true }, // Select plugin diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue b/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue index ce9e1fb9a7..5f30ed8fb3 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue @@ -80,7 +80,7 @@