diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 5d5b82edc0..b58a97f831 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -26,14 +26,12 @@ /dolphinscheduler-dao/src/main/resources/sql/ @zhongjiajie /dolphinscheduler-common/ @caishunfeng /dolphinscheduler-standalone-server/ @kezhenxu94 @caishunfeng -/dolphinscheduler-log-server/ @caishunfeng /dolphinscheduler-datasource-plugin/ @caishunfeng /dolphinscheduler-dist/ @kezhenxu94 @caishunfeng /dolphinscheduler-meter/ @caishunfeng @kezhenxu94 @ruanwenjun @EricGao888 /dolphinscheduler-scheduler-plugin/ @caishunfeng /dolphinscheduler-master/ @caishunfeng @SbloodyS @ruanwenjun /dolphinscheduler-worker/ @caishunfeng @SbloodyS @ruanwenjun -/dolphinscheduler-server/ @caishunfeng /dolphinscheduler-service/ @caishunfeng /dolphinscheduler-remote/ @caishunfeng /dolphinscheduler-spi/ @caishunfeng diff --git a/.github/actions/labeler/labeler.yml b/.github/actions/labeler/labeler.yml index 501579bb70..4bb724fed2 100644 --- a/.github/actions/labeler/labeler.yml +++ b/.github/actions/labeler/labeler.yml @@ -26,12 +26,10 @@ backend: - 'dolphinscheduler-data-quality/**/*' - 'dolphinscheduler-datasource-plugin/**/*' - 'dolphinscheduler-dist/**/*' - - 'dolphinscheduler-log-server/**/*' - 'dolphinscheduler-master/**/*' - 'dolphinscheduler-registry/**/*' - 'dolphinscheduler-remote/**/*' - 'dolphinscheduler-scheduler-plugin/**/*' - - 'dolphinscheduler-server/**/*' - 'dolphinscheduler-service/**/*' - 'dolphinscheduler-spi/**/*' - 'dolphinscheduler-standalone-server/**/*' diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 9abb4b38e1..5267c3355b 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -30,7 +30,6 @@ on: - 'dolphinscheduler-common/**' - 'dolphinscheduler-dao/**' - 'dolphinscheduler-rpc/**' - - 'dolphinscheduler-server/**' pull_request: concurrency: diff --git a/.gitignore b/.gitignore index a553ba02af..1082e4b155 100644 --- a/.gitignore +++ b/.gitignore @@ -43,7 +43,6 @@ dolphinscheduler-dao/src/main/resources/dao/data_source.properties dolphinscheduler-alert/logs/ dolphinscheduler-alert/src/main/resources/alert.properties_bak dolphinscheduler-alert/src/main/resources/logback.xml -dolphinscheduler-server/src/main/resources/logback.xml dolphinscheduler-ui/dist dolphinscheduler-ui/node dolphinscheduler-common/sql diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md index dc3df7bb5c..c200aee994 100644 --- a/docs/docs/en/about/glossary.md +++ b/docs/docs/en/about/glossary.md @@ -49,6 +49,10 @@ process fails and ends ### 2.Module introduction +- dolphinscheduler-master master module, provides workflow management and orchestration. + +- dolphinscheduler-worker worker module, provides task execution management. + - dolphinscheduler-alert alarm module, providing AlertServer service. - dolphinscheduler-api web application module, providing ApiServer service. @@ -59,8 +63,6 @@ process fails and ends - dolphinscheduler-remote client and server based on netty -- dolphinscheduler-server MasterServer and WorkerServer services - - dolphinscheduler-service service module, including Quartz, Zookeeper, log client access service, easy to call server module and api module diff --git a/docs/docs/en/architecture/design.md b/docs/docs/en/architecture/design.md index 9579ab3651..b7cc733ace 100644 --- a/docs/docs/en/architecture/design.md +++ b/docs/docs/en/architecture/design.md @@ -197,10 +197,10 @@ In the early schedule design, if there is no priority design and use the fair sc - For details, please refer to the logback configuration of Master and Worker, as shown in the following example: ```xml - + - - + + taskAppId ${log.base} diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md index 7642a4a4c1..0a29c1dd23 100644 --- a/docs/docs/zh/about/glossary.md +++ b/docs/docs/zh/about/glossary.md @@ -34,6 +34,10 @@ ### 2.模块介绍 +- dolphinscheduler-master master模块,提供工作流管理和编排服务。 + +- dolphinscheduler-worker worker模块,提供任务执行管理服务。 + - dolphinscheduler-alert 告警模块,提供 AlertServer 服务。 - dolphinscheduler-api web应用模块,提供 ApiServer 服务。 @@ -44,8 +48,6 @@ - dolphinscheduler-remote 基于 netty 的客户端、服务端 -- dolphinscheduler-server MasterServer 和 WorkerServer 服务 - - dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用 - dolphinscheduler-ui 前端模块 diff --git a/docs/docs/zh/architecture/design.md b/docs/docs/zh/architecture/design.md index f3368a7609..0726a79ad9 100644 --- a/docs/docs/zh/architecture/design.md +++ b/docs/docs/zh/architecture/design.md @@ -195,10 +195,10 @@ - 详情可参考Master和Worker的logback配置,如下示例: ```xml - + - - + + taskAppId ${log.base} diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml index 22316a261a..b202cdd281 100644 --- a/dolphinscheduler-dist/pom.xml +++ b/dolphinscheduler-dist/pom.xml @@ -43,10 +43,6 @@ - - org.apache.dolphinscheduler - dolphinscheduler-server - org.apache.dolphinscheduler diff --git a/dolphinscheduler-log-server/pom.xml b/dolphinscheduler-log-server/pom.xml deleted file mode 100644 index 26244a4e1a..0000000000 --- a/dolphinscheduler-log-server/pom.xml +++ /dev/null @@ -1,64 +0,0 @@ - - - - - - dolphinscheduler - org.apache.dolphinscheduler - dev-SNAPSHOT - - 4.0.0 - - dolphinscheduler-log-server - - - - org.apache.dolphinscheduler - dolphinscheduler-service - - - org.apache.dolphinscheduler - dolphinscheduler-server - - - org.apache.dolphinscheduler - dolphinscheduler-spi - - - - org.mockito - mockito-core - test - - - - - - - org.apache.dolphinscheduler - dolphinscheduler-bom - ${project.version} - pom - import - - - - - diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml index 03084f91de..66291e6a78 100644 --- a/dolphinscheduler-master/pom.xml +++ b/dolphinscheduler-master/pom.xml @@ -54,10 +54,6 @@ org.apache.dolphinscheduler dolphinscheduler-service - - org.apache.dolphinscheduler - dolphinscheduler-server - org.apache.dolphinscheduler dolphinscheduler-registry-all @@ -267,10 +263,6 @@ test - - org.apache.dolphinscheduler - dolphinscheduler-log-server - org.springframework.cloud diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index 7bdebe1ee0..8036c2d6c8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; @@ -31,8 +30,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProce import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; - -import javax.annotation.PostConstruct; +import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +94,8 @@ public class MasterRPCServer implements AutoCloseable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, + workflowExecutingDataRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor); // logger server diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index 508a2b0d14..f9371a255a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.utils.LogUtils; import java.util.ArrayList; import java.util.Date; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index e22d0d32bf..f04e950223 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -19,16 +19,16 @@ package org.apache.dolphinscheduler.server.master.runner.task; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -144,8 +144,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { private void setConditionResult() { - List taskInstances - = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag()); + List taskInstances = processService + .findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag()); for (TaskInstance task : taskInstances) { completeTaskList.putIfAbsent(task.getTaskCode(), task.getState()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 972abcb532..54e3192cc4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -17,7 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import com.google.auto.service.AutoService; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -34,8 +35,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.server.master.utils.DependentExecute; -import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -47,7 +48,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; +import com.google.auto.service.AutoService; /** * dependent task processor @@ -57,9 +58,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor { private DependentParameters dependentParameters; - private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class); + private final ProcessDefinitionMapper processDefinitionMapper = + SpringApplicationContext.getBean(ProcessDefinitionMapper.class); - private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class); + private final TaskDefinitionMapper taskDefinitionMapper = + SpringApplicationContext.getBean(TaskDefinitionMapper.class); private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class); @@ -180,9 +183,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor { taskDefinitionCodes.add(dependentItem.getDepTaskCode()); }); }); - projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity())); - processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity())); - taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); + projectCodeMap = projectMapper.queryByCodes(projectCodes).stream() + .collect(Collectors.toMap(Project::getCode, Function.identity())); + processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream() + .collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity())); + taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream() + .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation()); @@ -190,25 +196,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor { Project project = projectCodeMap.get(dependentItem.getProjectCode()); if (project == null) { logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem); - throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem); + throw new RuntimeException( + "The dependent task's project is not exist, dependentItem: " + dependentItem); } ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode()); if (processDefinition == null) { logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem); - throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem); + throw new RuntimeException( + "The dependent task's workflow is not exist, dependentItem: " + dependentItem); } if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { - logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}", + logger.info( + "Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}", project.getName(), processDefinition.getName(), dependentItem.getKey()); } else { TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode()); if (taskDefinition == null) { - logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem); - throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem); + logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", + dependentItem); + throw new RuntimeException( + "The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem); } logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}", - project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey()); + project.getName(), processDefinition.getName(), taskDefinition.getName(), + dependentItem.getKey()); } } this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation())); @@ -243,7 +255,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor { if (!dependResultMap.containsKey(entry.getKey())) { dependResultMap.put(entry.getKey(), entry.getValue()); // save depend result to log - logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate); + logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", + entry.getKey(), entry.getValue(), dependentDate); } } if (!dependentExecute.finish(dependentDate, testFlag)) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 98d4710827..43cf206e05 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.utils.LogUtils; import org.apache.commons.lang3.StringUtils; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index fc5bce804d..de571cb657 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -17,9 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import com.google.auto.service.AutoService; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; + import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -29,7 +28,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils; -import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.utils.LogUtils; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.util.Date; import java.util.HashMap; @@ -39,7 +41,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; +import com.google.auto.service.AutoService; /** * switch task processor diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 414a417fc9..cf50ceec13 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -36,10 +36,10 @@ import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutor import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.apache.dolphinscheduler.service.utils.ProcessUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections4.CollectionUtils; @@ -163,7 +163,8 @@ public class MasterFailoverService { processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); int processInstanceId = processInstance.getId(); - List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); + List taskInstanceList = + processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { try { LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index a294874ff4..c7ea9c13db 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -17,10 +17,6 @@ package org.apache.dolphinscheduler.server.master.service; -import lombok.NonNull; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -39,15 +35,15 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; +import org.apache.dolphinscheduler.service.utils.ProcessUtils; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; -import javax.annotation.Nullable; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -56,6 +52,14 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import lombok.NonNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + @Service public class WorkerFailoverService { diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index 361b03b309..c18f621c51 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -28,10 +28,10 @@ + converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/> - - + + taskAppId ${log.base} diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml deleted file mode 100644 index 5a0ee9972e..0000000000 --- a/dolphinscheduler-server/pom.xml +++ /dev/null @@ -1,231 +0,0 @@ - - - - 4.0.0 - - org.apache.dolphinscheduler - dolphinscheduler - dev-SNAPSHOT - - dolphinscheduler-server - jar - dolphinscheduler-server - - - - - org.apache.dolphinscheduler - dolphinscheduler-bom - ${project.version} - pom - import - - - - - - - - org.apache.dolphinscheduler - dolphinscheduler-common - - - org.apache.dolphinscheduler - dolphinscheduler-service - - - org.apache.commons - commons-lang3 - - - org.mockito - mockito-core - test - - - org.mockito - mockito-inline - 3.12.4 - - test - - - org.apache.hadoop - hadoop-common - test - - - org.slf4j - slf4j-log4j12 - - - jdk.tools - jdk.tools - - - javax.servlet - servlet-api - - - javax.servlet - servlet-api - - - log4j - log4j - - - org.apache.curator - curator-client - - - - commons-configuration - commons-configuration - - - io.grpc - grpc-protobuf - - - io.netty - netty - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - com.google.protobuf - jackson-mapper-asl - - - com.google.code.gson - gson - - - xmlenc - xmlenc - - - commons-net - commons-net - - - org.apache.avro - avro - - - org.apache.zookeeper - zookeeper - - - javax.servlet.jsp - jsp-api - - - com.sun.jersey - jersey-json - - - com.sun.jersey - jersey-server - - - com.sun.jersey - jersey-core - - - - - org.apache.hbase.thirdparty - hbase-noop-htrace - - - org.apache.hadoop - hadoop-client - test - - - org.slf4j - slf4j-log4j12 - - - javax.servlet - servlet-api - - - org.codehaus.jackson - jackson-jaxrs - - - org.codehaus.jackson - jackson-xc - - - - org.fusesource.leveldbjni - leveldbjni-all - - - org.apache.zookeeper - zookeeper - - - org.apache.hadoop - hadoop-mapreduce-client-shuffle - - - com.sun.jersey - jersey-client - - - com.sun.jersey - jersey-core - - - javax.xml.bind - jaxb-api - - - log4j - log4j - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - config/ - - - - - - diff --git a/dolphinscheduler-server/src/test/resources/master.properties b/dolphinscheduler-server/src/test/resources/master.properties deleted file mode 100644 index 91d84ebd10..0000000000 --- a/dolphinscheduler-server/src/test/resources/master.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# master execute thread num -#master.exec.threads=100 - -# master execute task number in parallel -#master.exec.task.num=20 - -# master dispatch task number -master.dispatch.task.num=6 - -# master heartbeat interval -#master.heartbeat.interval=10 - -# master commit task retry times -#master.task.commit.retryTimes=5 - -# master commit task interval -#master.task.commit.interval=1000 - -# only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2 -#master.max.cpuload.avg=-1 - -# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G. -#master.reserved.memory=0.3 - -# master listen port -#master.listen.port=5678 diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index b8d89c0a6a..424b25324a 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -63,6 +63,12 @@ dolphinscheduler-task-api + + org.mockito + mockito-core + test + + org.mockito mockito-inline @@ -82,5 +88,153 @@ provided + + org.apache.hadoop + hadoop-common + test + + + org.slf4j + slf4j-log4j12 + + + jdk.tools + jdk.tools + + + javax.servlet + servlet-api + + + javax.servlet + servlet-api + + + log4j + log4j + + + org.apache.curator + curator-client + + + + commons-configuration + commons-configuration + + + io.grpc + grpc-protobuf + + + io.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.google.protobuf + jackson-mapper-asl + + + com.google.code.gson + gson + + + xmlenc + xmlenc + + + commons-net + commons-net + + + org.apache.avro + avro + + + org.apache.zookeeper + zookeeper + + + javax.servlet.jsp + jsp-api + + + com.sun.jersey + jersey-json + + + com.sun.jersey + jersey-server + + + com.sun.jersey + jersey-core + + + + + org.apache.hbase.thirdparty + hbase-noop-htrace + + + org.apache.hadoop + hadoop-client + test + + + org.slf4j + slf4j-log4j12 + + + javax.servlet + servlet-api + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + + + org.fusesource.leveldbjni + leveldbjni-all + + + org.apache.zookeeper + zookeeper + + + org.apache.hadoop + hadoop-mapreduce-client-shuffle + + + com.sun.jersey + jersey-client + + + com.sun.jersey + jersey-core + + + javax.xml.bind + jaxb-api + + + log4j + log4j + + + diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java similarity index 99% rename from dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java index 0d04ed8596..6de75255a7 100644 --- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/MasterLogFilter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java similarity index 93% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/MasterLogFilter.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java index 575571d9ac..a9d352de11 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/MasterLogFilter.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -25,6 +25,7 @@ import ch.qos.logback.core.spi.FilterReply; * master log filter */ public class MasterLogFilter extends Filter { + /** * log level */ @@ -37,7 +38,7 @@ public class MasterLogFilter extends Filter { */ @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith("Master-") ){ + if (event.getThreadName().startsWith("Master-")) { return FilterReply.ACCEPT; } return FilterReply.DENY; @@ -46,4 +47,4 @@ public class MasterLogFilter extends Filter { public void setLevel(String level) { this.level = Level.toLevel(level); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverter.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverter.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverter.java index 67e471f9c7..9e38d9ef7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverter.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverter.java @@ -15,16 +15,18 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.Constants; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Strings; + import ch.qos.logback.classic.pattern.MessageConverter; import ch.qos.logback.classic.spi.ILoggingEvent; +import com.google.common.base.Strings; + /** * sensitive data log converter */ @@ -35,7 +37,6 @@ public class SensitiveDataConverter extends MessageConverter { */ private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); - @Override public String convert(ILoggingEvent event) { @@ -85,5 +86,4 @@ public class SensitiveDataConverter extends MessageConverter { return sb.toString(); } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java similarity index 90% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java index 595dcd92d5..88bca53fd9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -52,7 +52,8 @@ public class TaskLogDiscriminator extends AbstractDiscriminator { if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) { String threadName = event.getThreadName(); if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) { - threadName = threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length()); + threadName = + threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length()); } String part1 = threadName.split(Constants.EQUAL_SIGN)[1]; String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-"; @@ -60,7 +61,8 @@ public class TaskLogDiscriminator extends AbstractDiscriminator { key = part1.substring(prefix.length()).replaceFirst("-", "/"); } } - logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), event.getLoggerName()); + logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), + event.getLoggerName()); return key; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java similarity index 92% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java index f243faba08..6f7a920e79 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -57,7 +57,8 @@ public class TaskLogFilter extends Filter { || event.getLevel().isGreaterOrEqual(level)) { filterReply = FilterReply.ACCEPT; } - logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), event.getLoggerName(), filterReply.name(), level); + logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), + event.getLoggerName(), filterReply.name(), level); return filterReply; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/WorkerLogFilter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java similarity index 92% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/WorkerLogFilter.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java index 1a75e594cf..a1f4733a72 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/WorkerLogFilter.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -25,6 +25,7 @@ import ch.qos.logback.core.spi.FilterReply; * worker log filter */ public class WorkerLogFilter extends Filter { + /** * level */ @@ -37,7 +38,7 @@ public class WorkerLogFilter extends Filter { */ @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith("Worker-")){ + if (event.getThreadName().startsWith("Worker-")) { return FilterReply.ACCEPT; } @@ -46,4 +47,4 @@ public class WorkerLogFilter extends Filter { public void setLevel(String level) { this.level = Level.toLevel(level); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java similarity index 60% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java index aeb3648016..116051f145 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.service.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator; import java.nio.file.Path; import java.nio.file.Paths; @@ -44,39 +44,40 @@ public class LogUtils { /** * get task log path */ - public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode, int processDefineVersion, int processInstanceId, int taskInstanceId) { + public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode, int processDefineVersion, + int processInstanceId, int taskInstanceId) { // format /logs/YYYYMMDD/defintion-code_defintion_version-processInstanceId-taskInstanceId.log final String taskLogFileName = new StringBuilder(String.valueOf(processDefineCode)) - .append(Constants.UNDERLINE) - .append(processDefineVersion) - .append(Constants.SUBTRACT_CHAR) - .append(processInstanceId) - .append(Constants.SUBTRACT_CHAR) - .append(taskInstanceId) - .append(LOG_TAILFIX) - .toString(); + .append(Constants.UNDERLINE) + .append(processDefineVersion) + .append(Constants.SUBTRACT_CHAR) + .append(processInstanceId) + .append(Constants.SUBTRACT_CHAR) + .append(taskInstanceId) + .append(LOG_TAILFIX) + .toString(); // Optional.map will be skipped if null return Optional.of(LoggerFactory.getILoggerFactory()) - .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) - .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) - .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) - .map(TaskLogDiscriminator::getLogBase) - .map(e -> Paths.get(e) - .toAbsolutePath() - .resolve(DateUtils.format(firstSubmitTime,Constants.YYYYMMDD, null)) - .resolve(taskLogFileName)) - .map(Path::toString) - .orElse(""); + .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) + .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) + .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) + .map(TaskLogDiscriminator::getLogBase) + .map(e -> Paths.get(e) + .toAbsolutePath() + .resolve(DateUtils.format(firstSubmitTime, Constants.YYYYMMDD, null)) + .resolve(taskLogFileName)) + .map(Path::toString) + .orElse(""); } /** * get task log path by TaskExecutionContext */ public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) { - return getTaskLogPath(taskExecutionContext.getFirstSubmitTime(),taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); + return getTaskLogPath(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java similarity index 99% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java index 35794cf3d0..e1d6a552a7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.service.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CommonUtils; diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java similarity index 72% rename from dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java index fa095231f8..ad4b7e82e1 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -23,26 +23,37 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.junit.MockitoJUnitRunner; import io.netty.channel.Channel; -@RunWith(PowerMockRunner.class) -@PrepareForTest({LoggerUtils.class}) +@RunWith(MockitoJUnitRunner.class) public class LoggerRequestProcessorTest { + private MockedStatic mockedStaticLoggerUtils; + + @Before + public void setUp() { + mockedStaticLoggerUtils = Mockito.mockStatic(LoggerUtils.class); + } + + @After + public void after() { + mockedStaticLoggerUtils.close(); + } + @Test public void testProcessViewWholeLogRequest() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); - Channel channel = PowerMockito.mock(Channel.class); - PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); - PowerMockito.mockStatic(LoggerUtils.class); - PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); + Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a.log"); @@ -57,10 +68,8 @@ public class LoggerRequestProcessorTest { @Test(expected = IllegalArgumentException.class) public void testProcessViewWholeLogRequestError() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); - Channel channel = PowerMockito.mock(Channel.class); - PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); - PowerMockito.mockStatic(LoggerUtils.class); - PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a"); @@ -75,10 +84,8 @@ public class LoggerRequestProcessorTest { @Test(expected = IllegalArgumentException.class) public void testProcessViewWholeLogRequestErrorRelativePath() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); - Channel channel = PowerMockito.mock(Channel.class); - PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); - PowerMockito.mockStatic(LoggerUtils.class); - PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/../../a.log"); @@ -93,10 +100,8 @@ public class LoggerRequestProcessorTest { @Test(expected = IllegalArgumentException.class) public void testProcessViewWholeLogRequestErrorStartWith() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); - Channel channel = PowerMockito.mock(Channel.class); - PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); - PowerMockito.mockStatic(LoggerUtils.class); - PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/a.log"); Command command = new Command(); diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/MasterLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java similarity index 97% rename from dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/MasterLogFilterTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java index a77a1002be..13fe5f0d39 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/MasterLogFilterTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.Constants; diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverterTest.java similarity index 94% rename from dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverterTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverterTest.java index cd573cc51a..36b3b5e369 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/SensitiveDataConverterTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/SensitiveDataConverterTest.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; -import static org.apache.dolphinscheduler.server.log.SensitiveDataConverter.passwordHandler; +import static org.apache.dolphinscheduler.service.log.SensitiveDataConverter.passwordHandler; import org.apache.dolphinscheduler.common.Constants; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java similarity index 98% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java index 3931bac99e..a2c3b6aa7e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java similarity index 97% rename from dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java index a02b1acc41..fe99768031 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/WorkerLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java similarity index 97% rename from dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/WorkerLogFilterTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java index 9af6954e1b..c7110d2501 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/WorkerLogFilterTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.log; +package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.Constants; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java similarity index 96% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java index b6adb7ba48..8c9d2affe8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.service.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator; import java.nio.file.Path; import java.nio.file.Paths; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java similarity index 98% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java index 7664ed7f05..a1c7b7e233 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.service.utils; import static org.mockito.ArgumentMatchers.anyString; diff --git a/dolphinscheduler-standalone-server/pom.xml b/dolphinscheduler-standalone-server/pom.xml index 7bc450fd8a..36b73f9104 100644 --- a/dolphinscheduler-standalone-server/pom.xml +++ b/dolphinscheduler-standalone-server/pom.xml @@ -56,11 +56,6 @@ dolphinscheduler-alert-server - - org.apache.dolphinscheduler - dolphinscheduler-log-server - - com.h2database h2 diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index b411d3fd25..551225eb1a 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -51,10 +51,10 @@ + converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/> - - + + taskAppId ${log.base} diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml index e558deb9c7..74590602ff 100644 --- a/dolphinscheduler-worker/pom.xml +++ b/dolphinscheduler-worker/pom.xml @@ -44,15 +44,15 @@ org.apache.dolphinscheduler - dolphinscheduler-spi + dolphinscheduler-service org.apache.dolphinscheduler - dolphinscheduler-common + dolphinscheduler-spi org.apache.dolphinscheduler - dolphinscheduler-server + dolphinscheduler-common org.apache.dolphinscheduler @@ -100,10 +100,6 @@ spring-boot-starter-test test - - org.apache.dolphinscheduler - dolphinscheduler-log-server - org.springframework.cloud diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index 288a5d9b33..85f703d25c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -17,10 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; -import com.google.common.base.Preconditions; -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -32,7 +28,6 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; @@ -41,11 +36,19 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; +import org.apache.dolphinscheduler.service.utils.LogUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.google.common.base.Preconditions; + +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; +import io.netty.channel.Channel; + /** * Used to handle {@link CommandType#TASK_DISPATCH_REQUEST} */ @@ -104,7 +107,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { return; } try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); // set cache, it will be used when kill task TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); @@ -112,14 +116,17 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); // delay task process - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L); if (remainTime > 0) { logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT); + workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, + CommandType.TASK_EXECUTE_RESULT); } - WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory( + WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder + .createWorkerDelayTaskExecuteRunnableFactory( taskExecutionContext, workerConfig, workflowMasterAddress, @@ -133,8 +140,9 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { if (!offer) { logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT); - } else + } else { logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize()); + } } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index ed351c3b86..3ebf497cda 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,14 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; -import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -import io.micrometer.core.lang.NonNull; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -40,19 +32,29 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.apache.dolphinscheduler.service.log.LogClient; +import org.apache.dolphinscheduler.service.utils.ProcessUtils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import io.micrometer.core.lang.NonNull; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; /** * task kill processor @@ -80,7 +82,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); if (killCommand == null) { logger.error("task kill request command is null"); @@ -98,25 +100,25 @@ public class TaskKillProcessor implements NettyRequestProcessor { return; } - int processId = taskExecutionContext.getProcessId(); - if (processId == 0) { - this.cancelApplication(taskInstanceId); - workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); - taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); - sendTaskKillResponseCommand(channel, taskExecutionContext); - logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); - return; - } + int processId = taskExecutionContext.getProcessId(); + if (processId == 0) { + this.cancelApplication(taskInstanceId); + workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + sendTaskKillResponseCommand(channel, taskExecutionContext); + logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); + return; + } // if processId > 0, it should call cancelApplication to cancel remote application too. this.cancelApplication(taskInstanceId); Pair> result = doKill(taskExecutionContext); - taskExecutionContext.setCurrentExecutionStatus( - result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); - taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); - sendTaskKillResponseCommand(channel, taskExecutionContext); + taskExecutionContext.setCurrentExecutionStatus( + result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); + taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); + sendTaskKillResponseCommand(channel, taskExecutionContext); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId()); @@ -131,7 +133,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus()); if (taskExecutionContext.getAppIds() != null) { - taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); + taskKillResponseCommand + .setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); } taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setHost(taskExecutionContext.getHost()); @@ -158,9 +161,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { // find log and kill yarn job Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), - taskExecutionContext.getLogPath(), - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTenantCode()); + taskExecutionContext.getLogPath(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTenantCode()); return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight()); } @@ -226,8 +229,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { String executePath, String tenantCode) { if (logPath == null || executePath == null || tenantCode == null) { - logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}", - host, logPath, executePath, tenantCode); + logger.error( + "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}", + host, logPath, executePath, tenantCode); return Pair.of(false, Collections.emptyList()); } try { @@ -244,9 +248,10 @@ public class TaskKillProcessor implements NettyRequestProcessor { Thread.currentThread().interrupt(); logger.error("kill yarn job error, the current thread has been interrtpted", e); } catch (Exception e) { - logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e); + logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, + executePath, tenantCode, e); } return Pair.of(false, Collections.emptyList()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index 140e410242..9cae7b9f6a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.rpc; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor; @@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor; +import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor; import java.io.Closeable; @@ -78,7 +78,8 @@ public class WorkerRpcServer implements Closeable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, + taskExecuteRunningAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 9cb1a1e4f4..de2780af86 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -17,9 +17,8 @@ package org.apache.dolphinscheduler.server.worker.runner; -import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; -import com.google.common.base.Strings; -import lombok.NonNull; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.storage.StorageOperate; @@ -38,27 +37,33 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.service.utils.ProcessUtils; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Date; import java.util.List; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import javax.annotation.Nullable; + +import lombok.NonNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.google.common.base.Strings; public abstract class WorkerTaskExecuteRunnable implements Runnable { - protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class)); + protected final Logger logger = LoggerFactory + .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class)); protected final TaskExecutionContext taskExecutionContext; protected final WorkerConfig workerConfig; @@ -71,13 +76,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { protected @Nullable AbstractTask task; protected WorkerTaskExecuteRunnable( - @NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull String masterAddress, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull AlertClientService alertClientService, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate) { + @NonNull TaskExecutionContext taskExecutionContext, + @NonNull WorkerConfig workerConfig, + @NonNull String masterAddress, + @NonNull WorkerMessageSender workerMessageSender, + @NonNull AlertClientService alertClientService, + @NonNull TaskPluginManager taskPluginManager, + @Nullable StorageOperate storageOperate) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.masterAddress = masterAddress; @@ -115,7 +120,9 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); taskExecutionContext.setEndTime(new Date()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); - logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE); + logger.info( + "Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", + TaskExecutionStatus.FAILURE); } public void cancelTask() { @@ -125,10 +132,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { task.cancel(); List appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath()); if (CollectionUtils.isNotEmpty(appIds)) { - ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); + ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), + taskExecutionContext.getExecutePath()); } } catch (Exception e) { - logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e); + logger.error( + "Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", + e); } } } @@ -139,7 +149,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { // set the thread name to make sure the log be written to the task log file Thread.currentThread().setName(taskExecutionContext.getTaskLogName()); - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); logger.info("Begin to pulling task"); initializeTask(); @@ -148,14 +159,17 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); taskExecutionContext.setEndTime(new Date()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); - logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success"); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, + CommandType.TASK_EXECUTE_RESULT); + logger.info( + "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success"); return; } beforeExecute(); - TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender).masterAddress(masterAddress).build(); + TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender) + .masterAddress(masterAddress).build(); executeTask(taskCallBack); afterExecute(); @@ -179,7 +193,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setEnvFile(systemEnvPath); logger.info("Set task envFile: {}", systemEnvPath); - String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); taskExecutionContext.setTaskAppId(taskAppId); logger.info("Set task appId: {}", taskAppId); @@ -202,11 +217,13 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { - throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType())); + throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", + taskExecutionContext.getTaskType())); } task = taskChannel.createTask(taskExecutionContext); if (task == null) { - throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType())); + throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", + taskExecutionContext.getTaskType())); } logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType()); @@ -225,8 +242,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { logger.info("The current task need to send alert, begin to send alert"); TaskExecutionStatus status = task.getExitStatus(); TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo(); - int strategy = status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode(); - alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy); + int strategy = + status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode(); + alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), + taskAlertInfo.getContent(), strategy); logger.info("Success send alert"); } @@ -238,14 +257,16 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool())); workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT); - logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus()); + logger.info("Send task execute result to master, the current task status: {}", + taskExecutionContext.getCurrentExecutionStatus()); } protected void clearTaskExecPathIfNeeded() { String execLocalPath = taskExecutionContext.getExecutePath(); if (!CommonUtils.isDevelopMode()) { - logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath); + logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", + execLocalPath); // get exec dir if (Strings.isNullOrEmpty(execLocalPath)) { logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName()); @@ -264,11 +285,14 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { if (e instanceof NoSuchFileException) { // this is expected } else { - logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e); + logger.error( + "Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", + execLocalPath, e); } } } else { - logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath); + logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", + execLocalPath); } } diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 0533827f0a..fb3d2226d8 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -29,10 +29,10 @@ + converterClass="org.apache.dolphinscheduler.service.log.SensitiveDataConverter"/> - - + + taskAppId ${log.base} diff --git a/pom.xml b/pom.xml index ef784156f7..2bd06eb2aa 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,6 @@ dolphinscheduler-spi dolphinscheduler-registry dolphinscheduler-task-plugin - dolphinscheduler-server dolphinscheduler-common dolphinscheduler-api dolphinscheduler-dao @@ -53,7 +52,6 @@ dolphinscheduler-meter dolphinscheduler-master dolphinscheduler-worker - dolphinscheduler-log-server dolphinscheduler-tools dolphinscheduler-ui dolphinscheduler-scheduler-plugin @@ -99,11 +97,6 @@ - - org.apache.dolphinscheduler - dolphinscheduler-server - ${project.version} - org.apache.dolphinscheduler dolphinscheduler-master @@ -114,11 +107,6 @@ dolphinscheduler-worker ${project.version} - - org.apache.dolphinscheduler - dolphinscheduler-log-server - ${project.version} - org.apache.dolphinscheduler dolphinscheduler-standalone-server