From 9c5270e4cae9ab0c85dd88c05b0998fbfb9580dc Mon Sep 17 00:00:00 2001 From: baoliang Date: Wed, 17 Apr 2019 16:22:31 +0800 Subject: [PATCH] merge from upstream add error command --- .../java/cn/escheduler/dao/ProcessDao.java | 11 + .../dao/mapper/ErrorCommandMapper.java | 45 +++ .../mapper/ErrorCommandMapperProvider.java | 41 +++ .../cn/escheduler/dao/model/ErrorCommand.java | 275 ++++++++++++++++++ 4 files changed, 372 insertions(+) create mode 100644 escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java create mode 100644 escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java create mode 100644 escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index e6156a0f4c..e216f292d7 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao { @Autowired private ResourceMapper resourceMapper; + @Autowired + private ErrorCommandMapper errorCommandMapper; + /** * task queue impl */ @@ -139,6 +142,7 @@ public class ProcessDao extends AbstractBaseDao { if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); delCommandByid(command.getId()); + saveErrorCommand(command, "process instance is null"); return null; }else if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); @@ -153,11 +157,18 @@ public class ProcessDao extends AbstractBaseDao { } }catch (Exception e){ logger.error("scan command error ", e); + saveErrorCommand(command, e.getMessage()); delCommandByid(command.getId()); } return null; } + private void saveErrorCommand(Command command, String message) { + + ErrorCommand errorCommand = new ErrorCommand(command, message); + this.errorCommandMapper.insert(errorCommand); + } + /** * set process waiting thread * @param command diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java new file mode 100644 index 0000000000..5c702acf79 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import cn.escheduler.common.enums.*; +import cn.escheduler.dao.model.Command; +import cn.escheduler.dao.model.ErrorCommand; +import org.apache.ibatis.annotations.*; +import org.apache.ibatis.type.EnumOrdinalTypeHandler; +import org.apache.ibatis.type.JdbcType; + +import java.sql.Timestamp; +import java.util.List; + +/** + * command mapper + */ +public interface ErrorCommandMapper { + + /** + * inert error command + * @param errorCommand + * @return + */ + @InsertProvider(type = ErrorCommandMapperProvider.class, method = "insert") + @Options(useGeneratedKeys = true,keyProperty = "errorCommand.id") + @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "errorCommand.id", before = false, resultType = int.class) + int insert(@Param("errorCommand") ErrorCommand errorCommand); + + +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java new file mode 100644 index 0000000000..d5c3f992c7 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ErrorCommandMapperProvider.java @@ -0,0 +1,41 @@ +package cn.escheduler.dao.mapper; + +import cn.escheduler.common.enums.*; +import cn.escheduler.common.utils.EnumFieldUtil; +import org.apache.ibatis.jdbc.SQL; + +import java.util.Map; + +public class ErrorCommandMapperProvider { + + + private static final String TABLE_NAME = "t_escheduler_error_command"; + + + /** + * inert command + * + * @param parameter + * @return + */ + public String insert(Map parameter) { + return new SQL() { + { + INSERT_INTO(TABLE_NAME); + VALUES("`command_type`", EnumFieldUtil.genFieldStr("errorCommand.commandType", CommandType.class)); + VALUES("`process_definition_id`", "#{errorCommand.processDefinitionId}"); + VALUES("`executor_id`", "#{errorCommand.executorId}"); + VALUES("`command_param`", "#{errorCommand.commandParam}"); + VALUES("`task_depend_type`", EnumFieldUtil.genFieldStr("errorCommand.taskDependType", TaskDependType.class)); + VALUES("`failure_strategy`", EnumFieldUtil.genFieldStr("errorCommand.failureStrategy", FailureStrategy.class)); + VALUES("`warning_type`", EnumFieldUtil.genFieldStr("errorCommand.warningType", WarningType.class)); + VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("errorCommand.processInstancePriority", Priority.class)); + VALUES("`warning_group_id`", "#{errorCommand.warningGroupId}"); + VALUES("`schedule_time`", "#{errorCommand.scheduleTime}"); + VALUES("`update_time`", "#{errorCommand.updateTime}"); + VALUES("`start_time`", "#{errorCommand.startTime}"); + VALUES("`message`", "#{errorCommand.message}"); + } + }.toString(); + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java new file mode 100644 index 0000000000..2128455703 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ErrorCommand.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.model; + +import cn.escheduler.common.enums.*; + +import java.util.Date; + +/** + * command + */ +public class ErrorCommand { + + /** + * id + */ + private int id; + + /** + * command type + */ + private CommandType commandType; + + /** + * process definition id + */ + private int processDefinitionId; + + /** + * executor id + */ + private int executorId; + + /** + * command parameter, format json + */ + private String commandParam; + + /** + * task depend type + */ + private TaskDependType taskDependType; + + /** + * failure strategy + */ + private FailureStrategy failureStrategy; + + /** + * warning type + */ + private WarningType warningType; + + /** + * warning group id + */ + private Integer warningGroupId; + + /** + * schedule time + */ + private Date scheduleTime; + + /** + * start time + */ + private Date startTime; + + /** + * process instance priority + */ + private Priority processInstancePriority; + + /** + * update time + */ + private Date updateTime; + + /** + * 执行信息 + */ + private String message; + + + public ErrorCommand(Command command, String message){ + this.commandType = command.getCommandType(); + this.executorId = command.getExecutorId(); + this.processDefinitionId = command.getProcessDefinitionId(); + this.commandParam = command.getCommandParam(); + this.warningType = command.getWarningType(); + this.warningGroupId = command.getWarningGroupId(); + this.scheduleTime = command.getScheduleTime(); + this.taskDependType = command.getTaskDependType(); + this.failureStrategy = command.getFailureStrategy(); + this.startTime = command.getStartTime(); + this.updateTime = command.getUpdateTime(); + this.processInstancePriority = command.getProcessInstancePriority(); + this.message = message; + } + + public ErrorCommand( + CommandType commandType, + TaskDependType taskDependType, + FailureStrategy failureStrategy, + int executorId, + int processDefinitionId, + String commandParam, + WarningType warningType, + int warningGroupId, + Date scheduleTime, + Priority processInstancePriority, + String message){ + this.commandType = commandType; + this.executorId = executorId; + this.processDefinitionId = processDefinitionId; + this.commandParam = commandParam; + this.warningType = warningType; + this.warningGroupId = warningGroupId; + this.scheduleTime = scheduleTime; + this.taskDependType = taskDependType; + this.failureStrategy = failureStrategy; + this.startTime = new Date(); + this.updateTime = new Date(); + this.processInstancePriority = processInstancePriority; + this.message = message; + } + + + public TaskDependType getTaskDependType() { + return taskDependType; + } + + public void setTaskDependType(TaskDependType taskDependType) { + this.taskDependType = taskDependType; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public CommandType getCommandType() { + return commandType; + } + + public void setCommandType(CommandType commandType) { + this.commandType = commandType; + } + + public int getProcessDefinitionId() { + return processDefinitionId; + } + + public void setProcessDefinitionId(int processDefinitionId) { + this.processDefinitionId = processDefinitionId; + } + + + public FailureStrategy getFailureStrategy() { + return failureStrategy; + } + + public void setFailureStrategy(FailureStrategy failureStrategy) { + this.failureStrategy = failureStrategy; + } + + public void setCommandParam(String commandParam) { + this.commandParam = commandParam; + } + + public String getCommandParam() { + return commandParam; + } + + public WarningType getWarningType() { + return warningType; + } + + public void setWarningType(WarningType warningType) { + this.warningType = warningType; + } + + public Integer getWarningGroupId() { + return warningGroupId; + } + + public void setWarningGroupId(Integer warningGroupId) { + this.warningGroupId = warningGroupId; + } + + public Date getScheduleTime() { + return scheduleTime; + } + + public void setScheduleTime(Date scheduleTime) { + this.scheduleTime = scheduleTime; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public int getExecutorId() { + return executorId; + } + + public void setExecutorId(int executorId) { + this.executorId = executorId; + } + + public Priority getProcessInstancePriority() { + return processInstancePriority; + } + + public void setProcessInstancePriority(Priority processInstancePriority) { + this.processInstancePriority = processInstancePriority; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public String toString() { + return "Command{" + + "id=" + id + + ", commandType=" + commandType + + ", processDefinitionId=" + processDefinitionId + + ", executorId=" + executorId + + ", commandParam='" + commandParam + '\'' + + ", taskDependType=" + taskDependType + + ", failureStrategy=" + failureStrategy + + ", warningType=" + warningType + + ", warningGroupId=" + warningGroupId + + ", scheduleTime=" + scheduleTime + + ", startTime=" + startTime + + ", processInstancePriority=" + processInstancePriority + + ", updateTime=" + updateTime + + ", message=" + message + + '}'; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +}