Browse Source

[Feature-11530] add state history for process instance (#11757)

* [Feature] add state history for process instance (#97)

* add state history for process instance

* upsertProcessInstance

* remove unuse method

* fix UT

Co-authored-by: caishunfeng <534328519@qq.com>
3.1.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
f8d46a26c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  3. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
  5. 49
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  6. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  7. 55
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  8. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  9. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  10. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  11. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  12. 2
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
  13. 4
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql
  14. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  15. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  16. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  17. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  18. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  19. 10
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  20. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  21. 94
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  22. 3
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -56,6 +56,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
@ -117,6 +118,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private StateEventCallbackService stateEventCallbackService;
@ -528,8 +532,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui");
int update = processInstanceDao.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@ -84,6 +85,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceDao processInstanceDao;
@Autowired
ProcessDefinitionMapper processDefineMapper;
@ -531,7 +535,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
processInstance.setProcessDefinitionVersion(insertVersion);
int update = processService.updateProcessInstance(processInstance);
int update = processInstanceDao.updateProcessInstance(processInstance);
if (update == 0) {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR);

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -97,6 +98,9 @@ public class ProcessInstanceServiceTest {
@Mock
ProcessService processService;
@Mock
ProcessInstanceDao processInstanceDao;
@Mock
ProcessInstanceMapper processInstanceMapper;
@ -475,7 +479,7 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processInstanceDao.updateProcessInstance(processInstance)).thenReturn(1);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java

@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration
@MapperScan("org.apache.dolphinscheduler.dao")
@MapperScan("org.apache.dolphinscheduler.dao.mapper")
public class DaoConfiguration {
}

49
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -17,9 +17,6 @@
package org.apache.dolphinscheduler.dao.entity;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -28,8 +25,14 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@ -37,6 +40,10 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* process instance
*/
@ -65,6 +72,18 @@ public class ProcessInstance {
* process state
*/
private WorkflowExecutionStatus state;
/**
* state history
*/
private String stateHistory;
/**
* state desc list from state history
*/
@TableField(exist = false)
private List<StateDesc> stateDescList;
/**
* recovery flag for failover
*/
@ -310,4 +329,28 @@ public class ProcessInstance {
return commandType;
}
/**
* set state with desc
* @param state
* @param stateDesc
*/
public void setStateWithDesc(WorkflowExecutionStatus state, String stateDesc) {
this.setState(state);
if (StringUtils.isEmpty(this.getStateHistory())) {
stateDescList = new ArrayList<>();
} else if (stateDescList == null) {
stateDescList = JSONUtils.toList(this.getStateHistory(), StateDesc.class);
}
stateDescList.add(new StateDesc(new Date(), state, stateDesc));
this.setStateHistory(JSONUtils.toJsonString(stateDescList));
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class StateDesc {
Date time;
WorkflowExecutionStatus state;
String desc;
}
}

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
public interface ProcessInstanceDao {
public int insertProcessInstance(ProcessInstance processInstance);
public int updateProcessInstance(ProcessInstance processInstance);
/**
* insert or update work process instance to database
*
* @param processInstance processInstance
*/
public int upsertProcessInstance(ProcessInstance processInstance);
}

55
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Repository
public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Override
public int insertProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.insert(processInstance);
}
@Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
}
@Override
public int upsertProcessInstance(@NonNull ProcessInstance processInstance) {
if (processInstance.getId() != 0) {
return updateProcessInstance(processInstance);
} else {
return insertProcessInstance(processInstance);
}
}
}

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -24,7 +24,7 @@
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool,
dry_run, next_process_instance_id, restart_time
dry_run, next_process_instance_id, restart_time, state_history
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@ -111,7 +111,7 @@
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id,
restart_time
restart_time, instance.state_history
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -590,6 +590,7 @@ CREATE TABLE t_ds_process_instance
process_definition_version int(11) DEFAULT NULL,
process_definition_code bigint(20) not NULL,
state tinyint(4) DEFAULT NULL,
state_history text,
recovery tinyint(4) DEFAULT NULL,
start_time datetime DEFAULT NULL,
end_time datetime DEFAULT NULL,

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -592,6 +592,7 @@ CREATE TABLE `t_ds_process_instance` (
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version',
`state` tinyint(4) DEFAULT NULL COMMENT 'process instance Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`state_history` text DEFAULT NULL COMMENT 'state history desc',
`recovery` tinyint(4) DEFAULT NULL COMMENT 'process instance failover flag:0:normal,1:failover instance',
`start_time` datetime DEFAULT NULL COMMENT 'process instance start time',
`end_time` datetime DEFAULT NULL COMMENT 'process instance end time',

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -518,6 +518,7 @@ CREATE TABLE t_ds_process_instance (
process_definition_code bigint DEFAULT NULL ,
process_definition_version int DEFAULT NULL ,
state int DEFAULT NULL ,
state_history text,
recovery int DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
end_time timestamp DEFAULT NULL ,

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql

@ -18,3 +18,5 @@
ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json';
ALTER TABLE `t_ds_process_instance` ADD COLUMN `state_history` text DEFAULT NULL COMMENT 'state history desc' AFTER `state`;

4
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql

@ -35,7 +35,9 @@ BEGIN
--- add column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json text DEFAULT NULL ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_isntance ADD COLUMN IF NOT EXISTS state_history text DEFAULT NULL ';

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@ -28,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@ -41,10 +41,8 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
@ -52,6 +50,11 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@ -63,6 +66,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private MasterConfig masterConfig;
@ -164,6 +170,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
processInstanceDao,
nettyExecutorManager,
processAlertManager,
masterConfig,

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -51,6 +50,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@ -122,6 +122,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final ProcessService processService;
private ProcessInstanceDao processInstanceDao;
private final ProcessAlertManager processAlertManager;
private final NettyExecutorManager nettyExecutorManager;
@ -218,6 +220,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* @param processInstance processInstance
* @param processService processService
* @param processInstanceDao processInstanceDao
* @param nettyExecutorManager nettyExecutorManager
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
@ -226,12 +229,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance,
@NonNull ProcessService processService,
@NonNull ProcessInstanceDao processInstanceDao,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessAlertManager processAlertManager,
@NonNull MasterConfig masterConfig,
@NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) {
this.processService = processService;
this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
@ -373,7 +378,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
processInstanceDao.upsertProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
@ -891,7 +896,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
processInstanceDao.updateProcessInstance(processInstance);
}
}
}
@ -1667,15 +1672,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
originStates,
newStates);
processInstance.setState(newStates);
processInstance.setStateWithDesc(newStates, "update by workflow executor");
if (newStates.isFinished()) {
processInstance.setEndTime(new Date());
}
try {
processService.updateProcessInstance(processInstance);
processInstanceDao.updateProcessInstance(processInstance);
} catch (Exception ex) {
// recover the status
processInstance.setState(originStates);
processInstance.setStateWithDesc(originStates, "recover state by DB error");
processInstance.setEndTime(null);
throw new StateEventHandleException("Update process instance status to DB error", ex);
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
@ -118,6 +119,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService;
protected ProcessInstanceDao processInstanceDao;
protected MasterConfig masterConfig;
protected TaskPluginManager taskPluginManager;
@ -129,6 +132,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
@Override
public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
processService = SpringApplicationContext.getBean(ProcessService.class);
processInstanceDao = SpringApplicationContext.getBean(ProcessInstanceDao.class);
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -200,7 +200,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
logger.info("blocking opportunity: expected-->{}, actual-->{}", expected, this.conditionResult);
processInstance.setBlocked(isBlocked);
if (isBlocked) {
processInstance.setState(WorkflowExecutionStatus.READY_BLOCK);
processInstance.setStateWithDesc(WorkflowExecutionStatus.READY_BLOCK, "ready block");
}
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setEndTime(new Date());

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -179,8 +179,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance == null || taskInstance.getState().isFinished()) {
return false;
}
subProcessInstance.setState(WorkflowExecutionStatus.READY_PAUSE);
processService.updateProcessInstance(subProcessInstance);
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, "ready pause sub workflow");
processInstanceDao.updateProcessInstance(subProcessInstance);
sendToSubProcess();
return true;
}
@ -215,8 +215,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance == null || taskInstance.getState().isFinished()) {
return false;
}
subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP);
processService.updateProcessInstance(subProcessInstance);
subProcessInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by kill task");
processInstanceDao.updateProcessInstance(subProcessInstance);
sendToSubProcess();
return true;
}

