diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java new file mode 100644 index 0000000000..087bb80ccb --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.dependent; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DateInterval; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * dependent item execute + */ +public class DependentExecute { + /** + * process service + */ + private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + + /** + * depend item list + */ + private List dependItemList; + + /** + * dependent relation + */ + private DependentRelation relation; + + /** + * depend result + */ + private DependResult modelDependResult = DependResult.WAITING; + + /** + * depend result map + */ + private Map dependResultMap = new HashMap<>(); + + /** + * logger + */ + private Logger logger = LoggerFactory.getLogger(DependentExecute.class); + + /** + * constructor + * @param itemList item list + * @param relation relation + */ + public DependentExecute(List itemList, DependentRelation relation){ + this.dependItemList = itemList; + this.relation = relation; + } + + /** + * get dependent item for one dependent item + * @param dependentItem dependent item + * @param currentTime current time + * @return DependResult + */ + private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ + List dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue()); + return calculateResultForTasks(dependentItem, dateIntervals ); + } + + /** + * calculate dependent result for one dependent item. + * @param dependentItem dependent item + * @param dateIntervals date intervals + * @return dateIntervals + */ + private DependResult calculateResultForTasks(DependentItem dependentItem, + List dateIntervals) { + + DependResult result = DependResult.FAILED; + for(DateInterval dateInterval : dateIntervals){ + ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), + dateInterval); + if(processInstance == null){ + logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", + dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); + return DependResult.FAILED; + } + // need to check workflow for updates, so get all task and check the task state + if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ + List taskNodes = + processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId()); + + if(taskNodes != null && taskNodes.size() > 0){ + List results = new ArrayList<>(); + DependResult tmpResult = DependResult.FAILED; + for(TaskNode taskNode:taskNodes){ + tmpResult = getDependTaskResult(taskNode.getName(),processInstance); + if(DependResult.FAILED == tmpResult){ + break; + }else{ + results.add(getDependTaskResult(taskNode.getName(),processInstance)); + } + } + + if(DependResult.FAILED == tmpResult){ + result = DependResult.FAILED; + }else if(results.contains(DependResult.WAITING)){ + result = DependResult.WAITING; + }else{ + result = DependResult.SUCCESS; + } + }else{ + result = DependResult.FAILED; + } + }else{ + result = getDependTaskResult(dependentItem.getDepTasks(),processInstance); + } + if(result != DependResult.SUCCESS){ + break; + } + } + return result; + } + + /** + * get depend task result + * @param taskName + * @param processInstance + * @return + */ + private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) { + DependResult result = DependResult.FAILED; + TaskInstance taskInstance = null; + List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + + for(TaskInstance task : taskInstanceList){ + if(task.getName().equals(taskName)){ + taskInstance = task; + break; + } + } + + if(taskInstance == null){ + // cannot find task in the process instance + // maybe because process instance is running or failed. + result = getDependResultByProcessStateWhenTaskNull(processInstance.getState()); + }else{ + result = getDependResultByState(taskInstance.getState()); + } + + return result; + } + + /** + * find the last one process instance that : + * 1. manual run and finish between the interval + * 2. schedule run and schedule time between the interval + * @param definitionId definition id + * @param dateInterval date interval + * @return ProcessInstance + */ + private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { + + ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval); + if(runningProcess != null){ + return runningProcess; + } + + ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval( + definitionId, dateInterval + ); + + ProcessInstance lastManualProcess = processService.findLastManualProcessInterval( + definitionId, dateInterval + ); + + if(lastManualProcess ==null){ + return lastSchedulerProcess; + } + if(lastSchedulerProcess == null){ + return lastManualProcess; + } + + return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime()))? + lastManualProcess : lastSchedulerProcess; + } + + /** + * get dependent result by task/process instance state + * @param state state + * @return DependResult + */ + private DependResult getDependResultByState(ExecutionStatus state) { + + if(state.typeIsRunning() + || state == ExecutionStatus.SUBMITTED_SUCCESS + || state == ExecutionStatus.WAITTING_THREAD){ + return DependResult.WAITING; + }else if(state.typeIsSuccess()){ + return DependResult.SUCCESS; + }else{ + return DependResult.FAILED; + } + } + + /** + * get dependent result by task instance state when task instance is null + * @param state state + * @return DependResult + */ + private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) { + + if(state.typeIsRunning() + || state == ExecutionStatus.SUBMITTED_SUCCESS + || state == ExecutionStatus.WAITTING_THREAD){ + return DependResult.WAITING; + }else{ + return DependResult.FAILED; + } + } + + /** + * judge depend item finished + * @param currentTime current time + * @return boolean + */ + public boolean finish(Date currentTime){ + if(modelDependResult == DependResult.WAITING){ + modelDependResult = getModelDependResult(currentTime); + return false; + } + return true; + } + + /** + * get model depend result + * @param currentTime current time + * @return DependResult + */ + public DependResult getModelDependResult(Date currentTime){ + + List dependResultList = new ArrayList<>(); + + for(DependentItem dependentItem : dependItemList){ + DependResult dependResult = getDependResultForItem(dependentItem, currentTime); + if(dependResult != DependResult.WAITING){ + dependResultMap.put(dependentItem.getKey(), dependResult); + } + dependResultList.add(dependResult); + } + modelDependResult = DependentUtils.getDependResultForRelation( + this.relation, dependResultList + ); + return modelDependResult; + } + + /** + * get dependent item result + * @param item item + * @param currentTime current time + * @return DependResult + */ + private DependResult getDependResultForItem(DependentItem item, Date currentTime){ + String key = item.getKey(); + if(dependResultMap.containsKey(key)){ + return dependResultMap.get(key); + } + return getDependentResultForItem(item, currentTime); + } + + public Map getDependResultMap(){ + return dependResultMap; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java new file mode 100644 index 0000000000..b426d32502 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.dependent; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; + +import java.util.*; + +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; + +/** + * Dependent Task + */ +public class DependentTask extends AbstractTask { + + /** + * dependent task list + */ + private List dependentTaskList = new ArrayList<>(); + + /** + * depend item result map + * save the result to log file + */ + private Map dependResultMap = new HashMap<>(); + + /** + * dependent parameters + */ + private DependentParameters dependentParameters; + + /** + * dependent date + */ + private Date dependentDate; + + /** + * process service + */ + private ProcessService processService; + + /** + * constructor + * @param props props + * @param logger logger + */ + public DependentTask(TaskProps props, Logger logger) { + super(props, logger); + } + + @Override + public void init(){ + logger.info("dependent task initialize"); + + this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), + DependentParameters.class); + if(dependentParameters != null){ + for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ + this.dependentTaskList.add(new DependentExecute( + taskModel.getDependItemList(), taskModel.getRelation())); + } + } + + this.processService = SpringApplicationContext.getBean(ProcessService.class); + + if(taskProps.getScheduleTime() != null){ + this.dependentDate = taskProps.getScheduleTime(); + }else{ + this.dependentDate = taskProps.getTaskStartTime(); + } + + } + + @Override + public void handle() throws Exception { + // set the name of the current thread + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + try{ + TaskInstance taskInstance = null; + while(Stopper.isRunning()){ + taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId()); + + if(taskInstance == null){ + exitStatusCode = -1; + break; + } + + if(taskInstance.getState() == ExecutionStatus.KILL){ + this.cancel = true; + } + + if(this.cancel || allDependentTaskFinish()){ + break; + } + + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + } + + if(cancel){ + exitStatusCode = Constants.EXIT_CODE_KILL; + }else{ + DependResult result = getTaskDependResult(); + exitStatusCode = (result == DependResult.SUCCESS) ? + Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; + } + }catch (Exception e){ + logger.error(e.getMessage(),e); + exitStatusCode = -1; + throw e; + } + } + + /** + * get dependent result + * @return DependResult + */ + private DependResult getTaskDependResult(){ + List dependResultList = new ArrayList<>(); + for(DependentExecute dependentExecute : dependentTaskList){ + DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); + dependResultList.add(dependResult); + } + DependResult result = DependentUtils.getDependResultForRelation( + this.dependentParameters.getRelation(), dependResultList + ); + return result; + } + + /** + * judge all dependent tasks finish + * @return whether all dependent tasks finish + */ + private boolean allDependentTaskFinish(){ + boolean finish = true; + for(DependentExecute dependentExecute : dependentTaskList){ + for(Map.Entry entry: dependentExecute.getDependResultMap().entrySet()) { + if(!dependResultMap.containsKey(entry.getKey())){ + dependResultMap.put(entry.getKey(), entry.getValue()); + //save depend result to log + logger.info("dependent item complete {} {},{}", + DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString()); + } + } + if(!dependentExecute.finish(dependentDate)){ + finish = false; + } + } + return finish; + } + + + @Override + public void cancelApplication(boolean cancelApplication) throws Exception { + // cancel process + this.cancel = true; + } + + @Override + public AbstractParameters getParameters() { + return null; + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 17bd552bc3..c13a7647fe 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -17,47 +17,106 @@ package org.apache.dolphinscheduler.server.worker.task.dependent; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DateInterval; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@RunWith(MockitoJUnitRunner.Silent.class) public class DependentTaskTest { private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); + private ProcessService processService; + private ApplicationContext applicationContext; + + + @Before + public void before() throws Exception{ + processService = Mockito.mock(ProcessService.class); + Mockito.when(processService + .findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0))) + .thenReturn(findLastProcessInterval()); + Mockito.when(processService + .getTaskNodeListByDefinitionId(4)) + .thenReturn(getTaskNodes()); + Mockito.when(processService + .findValidTaskListByProcessId(11)) + .thenReturn(getTaskInstances()); + + Mockito.when(processService + .findTaskInstanceById(252612)) + .thenReturn(getTaskInstance()); + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + } @Test - public void testDependInit() throws Exception{ + public void test() throws Exception{ TaskProps taskProps = new TaskProps(); - - String dependString = "{\n" + - "\"dependTaskList\":[\n" + - " {\n" + - " \"dependItemList\":[\n" + - " {\n" + - " \"definitionId\": 101,\n" + - " \"depTasks\": \"ALL\",\n" + - " \"cycle\": \"day\",\n" + - " \"dateValue\": \"last1Day\"\n" + - " }\n" + - " ],\n" + - " \"relation\": \"AND\"\n" + - " }\n" + - " ],\n" + - "\"relation\":\"OR\"\n" + - "}"; - - taskProps.setTaskInstanceId(252612); + String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; + taskProps.setTaskInstId(252612); taskProps.setDependence(dependString); -// DependentTask dependentTask = new DependentTask(taskProps, logger); -// dependentTask.init(); -// dependentTask.handle(); -// Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_FAILURE ); + taskProps.setTaskStartTime(new Date()); + DependentTask dependentTask = new DependentTask(taskProps, logger); + dependentTask.init(); + dependentTask.handle(); + Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS ); } + private ProcessInstance findLastProcessInterval(){ + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(11); + processInstance.setState(ExecutionStatus.SUCCESS); + return processInstance; + } + + private List getTaskNodes(){ + List list = new ArrayList<>(); + TaskNode taskNode = new TaskNode(); + taskNode.setName("C"); + taskNode.setType("SQL"); + list.add(taskNode); + return list; + } + private List getTaskInstances(){ + List list = new ArrayList<>(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setName("C"); + taskInstance.setState(ExecutionStatus.SUCCESS); + taskInstance.setDependency("1231"); + list.add(taskInstance); + return list; + } + + private TaskInstance getTaskInstance(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(252612); + taskInstance.setName("C"); + taskInstance.setState(ExecutionStatus.SUCCESS); + return taskInstance; + } } \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 0aa6a3f604..d2a13aebab 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -230,6 +230,30 @@ public class ProcessService { return processInstanceMapper.queryDetailById(processId); } + /** + * get task node list by definitionId + * @param defineId + * @return + */ + public List getTaskNodeListByDefinitionId(Integer defineId){ + ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); + if (processDefinition == null) { + logger.info("process define not exists"); + return null; + } + + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + //process data check + if (null == processData) { + logger.error("process data is null"); + return null; + } + + return processData.getTasks(); + } + /** * find process instance by id * @param processId processId diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index ac04e2187d..abf04571fd 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -218,6 +218,19 @@ }, mixins: [disabledState], methods: { + /** + * getResourceId + */ + marjarId(name) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+name + }).then(res => { + this.mainJar = res.id + }).catch(e => { + this.$message.error(e.msg || '') + }) + }, /** * return localParams */ @@ -366,7 +379,13 @@ // Non-null objects represent backfill if (!_.isEmpty(o)) { this.mainClass = o.params.mainClass || '' - this.mainJar = o.params.mainJar && o.params.mainJar.id ? o.params.mainJar.id : '' + if(o.params.mainJar.res) { + this.marjarId(o.params.mainJar.res) + } else if(o.params.mainJar.res=='') { + this.mainJar = '' + } else { + this.mainJar = o.params.mainJar.id || '' + } this.deployMode = o.params.deployMode || '' this.slot = o.params.slot || 1 this.taskManager = o.params.taskManager || '2' @@ -380,8 +399,19 @@ // backfill resourceList let resourceList = o.params.resourceList || [] if (resourceList.length) { - this.resourceList = _.map(resourceList, v => { - return v.id + _.map(resourceList, v => { + if(v.res) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+v.res + }).then(res => { + this.resourceList.push(res.id) + }).catch(e => { + this.$message.error(e.msg || '') + }) + } else { + this.resourceList.push(v.id) + } }) this.cacheResourceList = resourceList } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue index 3691985d7d..121147d7e5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue @@ -142,6 +142,19 @@ }, mixins: [disabledState], methods: { + /** + * getResourceId + */ + marjarId(name) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+name + }).then(res => { + this.mainJar = res.id + }).catch(e => { + this.$message.error(e.msg || '') + }) + }, /** * return localParams */ @@ -245,7 +258,13 @@ // Non-null objects represent backfill if (!_.isEmpty(o)) { this.mainClass = o.params.mainClass || '' - this.mainJar = o.params.mainJar.id || '' + if(o.params.mainJar.res) { + this.marjarId(o.params.mainJar.res) + } else if(o.params.mainJar.res=='') { + this.mainJar = '' + } else { + this.mainJar = o.params.mainJar.id || '' + } this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'JAVA' @@ -253,8 +272,19 @@ // backfill resourceList let resourceList = o.params.resourceList || [] if (resourceList.length) { - this.resourceList = _.map(resourceList, v => { - return v.id + _.map(resourceList, v => { + if(v.res) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+v.res + }).then(res => { + this.resourceList.push(res.id) + }).catch(e => { + this.$message.error(e.msg || '') + }) + } else { + this.resourceList.push(v.id) + } }) this.cacheResourceList = resourceList } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue index e8e7e58771..28fded41d3 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue @@ -161,8 +161,7 @@ resourceList: _.map(this.resourceList, v => { return {id: v} }), - localParams: this.localParams, - rawScript: editor ? editor.getValue() : '' + localParams: this.localParams } } }, @@ -176,8 +175,19 @@ // backfill resourceList let resourceList = o.params.resourceList || [] if (resourceList.length) { - this.resourceList = _.map(resourceList, v => { - return v.id + _.map(resourceList, v => { + if(v.res) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+v.res + }).then(res => { + this.resourceList.push(res.id) + }).catch(e => { + this.$message.error(e.msg || '') + }) + } else { + this.resourceList.push(v.id) + } }) this.cacheResourceList = resourceList } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue index 952fd3a38d..bee095acd5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue @@ -95,7 +95,7 @@ return { label: node.name } - }, + } } }, mixins: [disabledState], @@ -221,8 +221,7 @@ resourceList: _.map(this.resourceList, v => { return {id: v} }), - localParams: this.localParams, - rawScript: editor ? editor.getValue() : '' + localParams: this.localParams } } }, @@ -238,12 +237,22 @@ // backfill resourceList let resourceList = o.params.resourceList || [] if (resourceList.length) { - this.resourceList = _.map(resourceList, v => { - return v.id + _.map(resourceList, v => { + if(v.res) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+v.res + }).then(res => { + this.resourceList.push(res.id) + }).catch(e => { + this.$message.error(e.msg || '') + }) + } else { + this.resourceList.push(v.id) + } }) this.cacheResourceList = resourceList } - // backfill localParams let localParams = o.params.localParams || [] if (localParams.length) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index de7a21d990..61662d96bc 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -261,6 +261,19 @@ }, mixins: [disabledState], methods: { + /** + * getResourceId + */ + marjarId(name) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+name + }).then(res => { + this.mainJar = res.id + }).catch(e => { + this.$message.error(e.msg || '') + }) + }, /** * return localParams */ @@ -414,7 +427,13 @@ // Non-null objects represent backfill if (!_.isEmpty(o)) { this.mainClass = o.params.mainClass || '' - this.mainJar = o.params.mainJar && o.params.mainJar.id ? o.params.mainJar.id : '' + if(o.params.mainJar.res) { + this.marjarId(o.params.mainJar.res) + } else if(o.params.mainJar.res=='') { + this.mainJar = '' + } else { + this.mainJar = o.params.mainJar.id || '' + } this.deployMode = o.params.deployMode || '' this.driverCores = o.params.driverCores || 1 this.driverMemory = o.params.driverMemory || '512M' @@ -429,9 +448,20 @@ // backfill resourceList let resourceList = o.params.resourceList || [] if (resourceList.length) { - this.resourceList = _.map(resourceList, v => { - return v.id - }) + _.map(resourceList, v => { + if(v.res) { + this.store.dispatch('dag/getResourceId',{ + type: 'FILE', + fullName: '/'+v.res + }).then(res => { + this.resourceList.push(res.id) + }).catch(e => { + this.$message.error(e.msg || '') + }) + } else { + this.resourceList.push(v.id) + } + }) this.cacheResourceList = resourceList } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index c5992de20f..513b8ec6dd 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -22,7 +22,7 @@ - + {{$t('#')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index 3d6746f147..fd1b200fa6 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -715,5 +715,14 @@ export default { reject(e) }) }) - } -} + }, + getResourceId ({ state }, payload) { + return new Promise((resolve, reject) => { + io.get(`resources/queryResource`, payload, res => { + resolve(res.data) + }).catch(e => { + reject(e) + }) + }) + }, +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 80cb2fc783..c25b021c0d 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -115,7 +115,7 @@ export default { 'SQL Type': 'sql类型', 'Title': '主题', 'Please enter the title of email': '请输入邮件主题', - 'Table': '表格', + 'Table': '表名', 'Attachment': '附件', 'SQL Parameter': 'sql参数', 'SQL Statement': 'sql语句', diff --git a/pom.xml b/pom.xml index 1bad82a31e..6eec7c6e6e 100644 --- a/pom.xml +++ b/pom.xml @@ -683,183 +683,70 @@ ${maven-surefire-plugin.version} - **/alert/template/AlertTemplateFactoryTest.java - **/alert/template/impl/DefaultHTMLTemplateTest.java - **/alert/utils/EnterpriseWeChatUtilsTest.java - **/alert/utils/ExcelUtilsTest.java - **/alert/utils/FuncUtilsTest.java - **/alert/utils/JSONUtilsTest.java - **/alert/utils/MailUtilsTest.java - **/alert/utils/PropertyUtilsTest.java - - - - - - - - - - - - - - - - - - - - **/api/dto/resources/filter/ResourceFilterTest.java - **/api/dto/resources/visitor/ResourceTreeVisitorTest.java - **/api/enums/testGetEnum.java - **/api/enums/StatusTest.java - **/api/interceptor/LoginHandlerInterceptorTest.java - **/api/security/PasswordAuthenticatorTest.java - **/api/security/SecurityConfigTest.java + **/common/utils/*.java + **/common/utils/process/ProcessBuilderForWin32Test.java + **/common/utils/process/ProcessEnvironmentForWin32Test.java + **/common/utils/process/ProcessImplForWin32Test.java + **/common/log/*.java + **/common/threadutils/*.java + **/common/graph/*.java + **/common/queue/*.java + **/common/task/FlinkParametersTest.java + **/common/task/SqoopParameterEntityTest.java + **/api/utils/CheckUtilsTest.java + **/api/utils/FileUtilsTest.java + **/api/utils/FourLetterWordTest.java + **/api/utils/exportprocess/DataSourceParamTest.java + **/api/utils/exportprocess/DependentParamTest.java + **/api/enums/*.java + **/api/controller/DataSourceControllerTest.java **/api/service/AccessTokenServiceTest.java + **/api/service/QueueServiceTest.java + **/api/service/MonitorServiceTest.java + **/api/service/SessionServiceTest.java + **/api/service/UsersServiceTest.java + **/api/service/TenantServiceTest.java + **/api/service/WorkerGroupServiceTest.java **/api/service/AlertGroupServiceTest.java - **/api/service/BaseDAGServiceTest.java - **/api/service/BaseServiceTest.java - **/api/service/DataAnalysisServiceTest.java - **/api/service/DataSourceServiceTest.java + **/api/service/UserAlertGroupServiceTest.java + **/api/service/ProjectServiceTest.java + **/api/service/ProcessDefinitionServiceTest.java + **/api/service/UdfFuncServiceTest.java + **/api/service/ResourcesServiceTest.java **/api/service/ExecutorService2Test.java - **/api/service/ExecutorServiceTest.java + **/api/service/BaseServiceTest.java + **/api/service/BaseDAGServiceTest.java **/api/service/LoggerServiceTest.java - **/api/service/MonitorServiceTest.java - **/api/service/ProcessDefinitionServiceTest.java + **/api/service/DataAnalysisServiceTest.java **/api/service/ProcessInstanceServiceTest.java - **/api/service/ProjectServiceTest.java - **/api/service/QueueServiceTest.java - **/api/service/ResourcesServiceTest.java - **/api/service/SchedulerServiceTest.java - **/api/service/SessionServiceTest.java **/api/service/TaskInstanceServiceTest.java - **/api/service/TenantServiceTest.java - **/api/service/UdfFuncServiceTest.java - **/api/service/UserAlertGroupServiceTest.java - **/api/service/UsersServiceTest.java - **/api/service/WorkerGroupServiceTest.java - **/api/utils/exportprocess/DataSourceParamTest.java - **/api/utils/exportprocess/DependentParamTest.java - **/api/utils/CheckUtilsTest.java - **/api/utils/FileUtilsTest.java - **/api/utils/FourLetterWordMainTest.java - **/api/utils/ZookeeperMonitorUtilsTest.java - **/api/utils/CheckUtilsTest.java - **/api/utils/CheckUtilsTest.java - **/api/HttpClientTest.java - **/common/graph/DAGTest.java - **/common/os/OshiTest.java - **/common/os/OSUtilsTest.java - **/common/shell/ShellExecutorTest.java - **/common/task/EntityTestUtils.java - **/common/task/FlinkParametersTest.java - **/common/task/SqoopParameterEntityTest.java - **/common/threadutils/ThreadPoolExecutorsTest.java - **/common/threadutils/ThreadUtilsTest.java - **/common/utils/placeholder/TimePlaceholderUtilsTest.java - **/common/utils/process/ProcessBuilderForWin32Test.java - **/common/utils/process/ProcessEnvironmentForWin32Test.java - **/common/utils/process/ProcessImplForWin32Test.java - **/common/utils/CollectionUtilsTest.java - **/common/utils/CommonUtilsTest.java - **/common/utils/DateUtilsTest.java - **/common/utils/DependentUtilsTest.java - **/common/utils/EncryptionUtilsTest.java - **/common/utils/FileUtilsTest.java - **/common/utils/HadoopUtilsTest.java - **/common/utils/IpUtilsTest.java - **/common/utils/JSONUtilsTest.java - **/common/utils/LoggerUtilsTest.java - **/common/utils/OSUtilsTest.java - **/common/utils/ParameterUtilsTest.java - **/common/utils/PreconditionsTest.java - **/common/utils/PropertyUtilsTest.java - **/common/utils/SchemaUtilsTest.java - **/common/utils/ScriptRunnerTest.java - **/common/utils/SensitiveLogUtilsTest.java - **/common/utils/StringTest.java - **/common/utils/StringUtilsTest.java - **/common/utils/TaskParametersUtilsTest.java - **/common/ConstantsTest.java + **/alert/utils/ExcelUtilsTest.java + **/alert/utils/FuncUtilsTest.java + **/alert/utils/JSONUtilsTest.java + **/alert/utils/PropertyUtilsTest.java + **/server/utils/SparkArgsUtilsTest.java + **/server/utils/FlinkArgsUtilsTest.java + **/server/utils/ParamUtilsTest.java + **/server/master/MasterExecThreadTest.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java **/dao/mapper/CommandMapperTest.java - **/dao/mapper/ConnectionFactoryTest.java - **/dao/mapper/DataSourceMapperTest.java - **/dao/mapper/DataSourceUserMapperTest.java - **/dao/mapper/ErrorCommandMapperTest.java - **/dao/mapper/ProcessDefinitionMapperTest.java - **/dao/mapper/ProcessInstanceMapMapperTest.java - **/dao/mapper/ProcessInstanceMapperTest.java - **/dao/mapper/ProjectMapperTest.java - **/dao/mapper/ProjectUserMapperTest.java - **/dao/mapper/QueueMapperTest.java - **/dao/mapper/ResourceMapperTest.java - **/dao/mapper/ResourceUserMapperTest.java - **/dao/mapper/ScheduleMapperTest.java - **/dao/mapper/SessionMapperTest.java - **/dao/mapper/TaskInstanceMapperTest.java - **/dao/mapper/TenantMapperTest.java - **/dao/mapper/UdfFuncMapperTest.java - **/dao/mapper/UDFUserMapperTest.java - **/dao/mapper/UserAlertGroupMapperTest.java - **/dao/mapper/UserMapperTest.java - **/dao/mapper/WorkerGroupMapperTest.java + **/dao/entity/TaskInstanceTest.java + **/dao/cron/CronUtilsTest.java **/dao/utils/DagHelperTest.java - **/dao/AlertDaoTest.java - **/remote/FastJsonSerializerTest.java - **/remote/NettyRemotingClientTest.java - **/remote/ResponseFutureTest.java - **/server/log/MasterLogFilterTest.java - **/server/log/SensitiveDataConverterTest.java - **/server/log/TaskLogDiscriminatorTest.java - **/server/log/TaskLogFilterTest.java - **/server/log/WorkerLogFilterTest.java - **/server/master/dispatch/executor/NettyExecutorManagerTest.java - **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java - **/server/master/dispatch/host/assign/RandomSelectorTest.java - **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java - **/server/master/dispatch/host/RoundRobinHostManagerTest.java - **/server/master/dispatch/ExecutorDispatcherTest.java - **/server/master/register/MasterRegistryTest.java - **/server/master/AlertManagerTest.java - **/server/master/MasterCommandTest.java - **/server/master/MasterExecThreadTest.java - **/server/master/ParamsTest.java - **/server/register/ZookeeperNodeManagerTest.java - **/server/utils/DataxUtilsTest.java - **/server/utils/ExecutionContextTestUtils.java - **/server/utils/FlinkArgsUtilsTest.java - **/server/utils/ParamUtilsTest.java - **/server/utils/ProcessUtilsTest.java - **/server/utils/SparkArgsUtilsTest.java - **/server/worker/processor/TaskCallbackServiceTest.java - **/server/worker/registry/WorkerRegistryTest.java - **/server/worker/shell/ShellCommandExecutorTest.java - **/server/worker/sql/SqlExecutorTest.java - **/server/worker/task/datax/DataxTaskTest.java - **/server/worker/task/dependent/DependentTaskTest.java - **/server/worker/task/spark/SparkTaskTest.java - **/server/worker/task/EnvFileTest.java + **/alert/template/AlertTemplateFactoryTest.java + **/alert/template/impl/DefaultHTMLTemplateTest.java **/server/worker/task/datax/DataxTaskTest.java - **/server/worker/task/dependent/DependentTaskTest.java **/server/worker/task/shell/ShellTaskTest.java - **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java - **/server/worker/EnvFileTest.java - **/service/quartz/cron/CronUtilsTest.java + **/server/utils/DataxUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java - **/service/zk/ZKServerTest.java - **/service/queue/TaskUpdateQueueTest.java + **/dao/datasource/BaseDataSourceTest.java + **/alert/utils/MailUtilsTest.java + **/dao/AlertDaoTest.java - -Xmx2048m - 3 - 3 - true