Browse Source

[Feature][JsonSplit] Fix dependTask bug (#5360)

* update SnowFlake

* update processDefinite from processInstance

* update processDefinite from processInstance

* Fix task logger path

* Fix dependTask bug

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
862565a7c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  2. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  3. 59
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  4. 49
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  6. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -241,7 +241,6 @@ public class TaskInstance implements Serializable {
/** /**
* task params * task params
*/ */
@TableField(exist = false)
private String taskParams; private String taskParams;
public void init(String host, Date startTime, String executePath) { public void init(String host, Date startTime, String executePath) {

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time, id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id, flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id,
first_submit_time, delay_time, var_pool first_submit_time, delay_time, task_params, var_pool
</sql> </sql>
<sql id="baseSqlV2"> <sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time, ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id, ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool
</sql> </sql>
<update id="setFailoverByHostAndStateArray"> <update id="setFailoverByHostAndStateArray">
update t_ds_task_instance update t_ds_task_instance

59
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -26,12 +26,11 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -62,7 +61,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
/** /**
* dependent date * dependent date
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date dependentDate; private Date dependentDate;
/** /**
@ -78,7 +77,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
@Override @Override
public Boolean submitWaitComplete() { public Boolean submitWaitComplete() {
try{ try {
logger.info("dependent task start"); logger.info("dependent task start");
this.taskInstance = submit(); this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -92,8 +91,8 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
initDependParameters(); initDependParameters();
waitTaskQuit(); waitTaskQuit();
updateTaskState(); updateTaskState();
}catch (Exception e){ } catch (Exception e) {
logger.error("dependent task run exception" , e); logger.error("dependent task run exception", e);
} }
return true; return true;
} }
@ -102,16 +101,13 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
* init dependent parameters * init dependent parameters
*/ */
private void initDependParameters() { private void initDependParameters() {
this.dependentParameters = taskInstance.getDependency(); this.dependentParameters = taskInstance.getDependency();
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
this.dependentTaskList.add(new DependentExecute(
taskModel.getDependItemList(), taskModel.getRelation()));
} }
if(this.processInstance.getScheduleTime() != null){ if (this.processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime(); this.dependentDate = this.processInstance.getScheduleTime();
}else{ } else {
this.dependentDate = new Date(); this.dependentDate = new Date();
} }
} }
@ -121,9 +117,9 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
*/ */
private void updateTaskState() { private void updateTaskState() {
ExecutionStatus status; ExecutionStatus status;
if(this.cancel){ if (this.cancel) {
status = ExecutionStatus.KILL; status = ExecutionStatus.KILL;
}else{ } else {
DependResult result = getTaskDependResult(); DependResult result = getTaskDependResult();
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
} }
@ -144,8 +140,8 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
return true; return true;
} }
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try{ try {
if(this.processInstance == null){ if (this.processInstance == null) {
logger.error("process instance not exists , master task exec thread exit"); logger.error("process instance not exists , master task exec thread exit");
return true; return true;
} }
@ -153,12 +149,12 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
this.checkTimeoutFlag = !alertTimeout(); this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed(); handleTimeoutFailed();
} }
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
cancelTaskInstance(); cancelTaskInstance();
break; break;
} }
if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){ if (allDependentTaskFinish() || taskInstance.getState().typeIsFinished()) {
break; break;
} }
// update process task // update process task
@ -166,7 +162,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
logger.error("exception",e); logger.error("exception", e);
if (processInstance != null) { if (processInstance != null) {
logger.error("wait task quit failed, instance id:{}, task id:{}", logger.error("wait task quit failed, instance id:{}, task id:{}",
processInstance.getId(), taskInstance.getId()); processInstance.getId(), taskInstance.getId());
@ -196,20 +192,20 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
/** /**
* judge all dependent tasks finish * judge all dependent tasks finish
*
* @return whether all dependent tasks finish * @return whether all dependent tasks finish
*/ */
private boolean allDependentTaskFinish(){ private boolean allDependentTaskFinish() {
boolean finish = true; boolean finish = true;
for(DependentExecute dependentExecute : dependentTaskList){ for (DependentExecute dependentExecute : dependentTaskList) {
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) { for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
if(!dependResultMap.containsKey(entry.getKey())){ if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue()); dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log //save depend result to log
logger.info("dependent item complete {} {},{}", logger.info("dependent item complete {} {},{}", DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
DEPENDENT_SPLIT, entry.getKey(), entry.getValue());
} }
} }
if(!dependentExecute.finish(dependentDate)){ if (!dependentExecute.finish(dependentDate)) {
finish = false; finish = false;
} }
} }
@ -218,17 +214,16 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
/** /**
* get dependent result * get dependent result
*
* @return DependResult * @return DependResult
*/ */
private DependResult getTaskDependResult(){ private DependResult getTaskDependResult() {
List<DependResult> dependResultList = new ArrayList<>(); List<DependResult> dependResultList = new ArrayList<>();
for(DependentExecute dependentExecute : dependentTaskList){ for (DependentExecute dependentExecute : dependentTaskList) {
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); DependResult dependResult = dependentExecute.getModelDependResult(dependentDate);
dependResultList.add(dependResult); dependResultList.add(dependResult);
} }
DependResult result = DependentUtils.getDependResultForRelation( DependResult result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);
this.dependentParameters.getRelation(), dependResultList
);
logger.info("dependent task completed, dependent result:{}", result); logger.info("dependent task completed, dependent result:{}", result);
return result; return result;
} }

49
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -448,13 +449,14 @@ public class MasterExecThread implements Runnable {
* find task instance in db. * find task instance in db.
* in case submit more than one same name task in the same time. * in case submit more than one same name task in the same time.
* *
* @param taskName task name * @param taskCode task code
* @param taskVersion task version
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance findTaskIfExists(String taskName) { private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId()); List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId());
for (TaskInstance taskInstance : taskInstanceList) { for (TaskInstance taskInstance : taskInstanceList) {
if (taskInstance.getName().equals(taskName)) { if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == taskVersion) {
return taskInstance; return taskInstance;
} }
} }
@ -465,20 +467,19 @@ public class MasterExecThread implements Runnable {
* encapsulation task * encapsulation task
* *
* @param processInstance process instance * @param processInstance process instance
* @param nodeName node name * @param taskNode taskNode
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
TaskNode taskNode) {
//update processInstance for update the globalParams //update processInstance for update the globalParams
this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId()); this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(nodeName); TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
if (taskInstance == null) { if (taskInstance == null) {
taskInstance = new TaskInstance(); taskInstance = new TaskInstance();
taskInstance.setTaskCode(taskNode.getCode()); taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion()); taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name // task name
taskInstance.setName(nodeName); taskInstance.setName(taskNode.getName());
// task instance state // task instance state
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
// process instance id // process instance id
@ -518,27 +519,28 @@ public class MasterExecThread implements Runnable {
} else { } else {
taskInstance.setWorkerGroup(taskWorkerGroup); taskInstance.setWorkerGroup(taskWorkerGroup);
} }
//get process global if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
setProcessGlobal(taskNode, taskInstance); taskInstance.setTaskParams(taskNode.getDependence());
} else {
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getParams()));
}
// delay execution time // delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime()); taskInstance.setDelayTime(taskNode.getDelayTime());
} }
return taskInstance; return taskInstance;
} }
private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) { private String globalParamToTaskParams(String params) {
String globalParams = this.processInstance.getGlobalParams(); String globalParams = this.processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) { if (StringUtils.isBlank(globalParams)) {
Map<String, String> globalMap = processService.getGlobalParamMap(globalParams); return params;
if (globalMap != null && globalMap.size() != 0) {
setGlobalMapToTask(taskNode, taskInstance, globalMap);
}
} }
Map<String, String> globalMap = processService.getGlobalParamMap(globalParams);
if (globalMap == null || globalMap.size() == 0) {
return params;
} }
// the process global param save in localParams
private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map<String, String> globalMap) { Map<String, Object> result = JSONUtils.toMap(params, String.class, Object.class);
// the param save in localParams
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS); Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) { if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
@ -552,8 +554,8 @@ public class MasterExecThread implements Runnable {
} }
} }
result.put(LOCAL_PARAMS, allParam); result.put(LOCAL_PARAMS, allParam);
taskInstance.setTaskParams(JSONUtils.toJsonString(result));
} }
return JSONUtils.toJsonString(result);
} }
private void submitPostNode(String parentNodeName) { private void submitPostNode(String parentNodeName) {
@ -567,9 +569,10 @@ public class MasterExecThread implements Runnable {
throw new RuntimeException(); throw new RuntimeException();
} }
TaskNode taskNodeObject = dag.getNode(taskNode); TaskNode taskNodeObject = dag.getNode(taskNode);
if (!TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNodeObject.getType())) {
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue); VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
taskInstances.add(createTaskInstance(processInstance, taskNode, }
taskNodeObject)); taskInstances.add(createTaskInstance(processInstance, taskNodeObject));
} }
// if previous node success , post node submit // if previous node success , post node submit

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -87,7 +87,7 @@ public class DependentExecute {
*/ */
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue()); List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals ); return calculateResultForTasks(dependentItem, dateIntervals);
} }
/** /**
@ -257,9 +257,7 @@ public class DependentExecute {
} }
dependResultList.add(dependResult); dependResultList.add(dependResult);
} }
modelDependResult = DependentUtils.getDependResultForRelation( modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
this.relation, dependResultList
);
return modelDependResult; return modelDependResult;
} }

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1247,7 +1247,6 @@ public class ProcessService {
if (taskInstance.isSubProcess()) { if (taskInstance.isSubProcess()) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else { } else {
if (processInstanceState != ExecutionStatus.READY_STOP if (processInstanceState != ExecutionStatus.READY_STOP
&& processInstanceState != ExecutionStatus.READY_PAUSE) { && processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid // failure task set invalid

Loading…
Cancel
Save