Browse Source

[Improvement][Task] Remove deprecated TaskRecordDao and simply the after() method in the AbstractTask class (#5492)

* remove the depreated data quality module

* fix lint

Co-authored-by: Yao <ywang46@paypal.com>
pull/3/MERGE
Yao WANG 3 years ago committed by GitHub
parent
commit
bc22ae7c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 123
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
  2. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java
  3. 86
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java
  4. 95
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java
  5. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  6. 291
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
  7. 51
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

123
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java

@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_RECORD_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskRecordService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
/**
* task record controller
*/
@ApiIgnore
@RestController
@RequestMapping("/projects/task-record")
public class TaskRecordController extends BaseController {
@Autowired
TaskRecordService taskRecordService;
/**
* query task record list page
*
* @param loginUser login user
* @param taskName task name
* @param state state
* @param sourceTable source table
* @param destTable destination table
* @param taskDate task date
* @param startTime start time
* @param endTime end time
* @param pageNo page number
* @param pageSize page size
* @return task record list
*/
@GetMapping("/list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskRecordListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskName", required = false) String taskName,
@RequestParam(value = "state", required = false) String state,
@RequestParam(value = "sourceTable", required = false) String sourceTable,
@RequestParam(value = "destTable", required = false) String destTable,
@RequestParam(value = "taskDate", required = false) String taskDate,
@RequestParam(value = "startDate", required = false) String startTime,
@RequestParam(value = "endDate", required = false) String endTime,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize
) {
Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize);
return returnDataListPaging(result);
}
/**
* query history task record list paging
*
* @param loginUser login user
* @param taskName task name
* @param state state
* @param sourceTable source table
* @param destTable destination table
* @param taskDate task date
* @param startTime start time
* @param endTime end time
* @param pageNo page number
* @param pageSize page size
* @return history task record list
*/
@GetMapping("/history-list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_RECORD_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryHistoryTaskRecordListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskName", required = false) String taskName,
@RequestParam(value = "state", required = false) String state,
@RequestParam(value = "sourceTable", required = false) String sourceTable,
@RequestParam(value = "destTable", required = false) String destTable,
@RequestParam(value = "taskDate", required = false) String taskDate,
@RequestParam(value = "startDate", required = false) String startTime,
@RequestParam(value = "endDate", required = false) String endTime,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize
) {
Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime, state, pageNo, pageSize);
return returnDataListPaging(result);
}
}

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import java.util.Map;
/**
* task record service
*/
public interface TaskRecordService {
/**
* query task record list paging
*
* @param taskName task name
* @param state state
* @param sourceTable source table
* @param destTable destination table
* @param taskDate task date
* @param startDate start time
* @param endDate end time
* @param pageNo page numbere
* @param pageSize page size
* @param isHistory is history
* @return task record list
*/
Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate,
String taskDate, String sourceTable,
String destTable, String endDate,
String state, Integer pageNo, Integer pageSize);
}

86
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java

@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HISTORY_HIVE_LOG;
import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HIVE_LOG;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskRecordService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Service;
/**
* task record service impl
*/
@Service
public class TaskRecordServiceImpl extends BaseServiceImpl implements TaskRecordService {
/**
* query task record list paging
*
* @param taskName task name
* @param state state
* @param sourceTable source table
* @param destTable destination table
* @param taskDate task date
* @param startDate start time
* @param endDate end time
* @param pageNo page numbere
* @param pageSize page size
* @param isHistory is history
* @return task record list
*/
@Override
public Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate,
String taskDate, String sourceTable,
String destTable, String endDate,
String state, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
PageInfo<TaskRecord> pageInfo = new PageInfo<>(pageNo, pageSize);
Map<String, String> map = new HashMap<>();
map.put("taskName", taskName);
map.put("taskDate", taskDate);
map.put("state", state);
map.put("sourceTable", sourceTable);
map.put("targetTable", destTable);
map.put("startTime", startDate);
map.put("endTime", endDate);
map.put("offset", pageInfo.getStart().toString());
map.put("pageSize", pageInfo.getPageSize().toString());
String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG;
int count = TaskRecordDao.countTaskRecord(map, table);
List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table);
pageInfo.setTotalCount(count);
pageInfo.setLists(recordList);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
}

95
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskRecordControllerTest.java

