diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java deleted file mode 100644 index 38867fe174..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.controller; - -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_RECORD_LIST_PAGING_ERROR; - -import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; -import org.apache.dolphinscheduler.api.exceptions.ApiException; -import org.apache.dolphinscheduler.api.service.TaskRecordService; -import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.dao.entity.User; - -import java.util.Map; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestAttribute; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseStatus; -import org.springframework.web.bind.annotation.RestController; - -import springfox.documentation.annotations.ApiIgnore; - -/** - * task record controller - */ -@ApiIgnore -@RestController -@RequestMapping("/projects/task-record") -public class TaskRecordController extends BaseController { - - @Autowired - TaskRecordService taskRecordService; - - /** - * query task record list page - * - * @param loginUser login user - * @param taskName task name - * @param state state - * @param sourceTable source table - * @param destTable destination table - * @param taskDate task date - * @param startTime start time - * @param endTime end time - * @param pageNo page number - * @param pageSize page size - * @return task record list - */ - @GetMapping("/list-paging") - @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR) - @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result queryTaskRecordListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskName", required = false) String taskName, - @RequestParam(value = "state", required = false) String state, - @RequestParam(value = "sourceTable", required = false) String sourceTable, - @RequestParam(value = "destTable", required = false) String destTable, - @RequestParam(value = "taskDate", required = false) String taskDate, - @RequestParam(value = "startDate", required = false) String startTime, - @RequestParam(value = "endDate", required = false) String endTime, - @RequestParam("pageNo") Integer pageNo, - @RequestParam("pageSize") Integer pageSize - ) { - - Map result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize); - return returnDataListPaging(result); - } - - /** - * query history task record list paging - * - * @param loginUser login user - * @param taskName task name - * @param state state - * @param sourceTable source table - * @param destTable destination table - * @param taskDate task date - * @param startTime start time - * @param endTime end time - * @param pageNo page number - * @param pageSize page size - * @return history task record list - */ - @GetMapping("/history-list-paging") - @ResponseStatus(HttpStatus.OK) - @ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR) - @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result queryHistoryTaskRecordListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskName", required = false) String taskName, - @RequestParam(value = "state", required = false) String state, - @RequestParam(value = "sourceTable", required = false) String sourceTable, - @RequestParam(value = "destTable", required = false) String destTable, - @RequestParam(value = "taskDate", required = false) String taskDate, - @RequestParam(value = "startDate", required = false) String startTime, - @RequestParam(value = "endDate", required = false) String endTime, - @RequestParam("pageNo") Integer pageNo, - @RequestParam("pageSize") Integer pageSize - ) { - - Map result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize); - return returnDataListPaging(result); - } - -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java deleted file mode 100644 index 8c8ad0abff..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.service; - -import java.util.Map; - -/** - * task record service - */ -public interface TaskRecordService { - - /** - * query task record list paging - * - * @param taskName task name - * @param state state - * @param sourceTable source table - * @param destTable destination table - * @param taskDate task date - * @param startDate start time - * @param endDate end time - * @param pageNo page numbere - * @param pageSize page size - * @param isHistory is history - * @return task record list - */ - Map queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, - String taskDate, String sourceTable, - String destTable, String endDate, - String state, Integer pageNo, Integer pageSize); -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java deleted file mode 100644 index c755da057b..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.service.impl; - -import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HISTORY_HIVE_LOG; -import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HIVE_LOG; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.service.TaskRecordService; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.dao.TaskRecordDao; -import org.apache.dolphinscheduler.dao.entity.TaskRecord; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.springframework.stereotype.Service; - -/** - * task record service impl - */ -@Service -public class TaskRecordServiceImpl extends BaseServiceImpl implements TaskRecordService { - - /** - * query task record list paging - * - * @param taskName task name - * @param state state - * @param sourceTable source table - * @param destTable destination table - * @param taskDate task date - * @param startDate start time - * @param endDate end time - * @param pageNo page numbere - * @param pageSize page size - * @param isHistory is history - * @return task record list - */ - @Override - public Map queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, - String taskDate, String sourceTable, - String destTable, String endDate, - String state, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); - PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); - - Map map = new HashMap<>(); - map.put("taskName", taskName); - map.put("taskDate", taskDate); - map.put("state", state); - map.put("sourceTable", sourceTable); - map.put("targetTable", destTable); - map.put("startTime", startDate); - map.put("endTime", endDate); - map.put("offset", pageInfo.getStart().toString()); - map.put("pageSize", pageInfo.getPageSize().toString()); - - String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; - int count = TaskRecordDao.countTaskRecord(map, table); - List recordList = TaskRecordDao.queryAllTaskRecord(map, table); - pageInfo.setTotalCount(count); - pageInfo.setLists(recordList); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java deleted file mode 100644 index a78ac060ad..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.controller; - -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.utils.JSONUtils; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; -import org.springframework.test.web.servlet.MvcResult; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -/** - * task record controller test - */ -public class TaskRecordControllerTest extends AbstractControllerTest { - - private static final Logger logger = LoggerFactory.getLogger(TaskRecordControllerTest.class); - - @Test - public void testQueryTaskRecordListPaging() throws Exception { - MultiValueMap paramsMap = new LinkedMultiValueMap<>(); - paramsMap.add("taskName","taskName"); - paramsMap.add("state","state"); - paramsMap.add("sourceTable",""); - paramsMap.add("destTable",""); - paramsMap.add("taskDate",""); - paramsMap.add("startDate","2019-12-16 00:00:00"); - paramsMap.add("endDate","2019-12-17 00:00:00"); - paramsMap.add("pageNo","1"); - paramsMap.add("pageSize","30"); - - MvcResult mvcResult = mockMvc.perform(get("/projects/task-record/list-paging") - .header(SESSION_ID, sessionId) - .params(paramsMap)) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) - .andReturn(); - - Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); - logger.info(mvcResult.getResponse().getContentAsString()); - } - - - @Test - public void testQueryHistoryTaskRecordListPaging() throws Exception { - MultiValueMap paramsMap = new LinkedMultiValueMap<>(); - paramsMap.add("taskName","taskName"); - paramsMap.add("state","state"); - paramsMap.add("sourceTable",""); - paramsMap.add("destTable",""); - paramsMap.add("taskDate",""); - paramsMap.add("startDate","2019-12-16 00:00:00"); - paramsMap.add("endDate","2019-12-17 00:00:00"); - paramsMap.add("pageNo","1"); - paramsMap.add("pageSize","30"); - - MvcResult mvcResult = mockMvc.perform(get("/projects/task-record/history-list-paging") - .header(SESSION_ID, sessionId) - .params(paramsMap)) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) - .andReturn(); - - Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); - logger.info(mvcResult.getResponse().getContentAsString()); - - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 1544f2b638..4ef30a25e8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -432,14 +432,6 @@ public final class Constants { */ public static final String DATASOURCE_PROPERTIES = "/datasource.properties"; - public static final String TASK_RECORD_URL = "task.record.datasource.url"; - - public static final String TASK_RECORD_FLAG = "task.record.flag"; - - public static final String TASK_RECORD_USER = "task.record.datasource.username"; - - public static final String TASK_RECORD_PWD = "task.record.datasource.password"; - public static final String DEFAULT = "Default"; public static final String USER = "user"; public static final String PASSWORD = "password"; @@ -448,11 +440,6 @@ public final class Constants { public static final String THREAD_NAME_MASTER_SERVER = "Master-Server"; public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server"; - public static final String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; - - public static final String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; - - /** * command parameter keys */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java deleted file mode 100644 index af8d9af386..0000000000 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.dao; - -import static org.apache.dolphinscheduler.common.Constants.DATASOURCE_PROPERTIES; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskRecord; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * task record dao - */ -public class TaskRecordDao { - - - private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); - - static { - PropertyUtils.loadPropertyFile(DATASOURCE_PROPERTIES); - } - - /** - * get task record flag - * - * @return whether startup taskrecord - */ - public static boolean getTaskRecordFlag() { - return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG, false); - } - - /** - * create connection - * - * @return connection - */ - private static Connection getConn() { - if (!getTaskRecordFlag()) { - return null; - } - String driver = "com.mysql.jdbc.Driver"; - String url = PropertyUtils.getString(Constants.TASK_RECORD_URL); - String username = PropertyUtils.getString(Constants.TASK_RECORD_USER); - String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD); - Connection conn = null; - try { - //classLoader,load driver - Class.forName(driver); - conn = DriverManager.getConnection(url, username, password); - } catch (ClassNotFoundException e) { - logger.error("Class not found Exception ", e); - } catch (SQLException e) { - logger.error("SQL Exception ", e); - } - return conn; - } - - /** - * generate where sql string - * - * @param filterMap filterMap - * @return sql string - */ - private static String getWhereString(Map filterMap) { - if (filterMap.size() == 0) { - return ""; - } - - String result = " where 1=1 "; - - Object taskName = filterMap.get("taskName"); - if (taskName != null && StringUtils.isNotEmpty(taskName.toString())) { - result += " and PROC_NAME like concat('%', '" + taskName.toString() + "', '%') "; - } - - Object taskDate = filterMap.get("taskDate"); - if (taskDate != null && StringUtils.isNotEmpty(taskDate.toString())) { - result += " and PROC_DATE='" + taskDate.toString() + "'"; - } - - Object state = filterMap.get("state"); - if (state != null && StringUtils.isNotEmpty(state.toString())) { - result += " and NOTE='" + state.toString() + "'"; - } - - Object sourceTable = filterMap.get("sourceTable"); - if (sourceTable != null && StringUtils.isNotEmpty(sourceTable.toString())) { - result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString() + "', '%')"; - } - - Object targetTable = filterMap.get("targetTable"); - if (sourceTable != null && StringUtils.isNotEmpty(targetTable.toString())) { - result += " and TARGET_TAB like concat('%', '" + targetTable.toString() + "', '%') "; - } - - Object start = filterMap.get("startTime"); - if (start != null && StringUtils.isNotEmpty(start.toString())) { - result += " and STARTDATE>='" + start.toString() + "'"; - } - - Object end = filterMap.get("endTime"); - if (end != null && StringUtils.isNotEmpty(end.toString())) { - result += " and ENDDATE>='" + end.toString() + "'"; - } - return result; - } - - /** - * count task record - * - * @param filterMap filterMap - * @param table table - * @return task record count - */ - public static int countTaskRecord(Map filterMap, String table) { - - int count = 0; - Connection conn = null; - PreparedStatement pstmt = null; - ResultSet rs = null; - try { - conn = getConn(); - if (conn == null) { - return count; - } - String sql = String.format("select count(1) as count from %s", table); - sql += getWhereString(filterMap); - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); - while (rs.next()) { - count = rs.getInt("count"); - break; - } - } catch (SQLException e) { - logger.error("Exception ", e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); - } - return count; - } - - /** - * query task record by filter map paging - * - * @param filterMap filterMap - * @param table table - * @return task record list - */ - public static List queryAllTaskRecord(Map filterMap, String table) { - - String sql = String.format("select * from %s", table); - sql += getWhereString(filterMap); - - int offset = Integer.parseInt(filterMap.get("offset")); - int pageSize = Integer.parseInt(filterMap.get("pageSize")); - sql += String.format(" order by STARTDATE desc limit %d,%d", offset, pageSize); - - List recordList = new ArrayList<>(); - try { - recordList = getQueryResult(sql); - } catch (Exception e) { - logger.error("Exception ", e); - } - return recordList; - } - - /** - * convert result set to task record - * - * @param resultSet resultSet - * @return task record - * @throws SQLException if error throws SQLException - */ - private static TaskRecord convertToTaskRecord(ResultSet resultSet) throws SQLException { - - TaskRecord taskRecord = new TaskRecord(); - taskRecord.setId(resultSet.getInt("ID")); - taskRecord.setProcId(resultSet.getInt("PROC_ID")); - taskRecord.setProcName(resultSet.getString("PROC_NAME")); - taskRecord.setProcDate(resultSet.getString("PROC_DATE")); - taskRecord.setStartTime(DateUtils.stringToDate(resultSet.getString("STARTDATE"))); - taskRecord.setEndTime(DateUtils.stringToDate(resultSet.getString("ENDDATE"))); - taskRecord.setResult(resultSet.getString("RESULT")); - taskRecord.setDuration(resultSet.getInt("DURATION")); - taskRecord.setNote(resultSet.getString("NOTE")); - taskRecord.setSchema(resultSet.getString("SCHEMA")); - taskRecord.setJobId(resultSet.getString("JOB_ID")); - taskRecord.setSourceTab(resultSet.getString("SOURCE_TAB")); - taskRecord.setSourceRowCount(resultSet.getLong("SOURCE_ROW_COUNT")); - taskRecord.setTargetTab(resultSet.getString("TARGET_TAB")); - taskRecord.setTargetRowCount(resultSet.getLong("TARGET_ROW_COUNT")); - taskRecord.setErrorCode(resultSet.getString("ERROR_CODE")); - return taskRecord; - } - - /** - * query task list by select sql - * - * @param selectSql select sql - * @return task record list - */ - private static List getQueryResult(String selectSql) { - List recordList = new ArrayList<>(); - Connection conn = null; - PreparedStatement pstmt = null; - ResultSet rs = null; - try { - conn = getConn(); - if (conn == null) { - return recordList; - } - pstmt = conn.prepareStatement(selectSql); - rs = pstmt.executeQuery(); - - while (rs.next()) { - TaskRecord taskRecord = convertToTaskRecord(rs); - recordList.add(taskRecord); - } - } catch (SQLException e) { - logger.error("Exception ", e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); - } - return recordList; - } - - /** - * according to procname and procdate query task record - * - * @param procName procName - * @param procDate procDate - * @return task record status - */ - public static TaskRecordStatus getTaskRecordState(String procName, String procDate) { - String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'" - , procName, procDate + "%"); - List taskRecordList = getQueryResult(sql); - - // contains no record and sql exception - if (CollectionUtils.isEmpty(taskRecordList)) { - // exception - return TaskRecordStatus.EXCEPTION; - } else if (taskRecordList.size() > 1) { - return TaskRecordStatus.EXCEPTION; - } else { - TaskRecord taskRecord = taskRecordList.get(0); - if (taskRecord == null) { - return TaskRecordStatus.EXCEPTION; - } - Long targetRowCount = taskRecord.getTargetRowCount(); - if (targetRowCount <= 0) { - return TaskRecordStatus.FAILURE; - } else { - return TaskRecordStatus.SUCCESS; - } - - } - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index a5221dd224..7454f49ae5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -20,20 +20,12 @@ package org.apache.dolphinscheduler.server.worker.task; import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; -import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.utils.ParamUtils; import java.util.List; -import java.util.Map; import org.slf4j.Logger; @@ -110,6 +102,13 @@ public abstract class AbstractTask { */ public abstract void handle() throws Exception; + /** + * result processing + * + * @throws Exception exception + */ + public void after() throws Exception { + } /** * cancel application @@ -187,42 +186,6 @@ public abstract class AbstractTask { */ public abstract AbstractParameters getParameters(); - - /** - * result processing - */ - public void after() { - if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) { - // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() && typeIsNormalTask(taskExecutionContext.getTaskType())) { - AbstractParameters params = TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(), taskExecutionContext.getTaskParams()); - - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - params.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")) { - String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)) { - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskExecutionContext.getTaskName(), vProcDate); - logger.info("task record status : {}", taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE) { - setExitStatusCode(Constants.EXIT_CODE_FAILURE); - } - } - } - } - - } else if (getExitStatusCode() == Constants.EXIT_CODE_KILL) { - setExitStatusCode(Constants.EXIT_CODE_KILL); - } else { - setExitStatusCode(Constants.EXIT_CODE_FAILURE); - } - } - private boolean typeIsNormalTask(String taskType) { return !(TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(taskType) || TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)); }