From f8d46a26c1a4c0a8eedf2a6260d4b968a3cccc42 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Sat, 3 Sep 2022 10:14:05 +0800 Subject: [PATCH] [Feature-11530] add state history for process instance (#11757) * [Feature] add state history for process instance (#97) * add state history for process instance * upsertProcessInstance * remove unuse method * fix UT Co-authored-by: caishunfeng <534328519@qq.com> --- .../api/service/impl/ExecutorServiceImpl.java | 8 +- .../impl/ProcessInstanceServiceImpl.java | 6 +- .../service/ProcessInstanceServiceTest.java | 6 +- .../dao/DaoConfiguration.java | 2 +- .../dao/entity/ProcessInstance.java | 49 +++++++++- .../dao/repository/ProcessInstanceDao.java | 35 +++++++ .../impl/ProcessInstanceDaoImpl.java | 55 +++++++++++ .../dao/mapper/ProcessInstanceMapper.xml | 4 +- .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgresql.sql | 1 + .../mysql/dolphinscheduler_ddl.sql | 2 + .../postgresql/dolphinscheduler_ddl.sql | 4 +- .../runner/MasterSchedulerBootstrap.java | 49 +++++----- .../runner/WorkflowExecuteRunnable.java | 17 ++-- .../master/runner/task/BaseTaskProcessor.java | 4 + .../runner/task/BlockingTaskProcessor.java | 2 +- .../master/runner/task/SubTaskProcessor.java | 8 +- .../runner/WorkflowExecuteRunnableTest.java | 20 ++-- .../service/process/ProcessService.java | 6 -- .../service/process/ProcessServiceImpl.java | 94 ++++++------------- .../service/process/ProcessServiceTest.java | 3 + 22 files changed, 253 insertions(+), 124 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 3dbfe97619..32ef8cc91b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -56,6 +56,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; @@ -117,6 +118,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private ProcessService processService; + @Autowired + private ProcessInstanceDao processInstanceDao; + @Autowired private StateEventCallbackService stateEventCallbackService; @@ -528,8 +532,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processInstance.setCommandType(commandType); processInstance.addHistoryCmd(commandType); - processInstance.setState(executionStatus); - int update = processService.updateProcessInstance(processInstance); + processInstance.setStateWithDesc(executionStatus, commandType.getDescp() + "by ui"); + int update = processInstanceDao.updateProcessInstance(processInstance); // determine whether the process is normal if (update > 0) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 67ff0edb9b..d1342b6b3c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; @@ -84,6 +85,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired ProcessInstanceMapper processInstanceMapper; + @Autowired + ProcessInstanceDao processInstanceDao; + @Autowired ProcessDefinitionMapper processDefineMapper; @@ -531,7 +535,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } processInstance.setProcessDefinitionVersion(insertVersion); - int update = processService.updateProcessInstance(processInstance); + int update = processInstanceDao.updateProcessInstance(processInstance); if (update == 0) { putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 73289dbabf..926a47b890 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -97,6 +98,9 @@ public class ProcessInstanceServiceTest { @Mock ProcessService processService; + @Mock + ProcessInstanceDao processInstanceDao; + @Mock ProcessInstanceMapper processInstanceMapper; @@ -475,7 +479,7 @@ public class ProcessInstanceServiceTest { when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); - when(processService.updateProcessInstance(processInstance)).thenReturn(1); + when(processInstanceDao.updateProcessInstance(processInstance)).thenReturn(1); when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1); List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java index f0c78fcf44..68fd2cae66 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java @@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration; @Configuration @EnableAutoConfiguration -@MapperScan("org.apache.dolphinscheduler.dao") +@MapperScan("org.apache.dolphinscheduler.dao.mapper") public class DaoConfiguration { } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index c9608416d1..ed2a383ba2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -17,9 +17,6 @@ package org.apache.dolphinscheduler.dao.entity; -import lombok.Data; -import lombok.NoArgsConstructor; - import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; @@ -28,8 +25,14 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; import java.util.Date; +import java.util.List; +import java.util.Objects; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; @@ -37,6 +40,10 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.google.common.base.Strings; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * process instance */ @@ -65,6 +72,18 @@ public class ProcessInstance { * process state */ private WorkflowExecutionStatus state; + + /** + * state history + */ + private String stateHistory; + + /** + * state desc list from state history + */ + @TableField(exist = false) + private List stateDescList; + /** * recovery flag for failover */ @@ -310,4 +329,28 @@ public class ProcessInstance { return commandType; } + /** + * set state with desc + * @param state + * @param stateDesc + */ + public void setStateWithDesc(WorkflowExecutionStatus state, String stateDesc) { + this.setState(state); + if (StringUtils.isEmpty(this.getStateHistory())) { + stateDescList = new ArrayList<>(); + } else if (stateDescList == null) { + stateDescList = JSONUtils.toList(this.getStateHistory(), StateDesc.class); + } + stateDescList.add(new StateDesc(new Date(), state, stateDesc)); + this.setStateHistory(JSONUtils.toJsonString(stateDescList)); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class StateDesc { + Date time; + WorkflowExecutionStatus state; + String desc; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java new file mode 100644 index 0000000000..7a5e44a761 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + + +public interface ProcessInstanceDao { + + public int insertProcessInstance(ProcessInstance processInstance); + + public int updateProcessInstance(ProcessInstance processInstance); + + /** + * insert or update work process instance to database + * + * @param processInstance processInstance + */ + public int upsertProcessInstance(ProcessInstance processInstance); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java new file mode 100644 index 0000000000..4360581855 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Repository +public class ProcessInstanceDaoImpl implements ProcessInstanceDao { + + @Autowired + private ProcessInstanceMapper processInstanceMapper; + + @Override + public int insertProcessInstance(ProcessInstance processInstance) { + return processInstanceMapper.insert(processInstance); + } + + @Override + public int updateProcessInstance(ProcessInstance processInstance) { + return processInstanceMapper.updateById(processInstance); + } + + @Override + public int upsertProcessInstance(@NonNull ProcessInstance processInstance) { + if (processInstance.getId() != 0) { + return updateProcessInstance(processInstance); + } else { + return insertProcessInstance(processInstance); + } + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 49553621d1..0074eff878 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -24,7 +24,7 @@ warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, - dry_run, next_process_instance_id, restart_time + dry_run, next_process_instance_id, restart_time, state_history