@ -1,95 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task record controller test
*/
public class TaskRecordControllerTest extends AbstractControllerTest {
private static final Logger logger = LoggerFactory.getLogger(TaskRecordControllerTest.class);
@Test
public void testQueryTaskRecordListPaging() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskName","taskName");
paramsMap.add("state","state");
paramsMap.add("sourceTable","");
paramsMap.add("destTable","");
paramsMap.add("taskDate","");
paramsMap.add("startDate","2019-12-16 00:00:00");
paramsMap.add("endDate","2019-12-17 00:00:00");
paramsMap.add("pageNo","1");
paramsMap.add("pageSize","30");
MvcResult mvcResult = mockMvc.perform(get("/projects/task-record/list-paging")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryHistoryTaskRecordListPaging() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskName","taskName");
paramsMap.add("state","state");
paramsMap.add("sourceTable","");
paramsMap.add("destTable","");
paramsMap.add("taskDate","");
paramsMap.add("startDate","2019-12-16 00:00:00");
paramsMap.add("endDate","2019-12-17 00:00:00");
paramsMap.add("pageNo","1");
paramsMap.add("pageSize","30");
MvcResult mvcResult = mockMvc.perform(get("/projects/task-record/history-list-paging")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
}

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -432,14 +432,6 @@ public final class Constants {
*/
public static final String DATASOURCE_PROPERTIES = "/datasource.properties";
public static final String TASK_RECORD_URL = "task.record.datasource.url";
public static final String TASK_RECORD_FLAG = "task.record.flag";
public static final String TASK_RECORD_USER = "task.record.datasource.username";
public static final String TASK_RECORD_PWD = "task.record.datasource.password";
public static final String DEFAULT = "Default";
public static final String USER = "user";
public static final String PASSWORD = "password";
@ -448,11 +440,6 @@ public final class Constants {
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static final String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
public static final String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd";
/**
* command parameter keys
*/

291
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java

@ -1,291 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao;
import static org.apache.dolphinscheduler.common.Constants.DATASOURCE_PROPERTIES;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task record dao
*/
public class TaskRecordDao {
private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName());
static {
PropertyUtils.loadPropertyFile(DATASOURCE_PROPERTIES);
}
/**
* get task record flag
*
* @return whether startup taskrecord
*/
public static boolean getTaskRecordFlag() {
return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG, false);
}
/**
* create connection
*
* @return connection
*/
private static Connection getConn() {
if (!getTaskRecordFlag()) {
return null;
}
String driver = "com.mysql.jdbc.Driver";
String url = PropertyUtils.getString(Constants.TASK_RECORD_URL);
String username = PropertyUtils.getString(Constants.TASK_RECORD_USER);
String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD);
Connection conn = null;
try {
//classLoader,load driver
Class.forName(driver);
conn = DriverManager.getConnection(url, username, password);
} catch (ClassNotFoundException e) {
logger.error("Class not found Exception ", e);
} catch (SQLException e) {
logger.error("SQL Exception ", e);
}
return conn;
}
/**
* generate where sql string
*
* @param filterMap filterMap
* @return sql string
*/
private static String getWhereString(Map<String, String> filterMap) {
if (filterMap.size() == 0) {
return "";
}
String result = " where 1=1 ";
Object taskName = filterMap.get("taskName");
if (taskName != null && StringUtils.isNotEmpty(taskName.toString())) {
result += " and PROC_NAME like concat('%', '" + taskName.toString() + "', '%') ";
}
Object taskDate = filterMap.get("taskDate");
if (taskDate != null && StringUtils.isNotEmpty(taskDate.toString())) {
result += " and PROC_DATE='" + taskDate.toString() + "'";
}
Object state = filterMap.get("state");
if (state != null && StringUtils.isNotEmpty(state.toString())) {
result += " and NOTE='" + state.toString() + "'";
}
Object sourceTable = filterMap.get("sourceTable");
if (sourceTable != null && StringUtils.isNotEmpty(sourceTable.toString())) {
result += " and SOURCE_TAB like concat('%', '" + sourceTable.toString() + "', '%')";
}
Object targetTable = filterMap.get("targetTable");
if (sourceTable != null && StringUtils.isNotEmpty(targetTable.toString())) {
result += " and TARGET_TAB like concat('%', '" + targetTable.toString() + "', '%') ";
}
Object start = filterMap.get("startTime");
if (start != null && StringUtils.isNotEmpty(start.toString())) {
result += " and STARTDATE>='" + start.toString() + "'";
}
Object end = filterMap.get("endTime");
if (end != null && StringUtils.isNotEmpty(end.toString())) {
result += " and ENDDATE>='" + end.toString() + "'";
}
return result;
}
/**
* count task record
*
* @param filterMap filterMap
* @param table table
* @return task record count
*/
public static int countTaskRecord(Map<String, String> filterMap, String table) {
int count = 0;
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = getConn();
if (conn == null) {
return count;
}
String sql = String.format("select count(1) as count from %s", table);
sql += getWhereString(filterMap);
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
count = rs.getInt("count");
break;
}
} catch (SQLException e) {
logger.error("Exception ", e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return count;
}
/**
* query task record by filter map paging
*
* @param filterMap filterMap
* @param table table
* @return task record list
*/
public static List<TaskRecord> queryAllTaskRecord(Map<String, String> filterMap, String table) {
String sql = String.format("select * from %s", table);
sql += getWhereString(filterMap);
int offset = Integer.parseInt(filterMap.get("offset"));
int pageSize = Integer.parseInt(filterMap.get("pageSize"));
sql += String.format(" order by STARTDATE desc limit %d,%d", offset, pageSize);
List<TaskRecord> recordList = new ArrayList<>();
try {
recordList = getQueryResult(sql);
} catch (Exception e) {
logger.error("Exception ", e);
}
return recordList;
}
/**
* convert result set to task record
*
* @param resultSet resultSet
* @return task record
* @throws SQLException if error throws SQLException
*/
private static TaskRecord convertToTaskRecord(ResultSet resultSet) throws SQLException {
TaskRecord taskRecord = new TaskRecord();
taskRecord.setId(resultSet.getInt("ID"));
taskRecord.setProcId(resultSet.getInt("PROC_ID"));
taskRecord.setProcName(resultSet.getString("PROC_NAME"));
taskRecord.setProcDate(resultSet.getString("PROC_DATE"));
taskRecord.setStartTime(DateUtils.stringToDate(resultSet.getString("STARTDATE")));
taskRecord.setEndTime(DateUtils.stringToDate(resultSet.getString("ENDDATE")));
taskRecord.setResult(resultSet.getString("RESULT"));
taskRecord.setDuration(resultSet.getInt("DURATION"));
taskRecord.setNote(resultSet.getString("NOTE"));
taskRecord.setSchema(resultSet.getString("SCHEMA"));
taskRecord.setJobId(resultSet.getString("JOB_ID"));
taskRecord.setSourceTab(resultSet.getString("SOURCE_TAB"));
taskRecord.setSourceRowCount(resultSet.getLong("SOURCE_ROW_COUNT"));
taskRecord.setTargetTab(resultSet.getString("TARGET_TAB"));
taskRecord.setTargetRowCount(resultSet.getLong("TARGET_ROW_COUNT"));
taskRecord.setErrorCode(resultSet.getString("ERROR_CODE"));
return taskRecord;
}
/**
* query task list by select sql
*
* @param selectSql select sql
* @return task record list
*/
private static List<TaskRecord> getQueryResult(String selectSql) {
List<TaskRecord> recordList = new ArrayList<>();
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = getConn();
if (conn == null) {
return recordList;
}
pstmt = conn.prepareStatement(selectSql);
rs = pstmt.executeQuery();
while (rs.next()) {
TaskRecord taskRecord = convertToTaskRecord(rs);
recordList.add(taskRecord);
}
} catch (SQLException e) {
logger.error("Exception ", e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return recordList;
}
/**
* according to procname and procdate query task record
*
* @param procName procName
* @param procDate procDate
* @return task record status
*/
public static TaskRecordStatus getTaskRecordState(String procName, String procDate) {
String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'"
, procName, procDate + "%");
List<TaskRecord> taskRecordList = getQueryResult(sql);
// contains no record and sql exception
if (CollectionUtils.isEmpty(taskRecordList)) {
// exception
return TaskRecordStatus.EXCEPTION;
} else if (taskRecordList.size() > 1) {
return TaskRecordStatus.EXCEPTION;
} else {
TaskRecord taskRecord = taskRecordList.get(0);
if (taskRecord == null) {
return TaskRecordStatus.EXCEPTION;
}
Long targetRowCount = taskRecord.getTargetRowCount();
if (targetRowCount <= 0) {
return TaskRecordStatus.FAILURE;
} else {
return TaskRecordStatus.SUCCESS;
}
}
}
}

51
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -20,20 +20,12 @@ package org.apache.dolphinscheduler.server.worker.task;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
@ -110,6 +102,13 @@ public abstract class AbstractTask {
*/
public abstract void handle() throws Exception;
/**
* result processing
*
* @throws Exception exception
*/
public void after() throws Exception {
}
/**
* cancel application
@ -187,42 +186,6 @@ public abstract class AbstractTask {
*/
public abstract AbstractParameters getParameters();
/**
* result processing
*/
public void after() {
if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag() && typeIsNormalTask(taskExecutionContext.getTaskType())) {
AbstractParameters params = TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(), taskExecutionContext.getTaskParams());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
params.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")) {
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)) {
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskExecutionContext.getTaskName(), vProcDate);
logger.info("task record status : {}", taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE) {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
}
}
} else if (getExitStatusCode() == Constants.EXIT_CODE_KILL) {
setExitStatusCode(Constants.EXIT_CODE_KILL);
} else {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
private boolean typeIsNormalTask(String taskType) {
return !(TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(taskType) || TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType));
}

Loading…
Cancel
Save