Wenjun Ruan
5 months ago
committed by
GitHub
151 changed files with 1333 additions and 2647 deletions
@ -0,0 +1,21 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<configuration scan="true" scanPeriod="120 seconds"> |
||||
<logger name="*" level="ERROR"/> |
||||
</configuration> |
@ -1,59 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(StateEventHandler.class) |
||||
@Slf4j |
||||
public class WorkflowBlockStateEventHandler implements StateEventHandler { |
||||
|
||||
@Override |
||||
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, |
||||
StateEvent stateEvent) throws StateEventHandleError { |
||||
log.info("Handle workflow instance state block event"); |
||||
Optional<TaskInstance> taskInstanceOptional = |
||||
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()); |
||||
if (!taskInstanceOptional.isPresent()) { |
||||
throw new StateEventHandleError("Cannot find taskInstance from taskMap by taskInstanceId: " |
||||
+ stateEvent.getTaskInstanceId()); |
||||
} |
||||
TaskInstance task = taskInstanceOptional.get(); |
||||
|
||||
BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class); |
||||
if (parameters != null && parameters.isAlertWhenBlocking()) { |
||||
workflowExecuteRunnable.processBlock(); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public StateEventType getEventType() { |
||||
return StateEventType.PROCESS_BLOCKED; |
||||
} |
||||
} |
@ -1,139 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task.blocking; |
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.BlockingOpportunity; |
||||
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; |
||||
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; |
||||
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.function.Function; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
|
||||
@Slf4j |
||||
public class BlockingLogicTask extends BaseSyncLogicTask<BlockingParameters> { |
||||
|
||||
public static final String TASK_TYPE = "BLOCKING"; |
||||
|
||||
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
private final ProcessInstanceDao processInstanceDao; |
||||
|
||||
private final TaskInstanceDao taskInstanceDao; |
||||
|
||||
public BlockingLogicTask(TaskExecutionContext taskExecutionContext, |
||||
ProcessInstanceExecCacheManager processInstanceExecCacheManager, |
||||
ProcessInstanceDao processInstanceDao, |
||||
TaskInstanceDao taskInstanceDao) { |
||||
super(taskExecutionContext, |
||||
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<BlockingParameters>() { |
||||
})); |
||||
this.processInstanceExecCacheManager = processInstanceExecCacheManager; |
||||
this.processInstanceDao = processInstanceDao; |
||||
this.taskInstanceDao = taskInstanceDao; |
||||
} |
||||
|
||||
@Override |
||||
public void handle() throws MasterTaskExecuteException { |
||||
DependResult conditionResult = calculateConditionResult(); |
||||
DependResult expected = taskParameters.getBlockingOpportunity() |
||||
.equals(BlockingOpportunity.BLOCKING_ON_SUCCESS.getDesc()) |
||||
? DependResult.SUCCESS |
||||
: DependResult.FAILED; |
||||
boolean isBlocked = (expected == conditionResult); |
||||
log.info("blocking opportunity: expected-->{}, actual-->{}", expected, conditionResult); |
||||
ProcessInstance workflowInstance = processInstanceExecCacheManager |
||||
.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()).getWorkflowExecuteContext() |
||||
.getWorkflowInstance(); |
||||
workflowInstance.setBlocked(isBlocked); |
||||
if (isBlocked) { |
||||
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_BLOCK, "ready block"); |
||||
} |
||||
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); |
||||
} |
||||
|
||||
private DependResult calculateConditionResult() throws MasterTaskExecuteException { |
||||
// todo: Directly get the task instance from the cache
|
||||
Map<Long, TaskInstance> completeTaskList = taskInstanceDao |
||||
.queryValidTaskListByWorkflowInstanceId(taskExecutionContext.getProcessInstanceId(), |
||||
taskExecutionContext.getTestFlag()) |
||||
.stream() |
||||
.collect(Collectors.toMap(TaskInstance::getTaskCode, Function.identity())); |
||||
|
||||
// todo: we need to parse the task parameter from TaskExecutionContext
|
||||
TaskInstance taskInstance = |
||||
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()) |
||||
.getTaskInstance(taskExecutionContext.getTaskInstanceId()) |
||||
.orElseThrow(() -> new MasterTaskExecuteException("Task instance not found")); |
||||
DependentParameters dependentParameters = taskInstance.getDependency(); |
||||
|
||||
List<DependResult> tempResultList = new ArrayList<>(); |
||||
for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) { |
||||
List<DependResult> itemDependResult = new ArrayList<>(); |
||||
for (DependentItem item : dependentTaskModel.getDependItemList()) { |
||||
itemDependResult.add(getDependResultForItem(item, completeTaskList)); |
||||
} |
||||
DependResult tempResult = |
||||
DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); |
||||
tempResultList.add(tempResult); |
||||
} |
||||
return DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), tempResultList); |
||||
} |
||||
|
||||
private DependResult getDependResultForItem(DependentItem item, Map<Long, TaskInstance> completeTaskList) { |
||||
|
||||
DependResult dependResult = DependResult.SUCCESS; |
||||
if (!completeTaskList.containsKey(item.getDepTaskCode())) { |
||||
log.info("depend item: {} have not completed yet.", item.getDepTaskCode()); |
||||
dependResult = DependResult.FAILED; |
||||
return dependResult; |
||||
} |
||||
TaskInstance taskInstance = completeTaskList.get(item.getDepTaskCode()); |
||||
if (taskInstance.getState() != item.getStatus()) { |
||||
log.info("depend item : {} expect status: {}, actual status: {}", item.getDepTaskCode(), item.getStatus(), |
||||
taskInstance.getState().name()); |
||||
dependResult = DependResult.FAILED; |
||||
} |
||||
log.info("Dependent item complete {} {},{}", |
||||
Constants.DEPENDENT_SPLIT, item.getDepTaskCode(), dependResult); |
||||
return dependResult; |
||||
} |
||||
|
||||
} |
@ -1,51 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task.blocking; |
||||
|
||||
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; |
||||
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.ILogicTaskPluginFactory; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class BlockingLogicTaskPluginFactory implements ILogicTaskPluginFactory<BlockingLogicTask> { |
||||
|
||||
@Autowired |
||||
private ProcessInstanceDao processInstanceDao; |
||||
|
||||
@Autowired |
||||
private TaskInstanceDao taskInstanceDao; |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Override |
||||
public BlockingLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) { |
||||
return new BlockingLogicTask(taskExecutionContext, processInstanceExecCacheManager, processInstanceDao, |
||||
taskInstanceDao); |
||||
} |
||||
|
||||
@Override |
||||
public String getTaskType() { |
||||
return BlockingLogicTask.TASK_TYPE; |
||||
} |
||||
} |
@ -1,48 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.utils; |
||||
|
||||
import org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.dynamic.DynamicLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask; |
||||
|
||||
import java.util.Set; |
||||
|
||||
import lombok.experimental.UtilityClass; |
||||
|
||||
import com.google.common.collect.Sets; |
||||
|
||||
@UtilityClass |
||||
public class TaskUtils { |
||||
|
||||
// todo: Add to SPI
|
||||
private final Set<String> MASTER_TASK_TYPES = Sets.newHashSet( |
||||
BlockingLogicTask.TASK_TYPE, |
||||
ConditionLogicTask.TASK_TYPE, |
||||
DependentLogicTask.TASK_TYPE, |
||||
SubWorkflowLogicTask.TASK_TYPE, |
||||
SwitchLogicTask.TASK_TYPE, |
||||
DynamicLogicTask.TASK_TYPE); |
||||
|
||||
public boolean isMasterTask(String taskType) { |
||||
return MASTER_TASK_TYPES.contains(taskType); |
||||
} |
||||
} |
@ -1,39 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.utils; |
||||
|
||||
import org.apache.dolphinscheduler.server.master.runner.task.blocking.BlockingLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.condition.ConditionLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.dependent.DependentLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; |
||||
import org.apache.dolphinscheduler.server.master.runner.task.switchtask.SwitchLogicTask; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
public class TaskUtilsTest { |
||||
|
||||
@Test |
||||
public void isMasterTask() { |
||||
Assertions.assertTrue(TaskUtils.isMasterTask(BlockingLogicTask.TASK_TYPE)); |
||||
Assertions.assertTrue(TaskUtils.isMasterTask(ConditionLogicTask.TASK_TYPE)); |
||||
Assertions.assertTrue(TaskUtils.isMasterTask(DependentLogicTask.TASK_TYPE)); |
||||
Assertions.assertTrue(TaskUtils.isMasterTask(SubWorkflowLogicTask.TASK_TYPE)); |
||||
Assertions.assertTrue(TaskUtils.isMasterTask(SwitchLogicTask.TASK_TYPE)); |
||||
} |
||||
} |
@ -1,51 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.parameters; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
public class BlockingParameters extends AbstractParameters { |
||||
|
||||
// condition of blocking: BlockingOnFailed or BlockingOnSuccess
|
||||
private String blockingOpportunity; |
||||
|
||||
// if true, alert when blocking, otherwise do nothing
|
||||
|
||||
private boolean isAlertWhenBlocking; |
||||
|
||||
@Override |
||||
public boolean checkParameters() { |
||||
return !StringUtils.isEmpty(blockingOpportunity); |
||||
} |
||||
|
||||
public String getBlockingOpportunity() { |
||||
return blockingOpportunity; |
||||
} |
||||
|
||||
public void setBlockingCondition(String blockingOpportunity) { |
||||
this.blockingOpportunity = blockingOpportunity; |
||||
} |
||||
|
||||
public boolean isAlertWhenBlocking() { |
||||
return isAlertWhenBlocking; |
||||
} |
||||
|
||||
public void setAlertWhenBlocking(boolean alertWhenBlocking) { |
||||
isAlertWhenBlocking = alertWhenBlocking; |
||||
} |
||||
} |
@ -1,116 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.parameters; |
||||
|
||||
/** |
||||
* TODO <p>Need to optimize, why there are multiple task parameter variables:taskParams,dependence,switchResult</p> |
||||
* |
||||
*/ |
||||
public class ParametersNode { |
||||
|
||||
private String taskType; |
||||
|
||||
private String taskParams; |
||||
|
||||
private String dependence; |
||||
|
||||
private String switchResult; |
||||
|
||||
public static ParametersNode.ParametersNodeBuilder builder() { |
||||
return new ParametersNode.ParametersNodeBuilder(); |
||||
} |
||||
|
||||
public static class ParametersNodeBuilder { |
||||
|
||||
private String taskType; |
||||
|
||||
private String taskParams; |
||||
|
||||
private String dependence; |
||||
|
||||
private String switchResult; |
||||
|
||||
public ParametersNodeBuilder taskType(String taskType) { |
||||
this.taskType = taskType; |
||||
return this; |
||||
} |
||||
|
||||
public ParametersNodeBuilder taskParams(String taskParams) { |
||||
this.taskParams = taskParams; |
||||
return this; |
||||
} |
||||
|
||||
public ParametersNodeBuilder dependence(String dependence) { |
||||
this.dependence = dependence; |
||||
return this; |
||||
} |
||||
|
||||
public ParametersNodeBuilder switchResult(String switchResult) { |
||||
this.switchResult = switchResult; |
||||
return this; |
||||
} |
||||
|
||||
public ParametersNode build() { |
||||
return new ParametersNode(this.taskType, this.taskParams, this.dependence, this.switchResult); |
||||
} |
||||
|
||||
} |
||||
|
||||
public ParametersNode() { |
||||
|
||||
} |
||||
|
||||
public ParametersNode(String taskType, String taskParams, String dependence, String switchResult) { |
||||
this.taskType = taskType; |
||||
this.taskParams = taskParams; |
||||
this.dependence = dependence; |
||||
this.switchResult = switchResult; |
||||
} |
||||
|
||||
public String getTaskType() { |
||||
return taskType; |
||||
} |
||||
|
||||
public void setTaskType(String taskType) { |
||||
this.taskType = taskType; |
||||
} |
||||
|
||||
public String getTaskParams() { |
||||
return taskParams; |
||||
} |
||||
|
||||
public void setTaskParams(String taskParams) { |
||||
this.taskParams = taskParams; |
||||
} |
||||
|
||||
public String getDependence() { |
||||
return dependence; |
||||
} |
||||
|
||||
public void setDependence(String dependence) { |
||||
this.dependence = dependence; |
||||
} |
||||
|
||||
public String getSwitchResult() { |
||||
return switchResult; |
||||
} |
||||
|
||||
public void setSwitchResult(String switchResult) { |
||||
this.switchResult = switchResult; |
||||
} |
||||
} |
@ -0,0 +1,31 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; |
||||
|
||||
public class ConditionsLogicTaskChannel extends AbstractLogicTaskChannel { |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, ConditionsParameters.class); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,30 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
||||
|
||||
public class DependentLogicTaskChannel extends AbstractLogicTaskChannel { |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, DependentParameters.class); |
||||
} |
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(TaskChannelFactory.class) |
||||
public class DependentLogicTaskChannelFactory implements TaskChannelFactory { |
||||
|
||||
public static final String NAME = "DEPENDENT"; |
||||
@Override |
||||
public String getName() { |
||||
return NAME; |
||||
} |
||||
|
||||
@Override |
||||
public TaskChannel create() { |
||||
return new DependentLogicTaskChannel(); |
||||
} |
||||
} |
@ -0,0 +1,30 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.DynamicParameters; |
||||
|
||||
public class DynamicLogicTaskChannel extends AbstractLogicTaskChannel { |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, DynamicParameters.class); |
||||
} |
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(TaskChannelFactory.class) |
||||
public class DynamicLogicTaskChannelFactory implements TaskChannelFactory { |
||||
|
||||
public static final String NAME = "DYNAMIC"; |
||||
@Override |
||||
public String getName() { |
||||
return NAME; |
||||
} |
||||
|
||||
@Override |
||||
public TaskChannel create() { |
||||
return new DynamicLogicTaskChannel(); |
||||
} |
||||
} |
@ -0,0 +1,30 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; |
||||
|
||||
public class SubWorkflowLogicTaskChannel extends AbstractLogicTaskChannel { |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, SubProcessParameters.class); |
||||
} |
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(TaskChannelFactory.class) |
||||
public class SubWorkflowLogicTaskChannelFactory implements TaskChannelFactory { |
||||
|
||||
public static final String NAME = "SUB_PROCESS"; |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return NAME; |
||||
} |
||||
|
||||
@Override |
||||
public TaskChannel create() { |
||||
return new SubWorkflowLogicTaskChannel(); |
||||
} |
||||
} |
@ -0,0 +1,30 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; |
||||
|
||||
public class SwitchLogicTaskChannel extends AbstractLogicTaskChannel { |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, SwitchParameters.class); |
||||
} |
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.task; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(TaskChannelFactory.class) |
||||
public class SwitchLogicTaskChannelFactory implements TaskChannelFactory { |
||||
|
||||
public static final String NAME = "SWITCH"; |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return NAME; |
||||
} |
||||
|
||||
@Override |
||||
public TaskChannel create() { |
||||
return new SwitchLogicTaskChannel(); |
||||
} |
||||
} |
@ -0,0 +1,55 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.plugin.task.api.utils; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.ILogicTaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; |
||||
|
||||
import lombok.experimental.UtilityClass; |
||||
|
||||
@UtilityClass |
||||
public class TaskTypeUtils { |
||||
|
||||
public boolean isSwitchTask(String taskType) { |
||||
return SwitchLogicTaskChannelFactory.NAME.equals(taskType); |
||||
} |
||||
|
||||
public boolean isConditionTask(String taskType) { |
||||
return ConditionsLogicTaskChannelFactory.NAME.equals(taskType); |
||||
} |
||||
|
||||
public boolean isSubWorkflowTask(String taskType) { |
||||
return SubWorkflowLogicTaskChannelFactory.NAME.equals(taskType); |
||||
} |
||||
|
||||
public boolean isDependentTask(String taskType) { |
||||
return SubWorkflowLogicTaskChannelFactory.NAME.equals(taskType); |
||||
} |
||||
|
||||
public boolean isDynamicTask(String taskType) { |
||||
return DynamicLogicTaskChannelFactory.NAME.equals(taskType); |
||||
} |
||||
|
||||
public boolean isLogicTask(String taskType) { |
||||
return TaskPluginManager.getTaskChannel(taskType) instanceof ILogicTaskChannel; |
||||
} |
||||
|
||||
} |
@ -1,50 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.plugin.task.api.utils; |
||||
|
||||
import java.util.Set; |
||||
|
||||
import lombok.experimental.UtilityClass; |
||||
|
||||
import com.google.common.collect.Sets; |
||||
|
||||
@UtilityClass |
||||
public class TaskUtils { |
||||
|
||||
private final String blockingLogicTask = "BLOCKING"; |
||||
private final String conditionLogicTask = "CONDITIONS"; |
||||
|
||||
private final String dependentLogicTask = "DEPENDENT"; |
||||
private final String subWorkflowLogicTask = "SUB_PROCESS"; |
||||
private final String switchLogicTask = "SWITCH"; |
||||
private final String dynamicLogicTask = "DYNAMIC"; |
||||
|
||||
// todo: Add to SPI
|
||||
private final Set<String> MASTER_TASK_TYPES = Sets.newHashSet( |
||||
blockingLogicTask, |
||||
conditionLogicTask, |
||||
dependentLogicTask, |
||||
subWorkflowLogicTask, |
||||
switchLogicTask, |
||||
dynamicLogicTask); |
||||
|
||||
// todo: add to task plugin spi
|
||||
public boolean isLogicTask(String taskType) { |
||||
return MASTER_TASK_TYPES.contains(taskType); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,44 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api; |
||||
|
||||
import static com.google.common.truth.Truth.assertThat; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.DynamicLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; |
||||
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; |
||||
|
||||
import org.junit.jupiter.params.ParameterizedTest; |
||||
import org.junit.jupiter.params.provider.ValueSource; |
||||
|
||||
class TaskPluginManagerTest { |
||||
|
||||
@ParameterizedTest |
||||
@ValueSource(strings = { |
||||
ConditionsLogicTaskChannelFactory.NAME, |
||||
DependentLogicTaskChannelFactory.NAME, |
||||
DynamicLogicTaskChannelFactory.NAME, |
||||
SubWorkflowLogicTaskChannelFactory.NAME, |
||||
SwitchLogicTaskChannelFactory.NAME}) |
||||
void testGetTaskChannel_logicTaskChannel(String type) { |
||||
assertThat(TaskPluginManager.getTaskChannel(type)).isNotNull(); |
||||
} |
||||
|
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue