|
|
@ -17,13 +17,12 @@ |
|
|
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.server.master.runner.task; |
|
|
|
package org.apache.dolphinscheduler.server.master.runner.task; |
|
|
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; |
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.NetUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.NetUtils; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; |
|
|
@ -111,8 +110,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor { |
|
|
|
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { |
|
|
|
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
logger.info("dependent task {} timeout, strategy {} ", |
|
|
|
logger.info("dependent taskInstanceId: {} timeout, taskName: {}, strategy: {} ", |
|
|
|
taskInstance.getId(), taskTimeoutStrategy.getDescp()); |
|
|
|
taskInstance.getId(), taskInstance.getName(), taskTimeoutStrategy.getDescp()); |
|
|
|
result = DependResult.FAILED; |
|
|
|
result = DependResult.FAILED; |
|
|
|
endTask(); |
|
|
|
endTask(); |
|
|
|
return true; |
|
|
|
return true; |
|
|
@ -161,7 +160,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { |
|
|
|
if (!dependResultMap.containsKey(entry.getKey())) { |
|
|
|
if (!dependResultMap.containsKey(entry.getKey())) { |
|
|
|
dependResultMap.put(entry.getKey(), entry.getValue()); |
|
|
|
dependResultMap.put(entry.getKey(), entry.getValue()); |
|
|
|
//save depend result to log
|
|
|
|
//save depend result to log
|
|
|
|
logger.info("dependent item complete {} {},{}", DEPENDENT_SPLIT, entry.getKey(), entry.getValue()); |
|
|
|
logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (!dependentExecute.finish(dependentDate)) { |
|
|
|
if (!dependentExecute.finish(dependentDate)) { |
|
|
@ -183,7 +182,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { |
|
|
|
dependResultList.add(dependResult); |
|
|
|
dependResultList.add(dependResult); |
|
|
|
} |
|
|
|
} |
|
|
|
result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList); |
|
|
|
result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList); |
|
|
|
logger.info("dependent task completed, dependent result:{}", result); |
|
|
|
logger.info("dependent task completed, dependent result: {}", result); |
|
|
|
return result; |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|