From 75440c618137f55bcce081b779ec84be45b740d9 Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Fri, 24 Apr 2020 11:49:29 +0800 Subject: [PATCH 1/2] add license (#2511) Co-authored-by: sunchaohe --- kubernetes/dolphinscheduler/requirements.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/kubernetes/dolphinscheduler/requirements.yaml b/kubernetes/dolphinscheduler/requirements.yaml index a2fde1b40c..e219975995 100644 --- a/kubernetes/dolphinscheduler/requirements.yaml +++ b/kubernetes/dolphinscheduler/requirements.yaml @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# dependencies: - name: postgresql version: 8.x.x From 64d335d9f2dc0cfb1a9ea8986437696c204b48ca Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Fri, 24 Apr 2020 12:19:30 +0800 Subject: [PATCH 2/2] copy process feature #2412 (#2471) * Update pom.xml (#2467) * add copy process * add copy process 2 * add copy process 3 * add copy process 4 * add copy process 5 Co-authored-by: dailidong Co-authored-by: sunchaohe --- .../ProcessDefinitionController.java | 24 +++++++ .../dolphinscheduler/api/enums/Status.java | 16 ++--- .../api/service/ProcessDefinitionService.java | 44 +++++++++++- .../main/resources/i18n/messages.properties | 1 + .../resources/i18n/messages_en_US.properties | 1 + .../resources/i18n/messages_zh_CN.properties | 1 + .../ProcessDefinitionControllerTest.java | 15 ++++ .../service/ProcessDefinitionServiceTest.java | 43 +++++++++++ .../processor/TaskCallbackServiceTest.java | 71 +++++++++---------- .../definition/pages/list/_source/list.vue | 20 +++++- .../src/js/conf/home/store/dag/actions.js | 17 +++++ pom.xml | 1 + 12 files changed, 205 insertions(+), 49 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index b3fe1a9eef..4f3dafdf27 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -94,6 +94,30 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + @ApiOperation(value = "copyProcessDefinition", notes= "COPY_PROCESS_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) + @PostMapping(value = "/copy") + @ResponseStatus(HttpStatus.OK) + @ApiException(COPY_PROCESS_DEFINITION_ERROR) + public Result copyProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam(value = "processId", required = true) int processId) throws JsonProcessingException { + logger.info("copy process definition, login user:{}, project name:{}, process definition id:{}", + loginUser.getUserName(), projectName, processId); + Map result = processDefinitionService.copyProcessDefinition(loginUser, projectName, processId); + return returnDataList(result); + } + /** * verify process definition name unique * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 00665aae71..8c52dd4d50 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -168,15 +168,13 @@ public enum Status { PREVIEW_SCHEDULE_ERROR(10139,"preview schedule error", "预览调度配置错误"), PARSE_TO_CRON_EXPRESSION_ERROR(10140,"parse cron to cron expression error", "解析调度表达式错误"), SCHEDULE_START_TIME_END_TIME_SAME(10141,"The start time must not be the same as the end", "开始时间不能和结束时间一样"), - DELETE_TENANT_BY_ID_FAIL(100142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"), - DELETE_TENANT_BY_ID_FAIL_DEFINES(100143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"), - DELETE_TENANT_BY_ID_FAIL_USERS(100144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"), - - DELETE_WORKER_GROUP_BY_ID_FAIL(100145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"), - - QUERY_WORKER_GROUP_FAIL(100146,"query worker group fail ", "查询worker分组失败"), - DELETE_WORKER_GROUP_FAIL(100147,"delete worker group fail ", "删除worker分组失败"), - + DELETE_TENANT_BY_ID_FAIL(10142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"), + DELETE_TENANT_BY_ID_FAIL_DEFINES(10143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"), + DELETE_TENANT_BY_ID_FAIL_USERS(10144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"), + DELETE_WORKER_GROUP_BY_ID_FAIL(10145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"), + QUERY_WORKER_GROUP_FAIL(10146,"query worker group fail ", "查询worker分组失败"), + DELETE_WORKER_GROUP_FAIL(10147,"delete worker group fail ", "删除worker分组失败"), + COPY_PROCESS_DEFINITION_ERROR(10148,"copy process definition error", "复制工作流错误"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 4081cab732..14cadbf189 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -112,8 +112,13 @@ public class ProcessDefinitionService extends BaseDAGService { * @return create result code * @throws JsonProcessingException JsonProcessingException */ - public Map createProcessDefinition(User loginUser, String projectName, String name, - String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException { + public Map createProcessDefinition(User loginUser, + String projectName, + String name, + String processDefinitionJson, + String desc, + String locations, + String connects) throws JsonProcessingException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -281,6 +286,41 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException{ + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); + return result; + } else { + return createProcessDefinition( + loginUser, + projectName, + processDefinition.getName()+"_copy_"+System.currentTimeMillis(), + processDefinition.getProcessDefinitionJson(), + processDefinition.getDescription(), + processDefinition.getLocations(), + processDefinition.getConnects()); + } + } + /** * update process definition * diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index c4ca13168d..369e5e3c72 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -173,6 +173,7 @@ PROCESS_DEFINITION_ID=process definition id PROCESS_DEFINITION_IDS=process definition ids RELEASE_PROCESS_DEFINITION_NOTES=release process definition QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id +COPY_PROCESS_DEFINITION_NOTES=copy process definition QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index e0c1c286d1..92df742613 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -173,6 +173,7 @@ PROCESS_DEFINITION_ID=process definition id PROCESS_DEFINITION_IDS=process definition ids RELEASE_PROCESS_DEFINITION_NOTES=release process definition QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id +COPY_PROCESS_DEFINITION_NOTES=copy process definition QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index af7fde5068..3b427912b5 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -171,6 +171,7 @@ UPDATE_PROCESS_DEFINITION_NOTES=更新流程定义 PROCESS_DEFINITION_ID=流程定义ID RELEASE_PROCESS_DEFINITION_NOTES=发布流程定义 QUERY_PROCESS_DEFINITION_BY_ID_NOTES=查询流程定义通过流程定义ID +COPY_PROCESS_DEFINITION_NOTES=复制流程定义 QUERY_PROCESS_DEFINITION_LIST_NOTES=查询流程定义列表 QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=分页查询流程定义列表 QUERY_ALL_DEFINITION_LIST_NOTES=查询所有流程定义 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index c028dd4167..a69df9744e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -174,6 +174,21 @@ public class ProcessDefinitionControllerTest{ Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); } + @Test + public void testCopyProcessDefinition() throws Exception { + + String projectName = "test"; + int id = 1; + + Map result = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + + Mockito.when(processDefinitionService.copyProcessDefinition(user, projectName,id)).thenReturn(result); + Result response = processDefinitionController.copyProcessDefinition(user, projectName,id); + + Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); + } + @Test public void testQueryProcessDefinitionList() throws Exception { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index a0da2289dc..5a03cdb268 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -198,6 +198,47 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } + @Test + public void testCopyProcessDefinition() throws Exception{ + String projectName = "project_test1"; + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); + + Project project = getProject(projectName); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + + Map result = new HashMap<>(5); + //project check auth success, instance not exist + putMsg(result, Status.SUCCESS, projectName); + Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); + + ProcessDefinition definition = getProcessDefinition(); + definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); + definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"); + definition.setConnects("[]"); + //instance exit + Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); + + Map createProcessResult = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + + Mockito.when(processDefinitionService.createProcessDefinition( + loginUser, + definition.getProjectName(), + definition.getName(), + definition.getProcessDefinitionJson(), + definition.getDescription(), + definition.getLocations(), + definition.getConnects())).thenReturn(createProcessResult); + + Map successRes = processDefinitionService.copyProcessDefinition(loginUser, + "project_test1", 46); + + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } + @Test public void deleteProcessDefinitionByIdTest() throws Exception { String projectName = "project_test1"; @@ -770,12 +811,14 @@ public class ProcessDefinitionServiceTest { * @return ProcessDefinition */ private ProcessDefinition getProcessDefinition(){ + ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(46); processDefinition.setName("test_pdf"); processDefinition.setProjectId(2); processDefinition.setTenantId(1); processDefinition.setDescription(""); + return processDefinition; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index a0fee7c36e..78ba3a6b44 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -133,8 +133,6 @@ public class TaskCallbackServiceTest { nettyRemotingClient.close(); } - - @Test(expected = IllegalArgumentException.class) public void testSendAckWithIllegalArgumentException(){ TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); @@ -178,39 +176,40 @@ public class TaskCallbackServiceTest { } } - @Test(expected = IllegalStateException.class) - public void testSendAckWithIllegalStateException2(){ - masterRegistry.registry(); - final NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(30000); - NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); - nettyRemotingServer.start(); - - final NettyClientConfig clientConfig = new NettyClientConfig(); - NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); - Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); - taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); - channel.close(); - TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); - ackCommand.setTaskInstanceId(1); - ackCommand.setStartTime(new Date()); +// @Test(expected = IllegalStateException.class) +// public void testSendAckWithIllegalStateException2(){ +// masterRegistry.registry(); +// final NettyServerConfig serverConfig = new NettyServerConfig(); +// serverConfig.setListenPort(30000); +// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); +// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); +// nettyRemotingServer.start(); +// +// final NettyClientConfig clientConfig = new NettyClientConfig(); +// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); +// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); +// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); +// channel.close(); +// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); +// ackCommand.setTaskInstanceId(1); +// ackCommand.setStartTime(new Date()); +// +// nettyRemotingServer.close(); +// +// taskCallbackService.sendAck(1, ackCommand.convert2Command()); +// try { +// Thread.sleep(5000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// Stopper.stop(); +// +// try { +// Thread.sleep(5000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } - nettyRemotingServer.close(); - - taskCallbackService.sendAck(1, ackCommand.convert2Command()); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - Stopper.stop(); - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index 53939f3f7b..95bdc2930c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -46,7 +46,7 @@ {{$t('Timing state')}} - + {{$t('Operation')}} @@ -90,6 +90,7 @@ + v.code === code)[0].desc }, @@ -306,6 +307,21 @@ releaseState: 1 }) }, + /** + * copy + */ + _copyProcess (item) { + this.copyProcess({ + processId: item.id + }).then(res => { + this.$message.success(res.msg) + $('body').find('.tooltip.fade.top.in').remove() + this._onUpdate() + }).catch(e => { + this.$message.error(e.msg || '') + }) + }, + _export (item) { this.exportDefinition({ processDefinitionId: item.id, diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index a63c9edb8c..f282c8e30a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -90,6 +90,7 @@ export default { }) }) }, + /** * Get process definition DAG diagram details */ @@ -127,6 +128,22 @@ export default { }) }) }, + +/** + * Get process definition DAG diagram details + */ + copyProcess ({ state }, payload) { + return new Promise((resolve, reject) => { + io.post(`projects/${state.projectName}/process/copy`, { + processId: payload.processId + }, res => { + resolve(res) + }).catch(e => { + reject(e) + }) + }) + }, + /** * Get the process instance DAG diagram details */ diff --git a/pom.xml b/pom.xml index 8f81e2aea9..0647724ed0 100644 --- a/pom.xml +++ b/pom.xml @@ -764,6 +764,7 @@ **/common/utils/HadoopUtilsTest.java **/common/utils/HttpUtilsTest.java **/common/ConstantsTest.java + **/common/utils/HadoopUtils.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/CommandMapperTest.java