dailidong
5 years ago
committed by
GitHub
20 changed files with 698 additions and 381 deletions
@ -0,0 +1,217 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.DependResult; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.model.DependentTaskModel; |
||||
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.DependentUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
||||
import org.apache.dolphinscheduler.common.utils.OSUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.utils.DependentExecute; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.*; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; |
||||
|
||||
public class DependentTaskExecThread extends MasterBaseTaskExecThread { |
||||
|
||||
private DependentParameters dependentParameters; |
||||
|
||||
/** |
||||
* dependent task list |
||||
*/ |
||||
private List<DependentExecute> dependentTaskList = new ArrayList<>(); |
||||
|
||||
/** |
||||
* depend item result map |
||||
* save the result to log file |
||||
*/ |
||||
private Map<String, DependResult> dependResultMap = new HashMap<>(); |
||||
|
||||
|
||||
/** |
||||
* dependent date |
||||
*/ |
||||
private Date dependentDate; |
||||
|
||||
/** |
||||
* constructor of MasterBaseTaskExecThread |
||||
* |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public DependentTaskExecThread(TaskInstance taskInstance) { |
||||
super(taskInstance); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Boolean submitWaitComplete() { |
||||
try{ |
||||
logger.info("dependent task start"); |
||||
this.taskInstance = submit(); |
||||
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, |
||||
taskInstance.getProcessDefinitionId(), |
||||
taskInstance.getProcessInstanceId(), |
||||
taskInstance.getId())); |
||||
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); |
||||
Thread.currentThread().setName(threadLoggerInfoName); |
||||
initTaskParameters(); |
||||
initDependParameters(); |
||||
waitTaskQuit(); |
||||
updateTaskState(); |
||||
}catch (Exception e){ |
||||
logger.error("dependent task run exception" , e); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* init dependent parameters |
||||
*/ |
||||
private void initDependParameters() { |
||||
|
||||
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), |
||||
DependentParameters.class); |
||||
|
||||
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ |
||||
this.dependentTaskList.add(new DependentExecute( |
||||
taskModel.getDependItemList(), taskModel.getRelation())); |
||||
} |
||||
if(this.processInstance.getScheduleTime() != null){ |
||||
this.dependentDate = this.processInstance.getScheduleTime(); |
||||
}else{ |
||||
this.dependentDate = new Date(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
private void updateTaskState() { |
||||
ExecutionStatus status; |
||||
if(this.cancel){ |
||||
status = ExecutionStatus.KILL; |
||||
}else{ |
||||
DependResult result = getTaskDependResult(); |
||||
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; |
||||
} |
||||
taskInstance.setState(status); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.saveTaskInstance(taskInstance); |
||||
} |
||||
|
||||
/** |
||||
* wait dependent tasks quit |
||||
*/ |
||||
private Boolean waitTaskQuit() { |
||||
logger.info("wait depend task : {} complete", this.taskInstance.getName()); |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info("task {} already complete. task state:{}", |
||||
this.taskInstance.getName(), |
||||
this.taskInstance.getState()); |
||||
return true; |
||||
} |
||||
while (Stopper.isRunning()) { |
||||
try{ |
||||
if(this.processInstance == null){ |
||||
logger.error("process instance not exists , master task exec thread exit"); |
||||
return true; |
||||
} |
||||
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ |
||||
cancelTaskInstance(); |
||||
break; |
||||
} |
||||
|
||||
if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){ |
||||
break; |
||||
} |
||||
// updateProcessInstance task instance
|
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
processInstance = processService.findProcessInstanceById(processInstance.getId()); |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} catch (Exception e) { |
||||
logger.error("exception",e); |
||||
if (processInstance != null) { |
||||
logger.error("wait task quit failed, instance id:{}, task id:{}", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
} |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* cancel dependent task |
||||
*/ |
||||
private void cancelTaskInstance() { |
||||
this.cancel = true; |
||||
} |
||||
|
||||
private void initTaskParameters() { |
||||
taskInstance.setLogPath(getTaskLogPath(taskInstance)); |
||||
taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); |
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
} |
||||
|
||||
/** |
||||
* judge all dependent tasks finish |
||||
* @return whether all dependent tasks finish |
||||
*/ |
||||
private boolean allDependentTaskFinish(){ |
||||
boolean finish = true; |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) { |
||||
if(!dependResultMap.containsKey(entry.getKey())){ |
||||
dependResultMap.put(entry.getKey(), entry.getValue()); |
||||
//save depend result to log
|
||||
logger.info("dependent item complete {} {},{}", |
||||
DEPENDENT_SPLIT, entry.getKey(), entry.getValue()); |
||||
} |
||||
} |
||||
if(!dependentExecute.finish(dependentDate)){ |
||||
finish = false; |
||||
} |
||||
} |
||||
return finish; |
||||
} |
||||
|
||||
/** |
||||
* get dependent result |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getTaskDependResult(){ |
||||
List<DependResult> dependResultList = new ArrayList<>(); |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); |
||||
dependResultList.add(dependResult); |
||||
} |
||||
DependResult result = DependentUtils.getDependResultForRelation( |
||||
this.dependentParameters.getRelation(), dependResultList |
||||
); |
||||
logger.info("dependent task completed, dependent result:{}", result); |
||||
return result; |
||||
} |
||||
} |
@ -1,198 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.worker.task.dependent; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.DependResult; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.model.DependentTaskModel; |
||||
import org.apache.dolphinscheduler.common.task.AbstractParameters; |
||||
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.DependentUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.slf4j.Logger; |
||||
|
||||
import java.util.*; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; |
||||
|
||||
/** |
||||
* Dependent Task |
||||
*/ |
||||
public class DependentTask extends AbstractTask { |
||||
|
||||
/** |
||||
* dependent task list |
||||
*/ |
||||
private List<DependentExecute> dependentTaskList = new ArrayList<>(); |
||||
|
||||
/** |
||||
* depend item result map |
||||
* save the result to log file |
||||
*/ |
||||
private Map<String, DependResult> dependResultMap = new HashMap<>(); |
||||
|
||||
/** |
||||
* dependent parameters |
||||
*/ |
||||
private DependentParameters dependentParameters; |
||||
|
||||
/** |
||||
* dependent date |
||||
*/ |
||||
private Date dependentDate; |
||||
|
||||
/** |
||||
* process service |
||||
*/ |
||||
private ProcessService processService; |
||||
|
||||
/** |
||||
* taskExecutionContext |
||||
*/ |
||||
private TaskExecutionContext taskExecutionContext; |
||||
|
||||
/** |
||||
* constructor |
||||
* @param taskExecutionContext taskExecutionContext |
||||
* @param logger logger |
||||
*/ |
||||
public DependentTask(TaskExecutionContext taskExecutionContext, Logger logger) { |
||||
super(taskExecutionContext, logger); |
||||
this.taskExecutionContext = taskExecutionContext; |
||||
} |
||||
|
||||
@Override |
||||
public void init(){ |
||||
logger.info("dependent task initialize"); |
||||
|
||||
this.dependentParameters = JSONUtils.parseObject(null, |
||||
DependentParameters.class); |
||||
if(dependentParameters != null){ |
||||
for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ |
||||
this.dependentTaskList.add(new DependentExecute( |
||||
taskModel.getDependItemList(), taskModel.getRelation())); |
||||
} |
||||
} |
||||
|
||||
this.processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
if(taskExecutionContext.getScheduleTime() != null){ |
||||
this.dependentDate = taskExecutionContext.getScheduleTime(); |
||||
}else{ |
||||
this.dependentDate = taskExecutionContext.getStartTime(); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public void handle() throws Exception { |
||||
// set the name of the current thread
|
||||
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); |
||||
Thread.currentThread().setName(threadLoggerInfoName); |
||||
|
||||
try{ |
||||
TaskInstance taskInstance = null; |
||||
while(Stopper.isRunning()){ |
||||
taskInstance = processService.findTaskInstanceById(this.taskExecutionContext.getTaskInstanceId()); |
||||
|
||||
if(taskInstance == null){ |
||||
exitStatusCode = -1; |
||||
break; |
||||
} |
||||
|
||||
if(taskInstance.getState() == ExecutionStatus.KILL){ |
||||
this.cancel = true; |
||||
} |
||||
|
||||
if(this.cancel || allDependentTaskFinish()){ |
||||
break; |
||||
} |
||||
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} |
||||
|
||||
if(cancel){ |
||||
exitStatusCode = Constants.EXIT_CODE_KILL; |
||||
}else{ |
||||
DependResult result = getTaskDependResult(); |
||||
exitStatusCode = (result == DependResult.SUCCESS) ? |
||||
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; |
||||
} |
||||
}catch (Exception e){ |
||||
logger.error(e.getMessage(),e); |
||||
exitStatusCode = -1; |
||||
throw e; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get dependent result |
||||
* @return DependResult |
||||
*/ |
||||
private DependResult getTaskDependResult(){ |
||||
List<DependResult> dependResultList = new ArrayList<>(); |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); |
||||
dependResultList.add(dependResult); |
||||
} |
||||
DependResult result = DependentUtils.getDependResultForRelation( |
||||
this.dependentParameters.getRelation(), dependResultList |
||||
); |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* judge all dependent tasks finish |
||||
* @return whether all dependent tasks finish |
||||
*/ |
||||
private boolean allDependentTaskFinish(){ |
||||
boolean finish = true; |
||||
for(DependentExecute dependentExecute : dependentTaskList){ |
||||
for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) { |
||||
if(!dependResultMap.containsKey(entry.getKey())){ |
||||
dependResultMap.put(entry.getKey(), entry.getValue()); |
||||
//save depend result to log
|
||||
logger.info("dependent item complete {} {},{}", |
||||
DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString()); |
||||
} |
||||
} |
||||
if(!dependentExecute.finish(dependentDate)){ |
||||
finish = false; |
||||
} |
||||
} |
||||
return finish; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void cancelApplication(boolean cancelApplication) throws Exception { |
||||
// cancel process
|
||||
this.cancel = true; |
||||
} |
||||
|
||||
@Override |
||||
public AbstractParameters getParameters() { |
||||
return null; |
||||
} |
||||
} |
@ -0,0 +1,132 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.master; |
||||
|
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.junit.Assert; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.context.ApplicationContext; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
@RunWith(MockitoJUnitRunner.Silent.class) |
||||
public class ConditionsTaskTest { |
||||
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); |
||||
|
||||
private ProcessService processService; |
||||
private ApplicationContext applicationContext; |
||||
|
||||
|
||||
private MasterConfig config; |
||||
|
||||
@Before |
||||
public void before() { |
||||
config = new MasterConfig(); |
||||
config.setMasterTaskCommitRetryTimes(3); |
||||
config.setMasterTaskCommitInterval(1000); |
||||
processService = Mockito.mock(ProcessService.class); |
||||
applicationContext = Mockito.mock(ApplicationContext.class); |
||||
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); |
||||
springApplicationContext.setApplicationContext(applicationContext); |
||||
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); |
||||
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); |
||||
|
||||
Mockito.when(processService |
||||
.findTaskInstanceById(252612)) |
||||
.thenReturn(getTaskInstance()); |
||||
|
||||
Mockito.when(processService.saveTaskInstance(getTaskInstance())) |
||||
.thenReturn(true); |
||||
|
||||
Mockito.when(processService.findProcessInstanceById(10112)) |
||||
.thenReturn(getProcessInstance()); |
||||
|
||||
Mockito.when(processService |
||||
.findValidTaskListByProcessId(10112)) |
||||
.thenReturn(getTaskInstances()); |
||||
} |
||||
|
||||
@Test |
||||
public void testCondition(){ |
||||
TaskInstance taskInstance = getTaskInstance(); |
||||
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"depTasks\":\"1\",\"status\":\"SUCCESS\"}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; |
||||
String conditionResult = "{\"successNode\":[\"2\"],\"failedNode\":[\"3\"]}"; |
||||
|
||||
taskInstance.setDependency(dependString); |
||||
Mockito.when(processService.submitTask(taskInstance)) |
||||
.thenReturn(taskInstance); |
||||
ConditionsTaskExecThread conditions = |
||||
new ConditionsTaskExecThread(taskInstance); |
||||
|
||||
try { |
||||
conditions.call(); |
||||
} catch (Exception e) { |
||||
e.printStackTrace(); |
||||
} |
||||
|
||||
Assert.assertEquals(ExecutionStatus.SUCCESS, conditions.getTaskInstance().getState()); |
||||
} |
||||
|
||||
|
||||
private TaskInstance getTaskInstance(){ |
||||
TaskInstance taskInstance = new TaskInstance(); |
||||
taskInstance.setId(252612); |
||||
taskInstance.setName("C"); |
||||
taskInstance.setTaskType("CONDITIONS"); |
||||
taskInstance.setProcessInstanceId(10112); |
||||
taskInstance.setProcessDefinitionId(100001); |
||||
return taskInstance; |
||||
} |
||||
|
||||
|
||||
|
||||
private List<TaskInstance> getTaskInstances(){ |
||||
List<TaskInstance> list = new ArrayList<>(); |
||||
TaskInstance taskInstance = new TaskInstance(); |
||||
taskInstance.setId(199999); |
||||
taskInstance.setName("1"); |
||||
taskInstance.setState(ExecutionStatus.SUCCESS); |
||||
list.add(taskInstance); |
||||
return list; |
||||
} |
||||
|
||||
private ProcessInstance getProcessInstance(){ |
||||
ProcessInstance processInstance = new ProcessInstance(); |
||||
processInstance.setId(10112); |
||||
processInstance.setProcessDefinitionId(100001); |
||||
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); |
||||
|
||||
return processInstance; |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue