qiaozhanwei
5 years ago
13 changed files with 784 additions and 208 deletions
@ -0,0 +1,296 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.worker.task.dependent; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.DependResult; |
||||
import org.apache.dolphinscheduler.common.enums.DependentRelation; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.model.DateInterval; |
||||
import org.apache.dolphinscheduler.common.model.DependentItem; |
||||
import org.apache.dolphinscheduler.common.model.TaskNode; |
||||
import org.apache.dolphinscheduler.common.utils.DependentUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.*; |
||||
|
||||
/** |
||||
* dependent item execute |
||||
*/ |
||||
public class DependentExecute { |
||||
/** |
||||
* process service |
||||
*/ |
||||
private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
/** |
||||
* depend item list |
||||
*/ |
||||
private List<DependentItem> dependItemList; |
||||
|
||||
/** |
||||
* dependent relation |
||||
*/ |
||||
private DependentRelation relation; |
||||
|
||||
/** |
||||
* depend result |
||||
*/ |
||||
private DependResult modelDependResult = DependResult.WAITING; |
||||
|
||||
/** |
||||
* depend result map |
||||
*/ |
||||
private Map<String, DependResult> dependResultMap = new HashMap<>(); |
||||
|
||||
/** |
||||
* logger |
||||
*/ |
||||
private Logger logger = LoggerFactory.getLogger(DependentExecute.class); |
||||
|
||||
/** |
||||
* constructor |
||||
* @param itemList item list |
||||
* @param relation relation |
||||
*/ |
||||
public DependentExecute(List<DependentItem> itemList, DependentRelation relation){ |
||||
this.dependItemList = itemList; |
||||
this.relation = relation; |
||||
} |
||||
|
||||
/** |
||||
* get dependent item for one dependent item |
||||
* @param dependentItem dependent item |
||||
* @param currentTime current time |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){ |
||||
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue()); |
||||
return calculateResultForTasks(dependentItem, dateIntervals ); |
||||
} |
||||
|
||||
/** |
||||
* calculate dependent result for one dependent item. |
||||
* @param dependentItem dependent item |
||||
* @param dateIntervals date intervals |
||||
* @return dateIntervals |
||||
*/ |
||||
private DependResult calculateResultForTasks(DependentItem dependentItem, |
||||
List<DateInterval> dateIntervals) { |
||||
|
||||
DependResult result = DependResult.FAILED; |
||||
for(DateInterval dateInterval : dateIntervals){ |
||||
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), |
||||
dateInterval); |
||||
if(processInstance == null){ |
||||
logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", |
||||
dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); |
||||
return DependResult.FAILED; |
||||
} |
||||
// need to check workflow for updates, so get all task and check the task state
|
||||
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ |
||||
List<TaskNode> taskNodes = |
||||
processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId()); |
||||
|
||||
if(taskNodes != null && taskNodes.size() > 0){ |
||||
List<DependResult> results = new ArrayList<>(); |
||||
DependResult tmpResult = DependResult.FAILED; |
||||
for(TaskNode taskNode:taskNodes){ |
||||
tmpResult = getDependTaskResult(taskNode.getName(),processInstance); |
||||
if(DependResult.FAILED == tmpResult){ |
||||
break; |
||||
}else{ |
||||
results.add(getDependTaskResult(taskNode.getName(),processInstance)); |
||||
} |
||||
} |
||||
|
||||
if(DependResult.FAILED == tmpResult){ |
||||
result = DependResult.FAILED; |
||||
}else if(results.contains(DependResult.WAITING)){ |
||||
result = DependResult.WAITING; |
||||
}else{ |
||||
result = DependResult.SUCCESS; |
||||
} |
||||
}else{ |
||||
result = DependResult.FAILED; |
||||
} |
||||
}else{ |
||||
result = getDependTaskResult(dependentItem.getDepTasks(),processInstance); |
||||
} |
||||
if(result != DependResult.SUCCESS){ |
||||
break; |
||||
} |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* get depend task result |
||||
* @param taskName |
||||
* @param processInstance |
||||
* @return |
||||
*/ |
||||
private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) { |
||||
DependResult result = DependResult.FAILED; |
||||
TaskInstance taskInstance = null; |
||||
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); |
||||
|
||||
for(TaskInstance task : taskInstanceList){ |
||||
if(task.getName().equals(taskName)){ |
||||
taskInstance = task; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if(taskInstance == null){ |
||||
// cannot find task in the process instance
|
||||
// maybe because process instance is running or failed.
|
||||
result = getDependResultByProcessStateWhenTaskNull(processInstance.getState()); |
||||
}else{ |
||||
result = getDependResultByState(taskInstance.getState()); |
||||
} |
||||
|
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* find the last one process instance that : |
||||
* 1. manual run and finish between the interval |
||||
* 2. schedule run and schedule time between the interval |
||||
* @param definitionId definition id |
||||
* @param dateInterval date interval |
||||
* @return ProcessInstance |
||||
*/ |
||||
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { |
||||
|
||||
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval); |
||||
if(runningProcess != null){ |
||||
return runningProcess; |
||||
} |
||||
|
||||
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval( |
||||
definitionId, dateInterval |
||||
); |
||||
|
||||
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval( |
||||
definitionId, dateInterval |
||||
); |
||||
|
||||
if(lastManualProcess ==null){ |
||||
return lastSchedulerProcess; |
||||
} |
||||
if(lastSchedulerProcess == null){ |
||||
return lastManualProcess; |
||||
} |
||||
|
||||
return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime()))? |
||||
lastManualProcess : lastSchedulerProcess; |
||||
} |
||||
|
||||
/** |
||||
* get dependent result by task/process instance state |
||||
* @param state state |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getDependResultByState(ExecutionStatus state) { |
||||
|
||||
if(state.typeIsRunning() |
||||
|| state == ExecutionStatus.SUBMITTED_SUCCESS |
||||
|| state == ExecutionStatus.WAITTING_THREAD){ |
||||
return DependResult.WAITING; |
||||
}else if(state.typeIsSuccess()){ |
||||
return DependResult.SUCCESS; |
||||
}else{ |
||||
return DependResult.FAILED; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get dependent result by task instance state when task instance is null |
||||
* @param state state |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) { |
||||
|
||||
if(state.typeIsRunning() |
||||
|| state == ExecutionStatus.SUBMITTED_SUCCESS |
||||
|| state == ExecutionStatus.WAITTING_THREAD){ |
||||
return DependResult.WAITING; |
||||
}else{ |
||||
return DependResult.FAILED; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* judge depend item finished |
||||
* @param currentTime current time |
||||
* @return boolean |
||||
*/ |
||||
public boolean finish(Date currentTime){ |
||||
if(modelDependResult == DependResult.WAITING){ |
||||
modelDependResult = getModelDependResult(currentTime); |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* get model depend result |
||||
* @param currentTime current time |
||||
* @return DependResult |
||||
*/ |
||||
public DependResult getModelDependResult(Date currentTime){ |
||||
|
||||
List<DependResult> dependResultList = new ArrayList<>(); |
||||
|
||||
for(DependentItem dependentItem : dependItemList){ |
||||
DependResult dependResult = getDependResultForItem(dependentItem, currentTime); |
||||
if(dependResult != DependResult.WAITING){ |
||||
dependResultMap.put(dependentItem.getKey(), dependResult); |
||||
} |
||||
dependResultList.add(dependResult); |
||||
} |
||||
modelDependResult = DependentUtils.getDependResultForRelation( |
||||
this.relation, dependResultList |
||||
); |
||||
return modelDependResult; |
||||
} |
||||
|
||||
/** |
||||
* get dependent item result |
||||
* @param item item |
||||
* @param currentTime current time |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getDependResultForItem(DependentItem item, Date currentTime){ |
||||
String key = item.getKey(); |
||||
if(dependResultMap.containsKey(key)){ |
||||
return dependResultMap.get(key); |
||||
} |
||||
return getDependentResultForItem(item, currentTime); |
||||
} |
||||
|
||||
public Map<String, DependResult> getDependResultMap(){ |
||||
return dependResultMap; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,192 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.worker.task.dependent; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.DependResult; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.model.DependentTaskModel; |
||||
import org.apache.dolphinscheduler.common.task.AbstractParameters; |
||||
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.DependentUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; |
||||
import org.apache.dolphinscheduler.server.worker.task.TaskProps; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.slf4j.Logger; |
||||
|
||||
import java.util.*; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; |
||||
|
||||
/** |
||||
* Dependent Task |
||||
*/ |
||||
public class DependentTask extends AbstractTask { |
||||
|
||||
/** |
||||
* dependent task list |
||||
*/ |
||||
private List<DependentExecute> dependentTaskList = new ArrayList<>(); |
||||
|
||||
/** |
||||
* depend item result map |
||||
* save the result to log file |
||||
*/ |
||||
private Map<String, DependResult> dependResultMap = new HashMap<>(); |
||||
|
||||
/** |
||||
* dependent parameters |
||||
*/ |
||||
private DependentParameters dependentParameters; |
||||
|
||||
/** |
||||
* dependent date |
||||
*/ |
||||
private Date dependentDate; |
||||
|
||||
/** |
||||
* process service |
||||
*/ |
||||
private ProcessService processService; |
||||
|
||||
/** |
||||
* constructor |
||||
* @param props props |
||||
* @param logger logger |
||||
*/ |
||||
public DependentTask(TaskProps props, Logger logger) { |
||||
super(props, logger); |
||||
} |
||||
|
||||
@Override |
||||
public void init(){ |
||||
logger.info("dependent task initialize"); |
||||
|
||||
this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), |
||||
DependentParameters.class); |
||||
if(dependentParameters != null){ |
||||
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ |
||||
this.dependentTaskList.add(new DependentExecute( |
||||
taskModel.getDependItemList(), taskModel.getRelation())); |
||||
} |
||||
} |
||||
|
||||
this.processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
if(taskProps.getScheduleTime() != null){ |
||||
this.dependentDate = taskProps.getScheduleTime(); |
||||
}else{ |
||||
this.dependentDate = taskProps.getTaskStartTime(); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public void handle() throws Exception { |
||||
// set the name of the current thread
|
||||
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); |
||||
Thread.currentThread().setName(threadLoggerInfoName); |
||||
|
||||
try{ |
||||
TaskInstance taskInstance = null; |
||||
while(Stopper.isRunning()){ |
||||
taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId()); |
||||
|
||||
if(taskInstance == null){ |
||||
exitStatusCode = -1; |
||||
break; |
||||
} |
||||
|
||||
if(taskInstance.getState() == ExecutionStatus.KILL){ |
||||
this.cancel = true; |
||||
} |
||||
|
||||
if(this.cancel || allDependentTaskFinish()){ |
||||
break; |
||||
} |
||||
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} |
||||
|
||||
if(cancel){ |
||||
exitStatusCode = Constants.EXIT_CODE_KILL; |
||||
}else{ |
||||
DependResult result = getTaskDependResult(); |
||||
exitStatusCode = (result == DependResult.SUCCESS) ? |
||||
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; |
||||
} |
||||
}catch (Exception e){ |
||||
logger.error(e.getMessage(),e); |
||||
exitStatusCode = -1; |
||||
throw e; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get dependent result |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getTaskDependResult(){ |
||||
List<DependResult> dependResultList = new ArrayList<>(); |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); |
||||
dependResultList.add(dependResult); |
||||
} |
||||
DependResult result = DependentUtils.getDependResultForRelation( |
||||
this.dependentParameters.getRelation(), dependResultList |
||||
); |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* judge all dependent tasks finish |
||||
* @return whether all dependent tasks finish |
||||
*/ |
||||
private boolean allDependentTaskFinish(){ |
||||
boolean finish = true; |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) { |
||||
if(!dependResultMap.containsKey(entry.getKey())){ |
||||
dependResultMap.put(entry.getKey(), entry.getValue()); |
||||
//save depend result to log
|
||||
logger.info("dependent item complete {} {},{}", |
||||
DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString()); |
||||
} |
||||
} |
||||
if(!dependentExecute.finish(dependentDate)){ |
||||
finish = false; |
||||
} |
||||
} |
||||
return finish; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void cancelApplication(boolean cancelApplication) throws Exception { |
||||
// cancel process
|
||||
this.cancel = true; |
||||
} |
||||
|
||||
@Override |
||||
public AbstractParameters getParameters() { |
||||
return null; |
||||
} |
||||
} |
Loading…
Reference in new issue