10
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -79,7 +80,7 @@ public class WorkflowExecuteRunnableTest {
private ProcessService processService;
private final int processDefinitionId = 1;
private ProcessInstanceDao processInstanceDao;
private MasterConfig config;
@ -101,6 +102,8 @@ public class WorkflowExecuteRunnableTest {
processService = mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstanceDao = mock(ProcessInstanceDao.class);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getState()).thenReturn(WorkflowExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
@ -119,9 +122,8 @@ public class WorkflowExecuteRunnableTest {
curingGlobalParamsService = mock(CuringParamsService.class);
NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
workflowExecuteThread =
PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager,
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
workflowExecuteThread = PowerMockito.spy(
new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -127,8 +127,6 @@ public interface ProcessService {
TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
void saveProcessInstance(ProcessInstance processInstance);
int saveCommand(Command command);
boolean saveTaskInstance(TaskInstance taskInstance);
@ -163,8 +161,6 @@ public interface ProcessService {
ProcessInstance findParentProcessInstance(Integer subProcessId);
int updateProcessInstance(ProcessInstance processInstance);
void changeOutParam(TaskInstance taskInstance);
Schedule querySchedule(int id);
@ -186,8 +182,6 @@ public interface ProcessService {
DataSource findDataSourceById(int id);
int updateProcessInstanceState(Integer processInstanceId, WorkflowExecutionStatus executionStatus);
ProcessInstance findProcessInstanceByTaskId(int taskId);
List<UdfFunc> queryUdfFunListByIds(Integer[] ids);

94
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -105,6 +105,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@ -180,9 +181,13 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private ProcessDefinitionLogMapper processDefineLogMapper;
// todo replace with processInstanceDao
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private DataSourceMapper dataSourceMapper;
@ -301,7 +306,7 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
} else {
saveProcessInstance(processInstance);
processInstanceDao.upsertProcessInstance(processInstance);
}
setSubProcessParam(processInstance);
deleteCommandWithCheck(command.getId());
@ -309,10 +314,10 @@ public class ProcessServiceImpl implements ProcessService {
}
protected void saveSerialProcess(ProcessInstance processInstance, ProcessDefinition processDefinition) {
processInstance.setState(WorkflowExecutionStatus.SERIAL_WAIT);
saveProcessInstance(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the priority instance(by id)
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, "wait by serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
//serial wait
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
@ -320,8 +325,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE,
processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(WorkflowExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, "submit from serial_wait strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances =
@ -330,12 +335,12 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE,
processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setState(WorkflowExecutionStatus.STOP);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, "stop by serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
return;
}
processInstance.setState(WorkflowExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, "submit from serial_discard strategy");
processInstanceDao.upsertProcessInstance(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
@ -345,8 +350,8 @@ public class ProcessServiceImpl implements ProcessService {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP);
info.setState(WorkflowExecutionStatus.READY_STOP);
int update = updateProcessInstance(info);
info.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, "ready stop by serial_priority strategy");
int update = processInstanceDao.updateProcessInstance(info);
// determine whether the process is normal
if (update > 0) {
WorkflowStateEventChangeCommand workflowStateEventChangeCommand =
@ -360,8 +365,8 @@ public class ProcessServiceImpl implements ProcessService {
}
}
}
processInstance.setState(WorkflowExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, "submit by serial_priority strategy");
processInstanceDao.upsertProcessInstance(processInstance);
}
}
@ -742,7 +747,7 @@ public class ProcessServiceImpl implements ProcessService {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init running");
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
// the new process instance restart time is null.
@ -1046,7 +1051,7 @@ public class ProcessServiceImpl implements ProcessService {
default:
break;
}
processInstance.setState(runStatus);
processInstance.setStateWithDesc(runStatus, commandType.getDescp());
return processInstance;
}
@ -1158,7 +1163,7 @@ public class ProcessServiceImpl implements ProcessService {
paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
subProcessInstance.setIsSubProcess(Flag.YES);
this.saveProcessInstance(subProcessInstance);
processInstanceDao.upsertProcessInstance(subProcessInstance);
}
// copy parent instance user def params to sub process..
String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
@ -1169,7 +1174,7 @@ public class ProcessServiceImpl implements ProcessService {
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
subProcessInstance
.setVarPool(joinVarPool(parentInstance.getVarPool(), subProcessInstance.getVarPool()));
this.saveProcessInstance(subProcessInstance);
processInstanceDao.upsertProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
}
@ -1510,8 +1515,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
childInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
updateProcessInstance(childInstance);
childInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init sub workflow instance");
processInstanceDao.updateProcessInstance(childInstance);
}
}
@ -1640,24 +1645,6 @@ public class ProcessServiceImpl implements ProcessService {
return true;
}
/**
* insert or update work process instance to data base
*
* @param processInstance processInstance
*/
@Override
public void saveProcessInstance(ProcessInstance processInstance) {
if (processInstance == null) {
logger.error("save error, process instance is null!");
return;
}
if (processInstance.getId() != 0) {
processInstanceMapper.updateById(processInstance);
} else {
processInstanceMapper.insert(processInstance);
}
}
/**
* insert or update command
*
@ -1940,17 +1927,6 @@ public class ProcessServiceImpl implements ProcessService {
return processInstance;
}
/**
* update process instance
*
* @param processInstance processInstance
* @return update process instance result
*/
@Override
public int updateProcessInstance(ProcessInstance processInstance) {
return processInstanceMapper.updateById(processInstance);
}
/**
* for show in page of taskInstance
*/
@ -2121,20 +2097,6 @@ public class ProcessServiceImpl implements ProcessService {
return dataSourceMapper.selectById(id);
}
/**
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @return update process result
*/
@Override
public int updateProcessInstanceState(Integer processInstanceId, WorkflowExecutionStatus executionStatus) {
ProcessInstance instance = processInstanceMapper.selectById(processInstanceId);
instance.setState(executionStatus);
return processInstanceMapper.updateById(instance);
}
/**
* find process instance by the task id
*
@ -3190,8 +3152,8 @@ public class ProcessServiceImpl implements ProcessService {
.filter(instance -> instance.getState().isFailure() || instance.getState().isKill())
.map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
processInstance.setState(WorkflowExecutionStatus.SUCCESS);
updateProcessInstance(processInstance);
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success");
processInstanceDao.updateProcessInstance(processInstance);
}
}
}

3
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
@ -130,6 +131,8 @@ public class ProcessServiceTest {
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private ProcessInstanceDao processInstanceDao;
@Mock
private UserMapper userMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;

Loading…
Cancel
Save