Browse Source

[cherry-pick-7221/7232] pick some api bug (#7234)

* fix worker group display (#7208)

* [Bug] [API] queryProcessDefinitionByCode bug (#7221)

* pick-7232/7221
2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
d7eab830a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
  2. 31
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  3. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
  4. 16
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  5. 12
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java

@ -148,6 +148,7 @@ public class SchedulerController extends BaseController {
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long"),
})
@PutMapping("/{id}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_SCHEDULE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateSchedule(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ -330,11 +331,12 @@ public class SchedulerController extends BaseController {
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", type = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", type = "FailureStrategy"),
@ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", type = "Priority"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long"),
})
@PutMapping("/{code}")
@PutMapping("/update/{code}")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_SCHEDULE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateScheduleByProcessDefinitionCode(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,

31
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -206,6 +206,19 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!processTaskRelationList.isEmpty()) {
int deleteRelation = 0;
int deleteRelationLog = 0;
for (ProcessTaskRelation processTaskRelation : taskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
deleteRelation += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteRelationLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
}
if ((deleteRelation & deleteRelationLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
}
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@ -492,6 +505,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @param releaseState releaseState
* @return update result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> releaseTaskDefinition(User loginUser, long projectCode, long code, ReleaseState releaseState) {
Project project = projectMapper.queryByCode(projectCode);
@ -510,11 +524,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
return result;
}
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion());
if (taskDefinitionLog == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
return result;
}
switch (releaseState) {
case OFFLINE:
taskDefinition.setFlag(Flag.NO);
taskDefinitionMapper.updateById(taskDefinition);
taskDefinitionLog.setFlag(Flag.NO);
break;
case ONLINE:
String resourceIds = taskDefinition.getResourceIds();
@ -530,13 +548,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
taskDefinition.setFlag(Flag.YES);
taskDefinitionMapper.updateById(taskDefinition);
taskDefinitionLog.setFlag(Flag.NO);
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
int update = taskDefinitionMapper.updateById(taskDefinition);
int updateLog = taskDefinitionLogMapper.updateById(taskDefinitionLog);
if ((update == 0 && updateLog == 1) || (update == 1 && updateLog == 0)) {
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
putMsg(result, Status.SUCCESS);
return result;
}

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java

@ -177,6 +177,9 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode, List<TaskDefinition> taskDefinitionList) {
Set<Long> sourceWorkFlowCodes = new HashSet<>();
if (taskDefinitionList == null || taskDefinitionList.isEmpty()) {
return sourceWorkFlowCodes;
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList);
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (taskDefinitionLog.getProjectCode() == projectCode) {

16
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -207,9 +207,10 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version))
.thenReturn(new TaskDefinitionLog());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode))
.thenReturn(new TaskDefinition());
.thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.switchVersion(loginUser, projectCode, taskCode, version);
@ -306,7 +307,16 @@ public class TaskDefinitionServiceImplTest {
// process definition offline
putMsg(result, Status.SUCCESS);
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(new TaskDefinition());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setVersion(1);
taskDefinition.setCode(taskCode);
String params = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
taskDefinition.setTaskParams(params);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinition.getVersion())).thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS));

12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -126,9 +126,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
/**
* process relative dao that some mappers in this.
@ -330,7 +330,7 @@ public class ProcessService {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode);
if (processDefinition == null) {
logger.error("process define not exists");
return new ArrayList<>();
return Lists.newArrayList();
}
List<ProcessTaskRelationLog> processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
@ -339,8 +339,11 @@ public class ProcessService {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
return new ArrayList<>(taskDefinitionLogs);
return Lists.newArrayList(taskDefinitionLogs);
}
/**
@ -2367,6 +2370,9 @@ public class ProcessService {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
if (taskDefinitionSet.isEmpty()) {
return Lists.newArrayList();
}
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}

Loading…
Cancel
Save