Browse Source

[FIX-#4083][server]fix taskInstance state change error

Concurrent processing of ack message and result message causes the execution sequence to be wrong

# this close # 4083
pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
5355927b79
  1. 128
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 68
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  3. 149
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

128
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -33,10 +33,24 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.*; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -44,21 +58,27 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@Component @Component
public class TaskPriorityQueueConsumer extends Thread{ public class TaskPriorityQueueConsumer extends Thread {
/** /**
* logger of TaskUpdateQueueConsumer * logger of TaskUpdateQueueConsumer
@ -91,7 +111,7 @@ public class TaskPriorityQueueConsumer extends Thread{
private MasterConfig masterConfig; private MasterConfig masterConfig;
@PostConstruct @PostConstruct
public void init(){ public void init() {
super.setName("TaskUpdateQueueConsumerThread"); super.setName("TaskUpdateQueueConsumerThread");
super.start(); super.start();
} }
@ -99,12 +119,12 @@ public class TaskPriorityQueueConsumer extends Thread{
@Override @Override
public void run() { public void run() {
List<String> failedDispatchTasks = new ArrayList<>(); List<String> failedDispatchTasks = new ArrayList<>();
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
try { try {
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
failedDispatchTasks.clear(); failedDispatchTasks.clear();
for(int i = 0; i < fetchTaskNum; i++){ for (int i = 0; i < fetchTaskNum; i++) {
if(taskPriorityQueue.size() <= 0){ if (taskPriorityQueue.size() <= 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue; continue;
} }
@ -112,62 +132,62 @@ public class TaskPriorityQueueConsumer extends Thread{
String taskPriorityInfo = taskPriorityQueue.take(); String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
boolean dispatchResult = dispatch(taskPriority.getTaskId()); boolean dispatchResult = dispatch(taskPriority.getTaskId());
if(!dispatchResult){ if (!dispatchResult) {
failedDispatchTasks.add(taskPriorityInfo); failedDispatchTasks.add(taskPriorityInfo);
} }
} }
for(String dispatchFailedTask : failedDispatchTasks){ for (String dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask); taskPriorityQueue.put(dispatchFailedTask);
} }
}catch (Exception e){ } catch (Exception e) {
logger.error("dispatcher task error",e); logger.error("dispatcher task error", e);
} }
} }
} }
/** /**
* dispatch task * dispatch task
* *
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return result * @return result
*/ */
private boolean dispatch(int taskInstanceId){ private boolean dispatch(int taskInstanceId) {
boolean result = false; boolean result = false;
try { try {
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
if (taskInstanceIsFinalState(taskInstanceId)){ if (taskInstanceIsFinalState(taskInstanceId)) {
// when task finish, ignore this task, there is no need to dispatch anymore // when task finish, ignore this task, there is no need to dispatch anymore
return true; return true;
}else{ } else {
result = dispatcher.dispatch(executionContext); result = dispatcher.dispatch(executionContext);
} }
} catch (ExecuteException e) { } catch (ExecuteException e) {
logger.error("dispatch error",e); logger.error("dispatch error", e);
} }
return result; return result;
} }
/** /**
* taskInstance is final state * taskInstance is final state
* successfailurekillstoppausethreadwaiting is final state * successfailurekillstoppausethreadwaiting is final state
*
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return taskInstance is final state * @return taskInstance is final state
*/ */
public Boolean taskInstanceIsFinalState(int taskInstanceId){ public Boolean taskInstanceIsFinalState(int taskInstanceId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished(); return taskInstance.getState().typeIsFinished();
} }
/** /**
* get TaskExecutionContext * get TaskExecutionContext
*
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return TaskExecutionContext * @return TaskExecutionContext
*/ */
protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){ protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) {
TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId); TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
// task type // task type
@ -181,7 +201,7 @@ public class TaskPriorityQueueConsumer extends Thread{
// verify tenant is null // verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) { if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE, processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
taskInstance.getStartTime(), taskInstance.getStartTime(),
taskInstance.getHost(), taskInstance.getHost(),
null, null,
@ -196,35 +216,31 @@ public class TaskPriorityQueueConsumer extends Thread{
taskInstance.setExecutePath(getExecLocalPath(taskInstance)); taskInstance.setExecutePath(getExecLocalPath(taskInstance));
taskInstance.setResources(getResourceFullNames(taskNode)); taskInstance.setResources(getResourceFullNames(taskNode));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
// SQL task // SQL task
if (taskType == TaskType.SQL){ if (taskType == TaskType.SQL) {
setSQLTaskRelation(sqlTaskExecutionContext, taskNode); setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
} }
// DATAX task // DATAX task
if (taskType == TaskType.DATAX){ if (taskType == TaskType.DATAX) {
setDataxTaskRelation(dataxTaskExecutionContext, taskNode); setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
} }
// procedure task // procedure task
if (taskType == TaskType.PROCEDURE){ if (taskType == TaskType.PROCEDURE) {
setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
} }
if (taskType == TaskType.SQOOP){ if (taskType == TaskType.SQOOP) {
setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode); setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
} }
return TaskExecutionContextBuilder.get() return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
@ -238,6 +254,7 @@ public class TaskPriorityQueueConsumer extends Thread{
/** /**
* set procedure task relation * set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext * @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskNode taskNode * @param taskNode taskNode
*/ */
@ -250,6 +267,7 @@ public class TaskPriorityQueueConsumer extends Thread{
/** /**
* set datax task relation * set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext * @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode * @param taskNode taskNode
*/ */
@ -259,23 +277,22 @@ public class TaskPriorityQueueConsumer extends Thread{
DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
if (dataSource != null) {
if (dataSource != null){
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
} }
if (dataTarget != null){ if (dataTarget != null) {
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
} }
} }
/** /**
* set sqoop task relation * set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskNode taskNode * @param taskNode taskNode
*/ */
@ -290,13 +307,13 @@ public class TaskPriorityQueueConsumer extends Thread{
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
if (dataSource != null){ if (dataSource != null) {
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
} }
if (dataTarget != null){ if (dataTarget != null) {
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
@ -306,6 +323,7 @@ public class TaskPriorityQueueConsumer extends Thread{
/** /**
* set SQL task relation * set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext * @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskNode taskNode * @param taskNode taskNode
*/ */
@ -319,18 +337,18 @@ public class TaskPriorityQueueConsumer extends Thread{
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& StringUtils.isNotEmpty(sqlParameters.getUdfs()); && StringUtils.isNotEmpty(sqlParameters.getUdfs());
if (udfTypeFlag){ if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(","); String[] udfFunIds = sqlParameters.getUdfs().split(",");
int[] udfFunIdsArray = new int[udfFunIds.length]; int[] udfFunIdsArray = new int[udfFunIds.length];
for(int i = 0 ; i < udfFunIds.length;i++){ for (int i = 0; i < udfFunIds.length; i++) {
udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]); udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
} }
List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray); List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray);
Map<UdfFunc,String> udfFuncMap = new HashMap<>(); Map<UdfFunc, String> udfFuncMap = new HashMap<>();
for(UdfFunc udfFunc : udfFuncList) { for (UdfFunc udfFunc : udfFuncList) {
String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF); String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncMap.put(udfFunc,tenantCode); udfFuncMap.put(udfFunc, tenantCode);
} }
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap); sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
@ -342,22 +360,22 @@ public class TaskPriorityQueueConsumer extends Thread{
* *
* @return execute local path * @return execute local path
*/ */
private String getExecLocalPath(TaskInstance taskInstance){ private String getExecLocalPath(TaskInstance taskInstance) {
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(), taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(), taskInstance.getProcessInstance().getId(),
taskInstance.getId()); taskInstance.getId());
} }
/** /**
* whehter tenant is null * whehter tenant is null
*
* @param tenant tenant * @param tenant tenant
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return result * @return result
*/ */
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){ if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}", logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(), taskInstance.getProcessInstance().getId(),
taskInstance.getId()); taskInstance.getId());
@ -369,8 +387,8 @@ public class TaskPriorityQueueConsumer extends Thread{
/** /**
* get resource map key is full name and value is tenantCode * get resource map key is full name and value is tenantCode
*/ */
private Map<String,String> getResourceFullNames(TaskNode taskNode) { private Map<String, String> getResourceFullNames(TaskNode taskNode) {
Map<String,String> resourceMap = new HashMap<>(); Map<String, String> resourceMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) { if (baseParam != null) {
@ -382,7 +400,7 @@ public class TaskPriorityQueueConsumer extends Thread{
if (CollectionUtils.isNotEmpty(oldVersionResources)) { if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach( oldVersionResources.forEach(
(t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
); );
} }
@ -395,7 +413,7 @@ public class TaskPriorityQueueConsumer extends Thread{
List<Resource> resources = processService.listResourceByIds(resourceIds); List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach( resources.forEach(
(t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
); );
} }
} }

68
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.processor.queue; package org.apache.dolphinscheduler.server.master.processor.queue;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
@ -25,18 +24,22 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/** /**
* task manager * task manager
*/ */
@ -65,21 +68,20 @@ public class TaskResponseService {
*/ */
private Thread taskResponseWorker; private Thread taskResponseWorker;
@PostConstruct @PostConstruct
public void start(){ public void start() {
this.taskResponseWorker = new TaskResponseWorker(); this.taskResponseWorker = new TaskResponseWorker();
this.taskResponseWorker.setName("TaskResponseWorker"); this.taskResponseWorker.setName("TaskResponseWorker");
this.taskResponseWorker.start(); this.taskResponseWorker.start();
} }
@PreDestroy @PreDestroy
public void stop(){ public void stop() {
this.taskResponseWorker.interrupt(); this.taskResponseWorker.interrupt();
if(!eventQueue.isEmpty()){ if (!eventQueue.isEmpty()) {
List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size()); List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents); eventQueue.drainTo(remainEvents);
for(TaskResponseEvent event : remainEvents){ for (TaskResponseEvent event : remainEvents) {
this.persist(event); this.persist(event);
} }
} }
@ -90,16 +92,15 @@ public class TaskResponseService {
* *
* @param taskResponseEvent taskResponseEvent * @param taskResponseEvent taskResponseEvent
*/ */
public void addResponse(TaskResponseEvent taskResponseEvent){ public void addResponse(TaskResponseEvent taskResponseEvent) {
try { try {
eventQueue.put(taskResponseEvent); eventQueue.put(taskResponseEvent);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("put task : {} error :{}", taskResponseEvent,e); logger.error("put task : {} error :{}", taskResponseEvent, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
/** /**
* task worker thread * task worker thread
*/ */
@ -108,16 +109,16 @@ public class TaskResponseService {
@Override @Override
public void run() { public void run() {
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
try { try {
// if not task , blocking here // if not task , blocking here
TaskResponseEvent taskResponseEvent = eventQueue.take(); TaskResponseEvent taskResponseEvent = eventQueue.take();
persist(taskResponseEvent); persist(taskResponseEvent);
} catch (InterruptedException e){ } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; break;
} catch (Exception e){ } catch (Exception e) {
logger.error("persist task error",e); logger.error("persist task error", e);
} }
} }
logger.info("TaskResponseWorker stopped"); logger.info("TaskResponseWorker stopped");
@ -126,18 +127,19 @@ public class TaskResponseService {
/** /**
* persist taskResponseEvent * persist taskResponseEvent
*
* @param taskResponseEvent taskResponseEvent * @param taskResponseEvent taskResponseEvent
*/ */
private void persist(TaskResponseEvent taskResponseEvent){ private void persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent(); Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel(); Channel channel = taskResponseEvent.getChannel();
switch (event){ switch (event) {
case ACK: case ACK:
try { try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null){ if (taskInstance != null && ExecutionStatus.RUNNING_EXECUTION.getCode() >= taskInstance.getState().getCode()) {
processService.changeTaskState(taskResponseEvent.getState(), processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getStartTime(), taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(), taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(), taskResponseEvent.getExecutePath(),
@ -145,19 +147,19 @@ public class TaskResponseService {
taskResponseEvent.getTaskInstanceId()); taskResponseEvent.getTaskInstanceId());
} }
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command()); channel.writeAndFlush(taskAckCommand.convert2Command());
}catch (Exception e){ } catch (Exception e) {
logger.error("worker ack master error",e); logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1); DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskAckCommand.convert2Command()); channel.writeAndFlush(taskAckCommand.convert2Command());
} }
break; break;
case RESULT: case RESULT:
try { try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null){ if (taskInstance != null) {
processService.changeTaskState(taskResponseEvent.getState(), processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(), taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(), taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(), taskResponseEvent.getAppIds(),
@ -166,11 +168,11 @@ public class TaskResponseService {
); );
} }
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command()); channel.writeAndFlush(taskResponseCommand.convert2Command());
}catch (Exception e){ } catch (Exception e) {
logger.error("worker response master error",e); logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1); DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command()); channel.writeAndFlush(taskResponseCommand.convert2Command());
} }
break; break;

149
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -79,8 +79,6 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.lang.ArrayUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
@ -93,6 +91,8 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang.ArrayUtils;
import org.quartz.CronExpression; import org.quartz.CronExpression;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -111,7 +111,7 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(),
@ -158,6 +158,7 @@ public class ProcessService {
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
* @param logger logger * @param logger logger
* @param host host * @param host host
* @param validThreadNum validThreadNum * @param validThreadNum validThreadNum
@ -187,6 +188,7 @@ public class ProcessService {
/** /**
* save error command, and delete original command * save error command, and delete original command
*
* @param command command * @param command command
* @param message message * @param message message
*/ */
@ -199,6 +201,7 @@ public class ProcessService {
/** /**
* set process waiting thread * set process waiting thread
*
* @param command command * @param command command
* @param processInstance processInstance * @param processInstance processInstance
* @return process instance * @return process instance
@ -216,6 +219,7 @@ public class ProcessService {
/** /**
* check thread num * check thread num
*
* @param command command * @param command command
* @param validThreadNum validThreadNum * @param validThreadNum validThreadNum
* @return if thread is enough * @return if thread is enough
@ -227,6 +231,7 @@ public class ProcessService {
/** /**
* insert one command * insert one command
*
* @param command command * @param command command
* @return create result * @return create result
*/ */
@ -240,6 +245,7 @@ public class ProcessService {
/** /**
* find one command from queue list * find one command from queue list
*
* @return command * @return command
*/ */
public Command findOneCommand() { public Command findOneCommand() {
@ -248,15 +254,16 @@ public class ProcessService {
/** /**
* check the input command exists in queue list * check the input command exists in queue list
*
* @param command command * @param command command
* @return create command result * @return create command result
*/ */
public Boolean verifyIsNeedCreateCommand(Command command) { public Boolean verifyIsNeedCreateCommand(Command command) {
Boolean isNeedCreate = true; Boolean isNeedCreate = true;
Map<CommandType,Integer> cmdTypeMap = new HashMap<CommandType,Integer>(); Map<CommandType, Integer> cmdTypeMap = new HashMap<CommandType, Integer>();
cmdTypeMap.put(CommandType.REPEAT_RUNNING,1); cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS,1); cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS,1); cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
CommandType commandType = command.getCommandType(); CommandType commandType = command.getCommandType();
if (cmdTypeMap.containsKey(commandType)) { if (cmdTypeMap.containsKey(commandType)) {
@ -265,7 +272,7 @@ public class ProcessService {
List<Command> commands = commandMapper.selectList(null); List<Command> commands = commandMapper.selectList(null);
// for all commands // for all commands
for (Command tmpCommand:commands) { for (Command tmpCommand : commands) {
if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) { if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam()); ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
if (tempObj != null && processInstanceId == tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()) { if (tempObj != null && processInstanceId == tempObj.path(CMDPARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
@ -280,6 +287,7 @@ public class ProcessService {
/** /**
* find process instance detail by id * find process instance detail by id
*
* @param processId processId * @param processId processId
* @return process instance * @return process instance
*/ */
@ -289,6 +297,7 @@ public class ProcessService {
/** /**
* get task node list by definitionId * get task node list by definitionId
*
* @param defineId * @param defineId
* @return * @return
*/ */
@ -313,6 +322,7 @@ public class ProcessService {
/** /**
* find process instance by id * find process instance by id
*
* @param processId processId * @param processId processId
* @return process instance * @return process instance
*/ */
@ -322,6 +332,7 @@ public class ProcessService {
/** /**
* find process define by id. * find process define by id.
*
* @param processDefinitionId processDefinitionId * @param processDefinitionId processDefinitionId
* @return process definition * @return process definition
*/ */
@ -331,6 +342,7 @@ public class ProcessService {
/** /**
* delete work process instance by id * delete work process instance by id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return delete process instance result * @return delete process instance result
*/ */
@ -340,6 +352,7 @@ public class ProcessService {
/** /**
* delete all sub process by parent instance id * delete all sub process by parent instance id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return delete all sub process instance result * @return delete all sub process instance result
*/ */
@ -358,9 +371,10 @@ public class ProcessService {
/** /**
* remove task log file * remove task log file
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
*/ */
public void removeTaskLogFile(Integer processInstanceId){ public void removeTaskLogFile(Integer processInstanceId) {
LogClientService logClient = null; LogClientService logClient = null;
@ -389,7 +403,7 @@ public class ProcessService {
// remove task log from loggerserver // remove task log from loggerserver
logClient.removeTaskLog(ip, port, taskLogPath); logClient.removeTaskLog(ip, port, taskLogPath);
} }
}finally { } finally {
if (logClient != null) { if (logClient != null) {
logClient.close(); logClient.close();
} }
@ -398,6 +412,7 @@ public class ProcessService {
/** /**
* calculate sub process number in the process define. * calculate sub process number in the process define.
*
* @param processDefinitionId processDefinitionId * @param processDefinitionId processDefinitionId
* @return process thread num count * @return process thread num count
*/ */
@ -409,6 +424,7 @@ public class ProcessService {
/** /**
* recursive query sub process definition id by parent id. * recursive query sub process definition id by parent id.
*
* @param parentId parentId * @param parentId parentId
* @param ids ids * @param ids ids
*/ */
@ -428,7 +444,7 @@ public class ProcessService {
if (parameterJson.get(CMDPARAM_SUB_PROCESS_DEFINE_ID) != null) { if (parameterJson.get(CMDPARAM_SUB_PROCESS_DEFINE_ID) != null) {
SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class); SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
ids.add(subProcessParam.getProcessDefinitionId()); ids.add(subProcessParam.getProcessDefinitionId());
recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(),ids); recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids);
} }
} }
@ -440,6 +456,7 @@ public class ProcessService {
* sub work process instance need not to create recovery command. * sub work process instance need not to create recovery command.
* create recovery waiting thread command and delete origin command at the same time. * create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time * if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand * @param originCommand originCommand
* @param processInstance processInstance * @param processInstance processInstance
*/ */
@ -491,6 +508,7 @@ public class ProcessService {
/** /**
* get schedule time from command * get schedule time from command
*
* @param command command * @param command command
* @param cmdParam cmdParam map * @param cmdParam cmdParam map
* @return date * @return date
@ -507,6 +525,7 @@ public class ProcessService {
/** /**
* generate a new work process instance from command. * generate a new work process instance from command.
*
* @param processDefinition processDefinition * @param processDefinition processDefinition
* @param command command * @param command command
* @param cmdParam cmdParam map * @param cmdParam cmdParam map
@ -564,6 +583,7 @@ public class ProcessService {
* there is tenant id in definition, use the tenant of the definition. * there is tenant id in definition, use the tenant of the definition.
* if there is not tenant id in the definiton or the tenant not exist * if there is not tenant id in the definiton or the tenant not exist
* use definition creator's tenant. * use definition creator's tenant.
*
* @param tenantId tenantId * @param tenantId tenantId
* @param userId userId * @param userId userId
* @return tenant * @return tenant
@ -587,6 +607,7 @@ public class ProcessService {
/** /**
* check command parameters is valid * check command parameters is valid
*
* @param command command * @param command command
* @param cmdParam cmdParam map * @param cmdParam cmdParam map
* @return whether command param is valid * @return whether command param is valid
@ -605,6 +626,7 @@ public class ProcessService {
/** /**
* construct process instance according to one command. * construct process instance according to one command.
*
* @param command command * @param command command
* @param host host * @param host host
* @return process instance * @return process instance
@ -654,7 +676,7 @@ public class ProcessService {
//reset command parameter //reset command parameter
if (processInstance.getCommandParam() != null) { if (processInstance.getCommandParam() != null) {
Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
for (Map.Entry<String, String> entry: processCmdParam.entrySet()) { for (Map.Entry<String, String> entry : processCmdParam.entrySet()) {
if (!cmdParam.containsKey(entry.getKey())) { if (!cmdParam.containsKey(entry.getKey())) {
cmdParam.put(entry.getKey(), entry.getValue()); cmdParam.put(entry.getKey(), entry.getValue());
} }
@ -761,6 +783,7 @@ public class ProcessService {
/** /**
* return complement data if the process start with complement data * return complement data if the process start with complement data
*
* @param processInstance processInstance * @param processInstance processInstance
* @param command command * @param command command
* @return command type * @return command type
@ -775,6 +798,7 @@ public class ProcessService {
/** /**
* initialize complement data parameters * initialize complement data parameters
*
* @param processDefinition processDefinition * @param processDefinition processDefinition
* @param processInstance processInstance * @param processInstance processInstance
* @param cmdParam cmdParam * @param cmdParam cmdParam
@ -802,6 +826,7 @@ public class ProcessService {
* set sub work process parameters. * set sub work process parameters.
* handle sub work process instance, update relation table and command parameters * handle sub work process instance, update relation table and command parameters
* set sub work process flag, extends parent work process command parameters * set sub work process flag, extends parent work process command parameters
*
* @param subProcessInstance subProcessInstance * @param subProcessInstance subProcessInstance
* @return process instance * @return process instance
*/ */
@ -846,6 +871,7 @@ public class ProcessService {
/** /**
* join parent global params into sub process. * join parent global params into sub process.
* only the keys doesn't in sub process global would be joined. * only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams * @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams * @param subGlobalParams subGlobalParams
* @return global params join * @return global params join
@ -855,7 +881,7 @@ public class ProcessService {
List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class); List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class); List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); Map<String, String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
for (Property parent : parentPropertyList) { for (Property parent : parentPropertyList) {
if (!subMap.containsKey(parent.getProp())) { if (!subMap.containsKey(parent.getProp())) {
@ -867,6 +893,7 @@ public class ProcessService {
/** /**
* initialize task instance * initialize task instance
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
*/ */
private void initTaskInstance(TaskInstance taskInstance) { private void initTaskInstance(TaskInstance taskInstance) {
@ -885,11 +912,12 @@ public class ProcessService {
/** /**
* submit task to db * submit task to db
* submit sub process to command * submit sub process to command
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return task instance * @return task instance
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance){ public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}", logger.info("start submit task : {}, instance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
@ -914,6 +942,7 @@ public class ProcessService {
* consider o * consider o
* repeat running does not generate new sub process instance * repeat running does not generate new sub process instance
* set map {parent instance id, task instance id, 0(child instance id)} * set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance * @param parentInstance parentInstance
* @param parentTask parentTask * @param parentTask parentTask
* @return process instance map * @return process instance map
@ -942,6 +971,7 @@ public class ProcessService {
/** /**
* find previous task work process map. * find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance * @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask * @param parentTask parentTask
* @return process instance map * @return process instance map
@ -967,6 +997,7 @@ public class ProcessService {
/** /**
* create sub work process command * create sub work process command
*
* @param parentProcessInstance parentProcessInstance * @param parentProcessInstance parentProcessInstance
* @param task task * @param task task
*/ */
@ -994,6 +1025,7 @@ public class ProcessService {
/** /**
* complement data needs transform parent parameter to child. * complement data needs transform parent parameter to child.
*
* @param instanceMap * @param instanceMap
* @param parentProcessInstance * @param parentProcessInstance
* @return * @return
@ -1015,6 +1047,7 @@ public class ProcessService {
/** /**
* create sub work process command * create sub work process command
*
* @param parentProcessInstance * @param parentProcessInstance
* @param childInstance * @param childInstance
* @param instanceMap * @param instanceMap
@ -1048,6 +1081,7 @@ public class ProcessService {
/** /**
* initialize sub work flow state * initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure' * child instance state would be initialized when 'recovery from pause/stop/failure'
*
* @param childInstance * @param childInstance
*/ */
private void initSubInstanceState(ProcessInstance childInstance) { private void initSubInstanceState(ProcessInstance childInstance) {
@ -1076,6 +1110,7 @@ public class ProcessService {
/** /**
* update sub process definition * update sub process definition
*
* @param parentProcessInstance parentProcessInstance * @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId * @param childDefinitionId childDefinitionId
*/ */
@ -1091,6 +1126,7 @@ public class ProcessService {
/** /**
* submit task to mysql * submit task to mysql
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @param processInstance processInstance * @param processInstance processInstance
* @return task instance * @return task instance
@ -1140,6 +1176,7 @@ public class ProcessService {
/** /**
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}... * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return task zk queue str * @return task zk queue str
*/ */
@ -1203,6 +1240,7 @@ public class ProcessService {
/** /**
* check process instance strategy * check process instance strategy
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return check strategy result * @return check strategy result
*/ */
@ -1224,6 +1262,7 @@ public class ProcessService {
/** /**
* check the task instance existing in queue * check the task instance existing in queue
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return whether taskinstance exists queue * @return whether taskinstance exists queue
*/ */
@ -1239,6 +1278,7 @@ public class ProcessService {
/** /**
* create a new process instance * create a new process instance
*
* @param processInstance processInstance * @param processInstance processInstance
*/ */
public void createProcessInstance(ProcessInstance processInstance) { public void createProcessInstance(ProcessInstance processInstance) {
@ -1250,6 +1290,7 @@ public class ProcessService {
/** /**
* insert or update work process instance to data base * insert or update work process instance to data base
*
* @param processInstance processInstance * @param processInstance processInstance
*/ */
public void saveProcessInstance(ProcessInstance processInstance) { public void saveProcessInstance(ProcessInstance processInstance) {
@ -1267,6 +1308,7 @@ public class ProcessService {
/** /**
* insert or update command * insert or update command
*
* @param command command * @param command command
* @return save command result * @return save command result
*/ */
@ -1280,6 +1322,7 @@ public class ProcessService {
/** /**
* insert or update task instance * insert or update task instance
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return save task instance result * @return save task instance result
*/ */
@ -1293,6 +1336,7 @@ public class ProcessService {
/** /**
* insert task instance * insert task instance
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return create task instance result * @return create task instance result
*/ */
@ -1303,6 +1347,7 @@ public class ProcessService {
/** /**
* update task instance * update task instance
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return update task instance result * @return update task instance result
*/ */
@ -1313,6 +1358,7 @@ public class ProcessService {
/** /**
* delete a command by id * delete a command by id
*
* @param id id * @param id id
*/ */
public void delCommandById(int id) { public void delCommandById(int id) {
@ -1321,6 +1367,7 @@ public class ProcessService {
/** /**
* find task instance by id * find task instance by id
*
* @param taskId task id * @param taskId task id
* @return task intance * @return task intance
*/ */
@ -1330,6 +1377,7 @@ public class ProcessService {
/** /**
* package task instanceassociate processInstance and processDefine * package task instanceassociate processInstance and processDefine
*
* @param taskInstId taskInstId * @param taskInstId taskInstId
* @return task instance * @return task instance
*/ */
@ -1351,6 +1399,7 @@ public class ProcessService {
/** /**
* get id list by task state * get id list by task state
*
* @param instanceId instanceId * @param instanceId instanceId
* @param state state * @param state state
* @return task instance states * @return task instance states
@ -1361,6 +1410,7 @@ public class ProcessService {
/** /**
* find valid task list by process definition id * find valid task list by process definition id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return task instance list * @return task instance list
*/ */
@ -1370,6 +1420,7 @@ public class ProcessService {
/** /**
* find previous task list by work process id * find previous task list by work process id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return task instance list * @return task instance list
*/ */
@ -1379,6 +1430,7 @@ public class ProcessService {
/** /**
* update work process instance map * update work process instance map
*
* @param processInstanceMap processInstanceMap * @param processInstanceMap processInstanceMap
* @return update process instance result * @return update process instance result
*/ */
@ -1388,6 +1440,7 @@ public class ProcessService {
/** /**
* create work process instance map * create work process instance map
*
* @param processInstanceMap processInstanceMap * @param processInstanceMap processInstanceMap
* @return create process instance result * @return create process instance result
*/ */
@ -1401,6 +1454,7 @@ public class ProcessService {
/** /**
* find work process map by parent process id and parent task id. * find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId * @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId * @param parentTaskId parentTaskId
* @return process instance map * @return process instance map
@ -1411,6 +1465,7 @@ public class ProcessService {
/** /**
* delete work process map by parent process id * delete work process map by parent process id
*
* @param parentWorkProcessId parentWorkProcessId * @param parentWorkProcessId parentWorkProcessId
* @return delete process map result * @return delete process map result
*/ */
@ -1421,6 +1476,7 @@ public class ProcessService {
/** /**
* find sub process instance * find sub process instance
*
* @param parentProcessId parentProcessId * @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId * @param parentTaskId parentTaskId
* @return process instance * @return process instance
@ -1437,6 +1493,7 @@ public class ProcessService {
/** /**
* find parent process instance * find parent process instance
*
* @param subProcessId subProcessId * @param subProcessId subProcessId
* @return process instance * @return process instance
*/ */
@ -1452,6 +1509,7 @@ public class ProcessService {
/** /**
* change task state * change task state
*
* @param state state * @param state state
* @param startTime startTime * @param startTime startTime
* @param host host * @param host host
@ -1459,11 +1517,10 @@ public class ProcessService {
* @param logPath logPath * @param logPath logPath
* @param taskInstId taskInstId * @param taskInstId taskInstId
*/ */
public void changeTaskState(ExecutionStatus state, Date startTime, String host, public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath, String executePath,
String logPath, String logPath,
int taskInstId) { int taskInstId) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setState(state); taskInstance.setState(state);
taskInstance.setStartTime(startTime); taskInstance.setStartTime(startTime);
taskInstance.setHost(host); taskInstance.setHost(host);
@ -1474,6 +1531,7 @@ public class ProcessService {
/** /**
* update process instance * update process instance
*
* @param processInstance processInstance * @param processInstance processInstance
* @return update process instance result * @return update process instance result
*/ */
@ -1483,6 +1541,7 @@ public class ProcessService {
/** /**
* update the process instance * update the process instance
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @param processJson processJson * @param processJson processJson
* @param globalParams globalParams * @param globalParams globalParams
@ -1509,18 +1568,18 @@ public class ProcessService {
/** /**
* change task state * change task state
*
* @param state state * @param state state
* @param endTime endTime * @param endTime endTime
* @param taskInstId taskInstId * @param taskInstId taskInstId
* @param varPool varPool * @param varPool varPool
*/ */
public void changeTaskState(ExecutionStatus state, public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime, Date endTime,
int processId, int processId,
String appIds, String appIds,
int taskInstId, int taskInstId,
String varPool) { String varPool) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(processId); taskInstance.setPid(processId);
taskInstance.setAppLink(appIds); taskInstance.setAppLink(appIds);
taskInstance.setState(state); taskInstance.setState(state);
@ -1531,6 +1590,7 @@ public class ProcessService {
/** /**
* convert integer list to string list * convert integer list to string list
*
* @param intList intList * @param intList intList
* @return string list * @return string list
*/ */
@ -1547,6 +1607,7 @@ public class ProcessService {
/** /**
* query schedule by id * query schedule by id
*
* @param id id * @param id id
* @return schedule * @return schedule
*/ */
@ -1556,6 +1617,7 @@ public class ProcessService {
/** /**
* query Schedule by processDefinitionId * query Schedule by processDefinitionId
*
* @param processDefinitionId processDefinitionId * @param processDefinitionId processDefinitionId
* @see Schedule * @see Schedule
*/ */
@ -1565,6 +1627,7 @@ public class ProcessService {
/** /**
* query need failover process instance * query need failover process instance
*
* @param host host * @param host host
* @return process instance list * @return process instance list
*/ */
@ -1574,6 +1637,7 @@ public class ProcessService {
/** /**
* process need failover process instance * process need failover process instance
*
* @param processInstance processInstance * @param processInstance processInstance
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
@ -1593,6 +1657,7 @@ public class ProcessService {
/** /**
* query all need failover task instances by host * query all need failover task instances by host
*
* @param host host * @param host host
* @return task instance list * @return task instance list
*/ */
@ -1603,6 +1668,7 @@ public class ProcessService {
/** /**
* find data source by id * find data source by id
*
* @param id id * @param id id
* @return datasource * @return datasource
*/ */
@ -1612,6 +1678,7 @@ public class ProcessService {
/** /**
* update process instance state by id * update process instance state by id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @param executionStatus executionStatus * @param executionStatus executionStatus
* @return update process result * @return update process result
@ -1625,6 +1692,7 @@ public class ProcessService {
/** /**
* find process instance by the task id * find process instance by the task id
*
* @param taskId taskId * @param taskId taskId
* @return process instance * @return process instance
*/ */
@ -1638,6 +1706,7 @@ public class ProcessService {
/** /**
* find udf function list by id list string * find udf function list by id list string
*
* @param ids ids * @param ids ids
* @return udf function list * @return udf function list
*/ */
@ -1647,18 +1716,20 @@ public class ProcessService {
/** /**
* find tenant code by resource name * find tenant code by resource name
*
* @param resName resource name * @param resName resource name
* @param resourceType resource type * @param resourceType resource type
* @return tenant code * @return tenant code
*/ */
public String queryTenantCodeByResName(String resName,ResourceType resourceType){ public String queryTenantCodeByResName(String resName, ResourceType resourceType) {
// in order to query tenant code successful although the version is older // in order to query tenant code successful although the version is older
String fullName = resName.startsWith("/") ? resName : String.format("/%s",resName); String fullName = resName.startsWith("/") ? resName : String.format("/%s", resName);
return resourceMapper.queryTenantCodeByResourceName(fullName, resourceType.ordinal()); return resourceMapper.queryTenantCodeByResourceName(fullName, resourceType.ordinal());
} }
/** /**
* find schedule list by process define id. * find schedule list by process define id.
*
* @param ids ids * @param ids ids
* @return schedule list * @return schedule list
*/ */
@ -1669,6 +1740,7 @@ public class ProcessService {
/** /**
* get dependency cycle by work process define id and scheduler fire time * get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId * @param masterId masterId
* @param processDefinitionId processDefinitionId * @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger * @param scheduledFireTime the time the task schedule is expected to trigger
@ -1676,22 +1748,23 @@ public class ProcessService {
* @throws Exception if error throws Exception * @throws Exception if error throws Exception
*/ */
public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception { public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
List<CycleDependency> list = getCycleDependencies(masterId,new int[]{processDefinitionId},scheduledFireTime); List<CycleDependency> list = getCycleDependencies(masterId, new int[] {processDefinitionId}, scheduledFireTime);
return list.size() > 0 ? list.get(0) : null; return list.size() > 0 ? list.get(0) : null;
} }
/** /**
* get dependency cycle list by work process define id list and scheduler fire time * get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId * @param masterId masterId
* @param ids ids * @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger * @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency list * @return CycleDependency list
* @throws Exception if error throws Exception * @throws Exception if error throws Exception
*/ */
public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception { public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception {
List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>(); List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
if(ArrayUtils.isEmpty(ids)){ if (ArrayUtils.isEmpty(ids)) {
logger.warn("ids[] is empty!is invalid!"); logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList; return cycleDependencyList;
} }
@ -1706,13 +1779,13 @@ public class ProcessService {
List<Date> list; List<Date> list;
List<Schedule> schedules = this.selectAllByProcessDefineId(ids); List<Schedule> schedules = this.selectAllByProcessDefineId(ids);
// for all scheduling information // for all scheduling information
for (Schedule depSchedule:schedules) { for (Schedule depSchedule : schedules) {
strCrontab = depSchedule.getCrontab(); strCrontab = depSchedule.getCrontab();
depCronExpression = CronUtils.parse2CronExpression(strCrontab); depCronExpression = CronUtils.parse2CronExpression(strCrontab);
depCron = CronUtils.parse2Cron(strCrontab); depCron = CronUtils.parse2Cron(strCrontab);
CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron); CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron);
if (cycleEnum == null) { if (cycleEnum == null) {
logger.error("{} is not valid",strCrontab); logger.error("{} is not valid", strCrontab);
continue; continue;
} }
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
@ -1720,16 +1793,16 @@ public class ProcessService {
/*case MINUTE: /*case MINUTE:
calendar.add(Calendar.MINUTE,-61);*/ calendar.add(Calendar.MINUTE,-61);*/
case HOUR: case HOUR:
calendar.add(Calendar.HOUR,-25); calendar.add(Calendar.HOUR, -25);
break; break;
case DAY: case DAY:
calendar.add(Calendar.DATE,-32); calendar.add(Calendar.DATE, -32);
break; break;
case WEEK: case WEEK:
calendar.add(Calendar.DATE,-32); calendar.add(Calendar.DATE, -32);
break; break;
case MONTH: case MONTH:
calendar.add(Calendar.MONTH,-13); calendar.add(Calendar.MONTH, -13);
break; break;
default: default:
logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name()); logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
@ -1744,7 +1817,7 @@ public class ProcessService {
} }
if (list.size() >= 1) { if (list.size() >= 1) {
start = list.get(list.size() - 1); start = list.get(list.size() - 1);
CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(),start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum); CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
cycleDependencyList.add(dependency); cycleDependencyList.add(dependency);
} }
@ -1754,6 +1827,7 @@ public class ProcessService {
/** /**
* find last scheduler process instance in the date interval * find last scheduler process instance in the date interval
*
* @param definitionId definitionId * @param definitionId definitionId
* @param dateInterval dateInterval * @param dateInterval dateInterval
* @return process instance * @return process instance
@ -1766,6 +1840,7 @@ public class ProcessService {
/** /**
* find last manual process instance interval * find last manual process instance interval
*
* @param definitionId process definition id * @param definitionId process definition id
* @param dateInterval dateInterval * @param dateInterval dateInterval
* @return process instance * @return process instance
@ -1778,6 +1853,7 @@ public class ProcessService {
/** /**
* find last running process instance * find last running process instance
*
* @param definitionId process definition id * @param definitionId process definition id
* @param startTime start time * @param startTime start time
* @param endTime end time * @param endTime end time
@ -1792,6 +1868,7 @@ public class ProcessService {
/** /**
* query user queue by process instance id * query user queue by process instance id
*
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @return queue * @return queue
*/ */
@ -1811,6 +1888,7 @@ public class ProcessService {
/** /**
* get task worker group * get task worker group
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return workerGroupId * @return workerGroupId
*/ */
@ -1832,6 +1910,7 @@ public class ProcessService {
/** /**
* get have perm project list * get have perm project list
*
* @param userId userId * @param userId userId
* @return project list * @return project list
*/ */
@ -1851,6 +1930,7 @@ public class ProcessService {
/** /**
* get have perm project ids * get have perm project ids
*
* @param userId userId * @param userId userId
* @return project ids * @return project ids
*/ */
@ -1865,11 +1945,12 @@ public class ProcessService {
/** /**
* list unauthorized udf function * list unauthorized udf function
*
* @param userId user id * @param userId user id
* @param needChecks data source id array * @param needChecks data source id array
* @return unauthorized udf function list * @return unauthorized udf function list
*/ */
public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType) { public <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) {
List<T> resultList = new ArrayList<T>(); List<T> resultList = new ArrayList<T>();
if (Objects.nonNull(needChecks) && needChecks.length > 0) { if (Objects.nonNull(needChecks) && needChecks.length > 0) {
@ -1889,7 +1970,7 @@ public class ProcessService {
originResSet.removeAll(authorizedUdfFiles); originResSet.removeAll(authorizedUdfFiles);
break; break;
case DATASOURCE: case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet()); Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources); originResSet.removeAll(authorizedDatasources);
break; break;
case UDF: case UDF:
@ -1908,6 +1989,7 @@ public class ProcessService {
/** /**
* get user by user id * get user by user id
*
* @param userId user id * @param userId user id
* @return User * @return User
*/ */
@ -1917,6 +1999,7 @@ public class ProcessService {
/** /**
* get resource by resoruce id * get resource by resoruce id
*
* @param resoruceId resource id * @param resoruceId resource id
* @return Resource * @return Resource
*/ */
@ -1926,6 +2009,7 @@ public class ProcessService {
/** /**
* list resources by ids * list resources by ids
*
* @param resIds resIds * @param resIds resIds
* @return resource list * @return resource list
*/ */
@ -1935,6 +2019,7 @@ public class ProcessService {
/** /**
* format task app id in task instance * format task app id in task instance
*
* @param taskInstance * @param taskInstance
* @return * @return
*/ */

Loading…
Cancel
Save