diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index fa75a75e2c..9388f036fc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -205,6 +205,7 @@ public class WorkflowExecuteThread implements Runnable { * @param processInstance processInstance * @param processService processService * @param nettyExecutorManager nettyExecutorManager + * @param taskTimeoutCheckList */ public WorkflowExecuteThread(ProcessInstance processInstance , ProcessService processService @@ -568,12 +569,10 @@ public class WorkflowExecuteThread implements Runnable { startDate = endDate; endDate = tmp; } - ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinition.getId()); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); complementListDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedules)); - logger.info(" process definition id:{} complement data: {}", - processDefinition.getId(), complementListDate.toString()); + logger.info(" process definition code:{} complement data: {}", + processInstance.getProcessDefinitionCode(), complementListDate.toString()); } } @@ -998,6 +997,7 @@ public class WorkflowExecuteThread implements Runnable { /** * generate the latest process instance status by the tasks state * + * @param instance * @return process instance execution status */ private ExecutionStatus getProcessInstanceState(ProcessInstance instance) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index cb04b16514..853a8cc7dd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -35,8 +35,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import org.apache.logging.log4j.util.Strings; - import java.util.Date; import org.slf4j.Logger; @@ -92,8 +90,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor { /** * common task cannot be paused - * - * @return */ @Override protected boolean pauseTask() { @@ -151,7 +147,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { if (taskInstance.getState().typeIsFinished()) { return true; } - if (Strings.isBlank(taskInstance.getHost())) { + if (null == taskInstance.getHost() || taskInstance.getHost().isEmpty()) { taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/ConditionsTaskExecThread.java deleted file mode 100644 index 765874f1ff..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/ConditionsTaskExecThread.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.DependResult; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.DependentItem; -import org.apache.dolphinscheduler.common.model.DependentTaskModel; -import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; -import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LogUtils; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.LoggerFactory; - -public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { - - /** - * dependent parameters - */ - private DependentParameters dependentParameters; - - /** - * complete task map - */ - private Map completeTaskList = new ConcurrentHashMap<>(); - - /** - * condition result - */ - private DependResult conditionResult; - - /** - * constructor of MasterBaseTaskExecThread - * - * @param taskInstance task instance - */ - public ConditionsTaskExecThread(TaskInstance taskInstance) { - super(taskInstance); - taskInstance.setStartTime(new Date()); - } - - @Override - public Boolean submitWaitComplete() { - try { - this.taskInstance = submit(); - logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); - Thread.currentThread().setName(threadLoggerInfoName); - initTaskParameters(); - logger.info("dependent task start"); - waitTaskQuit(); - updateTaskState(); - } catch (Exception e) { - logger.error("conditions task run exception", e); - } - return true; - } - - private void waitTaskQuit() { - List taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId()); - for (TaskInstance task : taskInstances) { - completeTaskList.putIfAbsent(task.getName(), task.getState()); - } - - List modelResultList = new ArrayList<>(); - for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) { - List itemDependResult = new ArrayList<>(); - for (DependentItem item : dependentTaskModel.getDependItemList()) { - itemDependResult.add(getDependResultForItem(item)); - } - DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); - modelResultList.add(modelResult); - } - conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList); - logger.info("the conditions task depend result : {}", conditionResult); - } - - /** - * - */ - private void updateTaskState() { - ExecutionStatus status; - if (this.cancel) { - status = ExecutionStatus.KILL; - } else { - status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; - } - taskInstance.setState(status); - taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); - } - - private void initTaskParameters() { - taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); - this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setStartTime(new Date()); - this.processService.saveTaskInstance(taskInstance); - this.dependentParameters = taskInstance.getDependency(); - } - - /** - * depend result for depend item - */ - private DependResult getDependResultForItem(DependentItem item) { - - DependResult dependResult = DependResult.SUCCESS; - if (!completeTaskList.containsKey(item.getDepTasks())) { - logger.info("depend item: {} have not completed yet.", item.getDepTasks()); - dependResult = DependResult.FAILED; - return dependResult; - } - ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); - if (executionStatus != item.getStatus()) { - logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTasks(), item.getStatus(), executionStatus); - dependResult = DependResult.FAILED; - } - logger.info("dependent item complete {} {},{}", - Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult); - return dependResult; - } - -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 49f9637578..1b4d3bfd4b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -123,7 +123,7 @@ public class WorkflowExecuteThreadTest { @Test public void testParallelWithOutSchedule() throws ParseException { try { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionId)).thenReturn(zeroSchedulerList()); Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(workflowExecuteThread); @@ -141,7 +141,7 @@ public class WorkflowExecuteThreadTest { @Test public void testParallelWithSchedule() { try { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionId)).thenReturn(oneSchedulerList()); Method method = WorkflowExecuteThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(workflowExecuteThread); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 96b1796c46..f307b7d8ca 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.spi.utils.CollectionUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import java.io.File; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 2cd19d7cb5..cf9285ec24 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections.MapUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index 2db0fa544b..4f29a3117d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.io.Charsets; import org.apache.http.HttpEntity; import org.apache.http.ParseException; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 2e2552febd..6e4a251e4d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections.MapUtils; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannelFactory.java new file mode 100644 index 0000000000..2530feb37d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannelFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.procedure; + +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import java.util.List; + +public class ProcedureTaskChannelFactory implements TaskChannelFactory { + + @Override + public String getName() { + return "PROCEDURE"; + } + + @Override + public List getParams() { + return null; + } + + @Override + public TaskChannel create() { + return new ProcedureTaskChannel(); + } +}