|
|
|
@ -16,7 +16,6 @@
|
|
|
|
|
*/ |
|
|
|
|
package org.apache.dolphinscheduler.api.service; |
|
|
|
|
|
|
|
|
|
import com.alibaba.druid.pool.DruidDataSource; |
|
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
|
import org.apache.dolphinscheduler.api.ApiApplicationServer; |
|
|
|
@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.*; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.*; |
|
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
|
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
|
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; |
|
|
|
|
import org.apache.http.entity.ContentType; |
|
|
|
|
import org.json.JSONException; |
|
|
|
|
import org.junit.Assert; |
|
|
|
@ -41,12 +38,8 @@ import org.mockito.InjectMocks;
|
|
|
|
|
import org.mockito.Mock; |
|
|
|
|
import org.mockito.Mockito; |
|
|
|
|
import org.mockito.junit.MockitoJUnitRunner; |
|
|
|
|
import org.quartz.Scheduler; |
|
|
|
|
import org.skyscreamer.jsonassert.JSONAssert; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
import org.springframework.boot.test.context.SpringBootTest; |
|
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
|
import org.springframework.mock.web.MockMultipartFile; |
|
|
|
|
import org.springframework.web.multipart.MultipartFile; |
|
|
|
|
|
|
|
|
@ -59,7 +52,6 @@ import java.util.*;
|
|
|
|
|
@RunWith(MockitoJUnitRunner.Silent.class) |
|
|
|
|
@SpringBootTest(classes = ApiApplicationServer.class) |
|
|
|
|
public class ProcessDefinitionServiceTest { |
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceTest.class); |
|
|
|
|
|
|
|
|
|
@InjectMocks |
|
|
|
|
ProcessDefinitionService processDefinitionService; |
|
|
|
@ -79,8 +71,7 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
@Mock |
|
|
|
|
private ScheduleMapper scheduleMapper; |
|
|
|
|
|
|
|
|
|
@Mock |
|
|
|
|
private WorkerGroupMapper workerGroupMapper; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Mock |
|
|
|
|
private ProcessService processService; |
|
|
|
@ -347,7 +338,7 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
|
|
|
|
|
//release error code
|
|
|
|
|
Map<String, Object> failRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1", |
|
|
|
|
46, 2); |
|
|
|
|
46, 2); |
|
|
|
|
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); |
|
|
|
|
|
|
|
|
|
//FIXME has function exit code 1 when exception
|
|
|
|
@ -530,7 +521,6 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testExportProcessMetaDataStr() { |
|
|
|
|
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList()); |
|
|
|
|
Mockito.when(workerGroupMapper.selectById(-1)).thenReturn(null); |
|
|
|
|
|
|
|
|
|
ProcessDefinition processDefinition = getProcessDefinition(); |
|
|
|
|
processDefinition.setProcessDefinitionJson(sqlDependentJson); |
|
|
|
@ -573,17 +563,14 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
|
|
|
|
|
WorkerGroup workerGroup = new WorkerGroup(); |
|
|
|
|
workerGroup.setName("ds-test-workergroup"); |
|
|
|
|
workerGroup.setId(2); |
|
|
|
|
List<WorkerGroup> workerGroups = new ArrayList<>(); |
|
|
|
|
workerGroups.add(workerGroup); |
|
|
|
|
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(workerGroups); |
|
|
|
|
|
|
|
|
|
processMetaCron.setScheduleWorkerGroupName("ds-test"); |
|
|
|
|
int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, |
|
|
|
|
processDefinitionName, processDefinitionId); |
|
|
|
|
Assert.assertEquals(0, insertFlagWorker); |
|
|
|
|
|
|
|
|
|
Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(null); |
|
|
|
|
int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, |
|
|
|
|
processDefinitionName, processDefinitionId); |
|
|
|
|
Assert.assertEquals(0, workerNullFlag); |
|
|
|
@ -659,7 +646,7 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null); |
|
|
|
|
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2); |
|
|
|
|
|
|
|
|
|
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap); |
|
|
|
|
processDefinitionService.importSubProcess(loginUser,testProject, jsonArray, subProcessIdMap); |
|
|
|
|
|
|
|
|
|
String correctSubJson = jsonArray.toString(); |
|
|
|
|
|
|
|
|
@ -667,60 +654,32 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void testCreateProcess() throws IOException{ |
|
|
|
|
|
|
|
|
|
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; |
|
|
|
|
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; |
|
|
|
|
|
|
|
|
|
String projectName = "test"; |
|
|
|
|
String name = "dag_test"; |
|
|
|
|
String description = "desc test"; |
|
|
|
|
String connects = "[]"; |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
result.put("processDefinitionId",1); |
|
|
|
|
|
|
|
|
|
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); |
|
|
|
|
User loginUser = new User(); |
|
|
|
|
loginUser.setId(1); |
|
|
|
|
loginUser.setUserType(UserType.ADMIN_USER); |
|
|
|
|
Project project = getProject(projectName); |
|
|
|
|
|
|
|
|
|
//project not found
|
|
|
|
|
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); |
|
|
|
|
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1); |
|
|
|
|
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects); |
|
|
|
|
|
|
|
|
|
Assert.assertEquals(Status.SUCCESS,result1.get(Constants.STATUS)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void testImportProcessDefinitionById() throws IOException { |
|
|
|
|
|
|
|
|
|
String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; |
|
|
|
|
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; |
|
|
|
|
|
|
|
|
|
String projectName = "test"; |
|
|
|
|
String name = "dag_test"; |
|
|
|
|
String description = "desc test"; |
|
|
|
|
String connects = "[]"; |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
result.put("processDefinitionId",1); |
|
|
|
|
|
|
|
|
|
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); |
|
|
|
|
User loginUser = new User(); |
|
|
|
|
loginUser.setId(1); |
|
|
|
|
loginUser.setUserType(UserType.ADMIN_USER); |
|
|
|
|
Project project = getProject(projectName); |
|
|
|
|
|
|
|
|
|
//project not found
|
|
|
|
|
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); |
|
|
|
|
Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1); |
|
|
|
|
Map<String, Object> result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects); |
|
|
|
|
|
|
|
|
|
String processJson = "[{\"processDefinitionConnects\":\"[]\",\"processDefinitionJson\":\"{\\\"tenantId\\\":-1,\\\"globalParams\\\":[],\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"aa=\\\\\\\"1234\\\\\\\"\\\\necho ${aa}\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"},\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"ssh_test1\\\",\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-36196\\\",\\\"desc\\\":\\\"\\\"}],\\\"timeout\\\":0}\",\"processDefinitionLocations\":\"{\\\"tasks-36196\\\":{\\\"name\\\":\\\"ssh_test1\\\",\\\"targetarr\\\":\\\"\\\",\\\"x\\\":141,\\\"y\\\":70}}\",\"processDefinitionName\":\"dag_test\",\"projectName\":\"test\"}]"; |
|
|
|
|
String processJson = "[{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," + |
|
|
|
|
"\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," + |
|
|
|
|
"\\\"tasks\\\":[{\\\"workerGroupId\\\":\\\"default\\\",\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," + |
|
|
|
|
"\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," + |
|
|
|
|
"\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," + |
|
|
|
|
"\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," + |
|
|
|
|
"\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," + |
|
|
|
|
"{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":\\\"default\\\\," + |
|
|
|
|
"\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," + |
|
|
|
|
"\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," + |
|
|
|
|
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," + |
|
|
|
|
"\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," + |
|
|
|
|
"\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," + |
|
|
|
|
"\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," + |
|
|
|
|
"\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," + |
|
|
|
|
"\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}]"; |
|
|
|
|
|
|
|
|
|
String subProcessJson = "{\"globalParams\":[]," + |
|
|
|
|
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," + |
|
|
|
|
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," + |
|
|
|
|
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + |
|
|
|
|
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":\\\"default\\\\," + |
|
|
|
|
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; |
|
|
|
|
|
|
|
|
|
FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson); |
|
|
|
|
|
|
|
|
@ -731,37 +690,45 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(), |
|
|
|
|
ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream); |
|
|
|
|
|
|
|
|
|
String currentProjectName = "test"; |
|
|
|
|
User loginUser = new User(); |
|
|
|
|
loginUser.setId(1); |
|
|
|
|
loginUser.setUserType(UserType.ADMIN_USER); |
|
|
|
|
|
|
|
|
|
String currentProjectName = "testProject"; |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
putMsg(result, Status.SUCCESS, currentProjectName); |
|
|
|
|
|
|
|
|
|
ProcessDefinition shellDefinition2 = new ProcessDefinition(); |
|
|
|
|
shellDefinition2.setId(25); |
|
|
|
|
shellDefinition2.setName("B"); |
|
|
|
|
shellDefinition2.setProjectId(1); |
|
|
|
|
shellDefinition2.setId(46); |
|
|
|
|
shellDefinition2.setName("shell-5"); |
|
|
|
|
shellDefinition2.setProjectId(2); |
|
|
|
|
shellDefinition2.setProcessDefinitionJson(subProcessJson); |
|
|
|
|
|
|
|
|
|
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName)); |
|
|
|
|
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result); |
|
|
|
|
Mockito.when(processDefineMapper.queryByDefineId(25)).thenReturn(shellDefinition2); |
|
|
|
|
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2); |
|
|
|
|
|
|
|
|
|
//import process
|
|
|
|
|
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName); |
|
|
|
|
|
|
|
|
|
Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS)); |
|
|
|
|
|
|
|
|
|
boolean delete = file.delete(); |
|
|
|
|
|
|
|
|
|
Assert.assertTrue(delete); |
|
|
|
|
|
|
|
|
|
String processMetaJson = "[]"; |
|
|
|
|
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); |
|
|
|
|
// Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
|
|
|
|
|
//
|
|
|
|
|
// Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
|
|
|
|
|
//
|
|
|
|
|
// boolean delete = file.delete();
|
|
|
|
|
//
|
|
|
|
|
processMetaJson = "[{\"scheduleWorkerGroupId\":-1}]"; |
|
|
|
|
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); |
|
|
|
|
// Assert.assertTrue(delete);
|
|
|
|
|
|
|
|
|
|
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}]"; |
|
|
|
|
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); |
|
|
|
|
// String processMetaJson = "";
|
|
|
|
|
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
|
|
|
|
|
//
|
|
|
|
|
// processMetaJson = "{\"scheduleWorkerGroupId\":-1}";
|
|
|
|
|
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
|
|
|
|
|
//
|
|
|
|
|
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}";
|
|
|
|
|
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
|
|
|
|
|
//
|
|
|
|
|
// processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}";
|
|
|
|
|
// improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
|
|
|
|
|
|
|
|
|
|
processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}]"; |
|
|
|
|
importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -773,7 +740,7 @@ public class ProcessDefinitionServiceTest {
|
|
|
|
|
* @param processMetaJson process meta json |
|
|
|
|
* @throws IOException IO exception |
|
|
|
|
*/ |
|
|
|
|
private void importProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { |
|
|
|
|
private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { |
|
|
|
|
//check null
|
|
|
|
|
FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); |
|
|
|
|
|
|
|
|
|