diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index ff0088ea8d..55c5c3446c 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -23,11 +23,13 @@ import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient; import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer; import org.apache.dolphinscheduler.alert.service.AlertBootstrapService; import org.apache.dolphinscheduler.alert.service.ListenerEventPostService; +import org.apache.dolphinscheduler.common.CommonConfiguration; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration; +import org.apache.dolphinscheduler.dao.DaoConfiguration; +import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -37,14 +39,13 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.Import; @Slf4j +@Import({CommonConfiguration.class, + DaoConfiguration.class, + RegistryConfiguration.class}) @SpringBootApplication -@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class) -}) public class AlertServer { @Autowired diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 20a6412076..6f7d8f43d2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.api; import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics; +import org.apache.dolphinscheduler.common.CommonConfiguration; import org.apache.dolphinscheduler.common.enums.PluginType; import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; +import org.apache.dolphinscheduler.dao.DaoConfiguration; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.PluginDefine; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration; +import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; +import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; +import org.apache.dolphinscheduler.service.ServiceConfiguration; import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; import org.apache.dolphinscheduler.spi.params.base.PluginParams; @@ -38,21 +42,19 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.web.servlet.ServletComponentScan; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.Import; import org.springframework.context.event.EventListener; +@Slf4j +@Import({DaoConfiguration.class, + CommonConfiguration.class, + ServiceConfiguration.class, + StorageConfiguration.class, + RegistryConfiguration.class}) @ServletComponentScan @SpringBootApplication -@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class) -}) -@Slf4j public class ApiApplicationServer { - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private PluginDao pluginDao; @@ -66,8 +68,8 @@ public class ApiApplicationServer { public void run(ApplicationReadyEvent readyEvent) { log.info("Received spring application context ready event will load taskPlugin and write to DB"); // install task plugin - taskPluginManager.loadPlugin(); - for (Map.Entry entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) { + TaskPluginManager.loadPlugin(); + for (Map.Entry entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) { String taskPluginName = entry.getKey(); TaskChannelFactory taskChannelFactory = entry.getValue(); List params = taskChannelFactory.getParams(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java index b5a8ea7a46..d7b026ed16 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java @@ -25,10 +25,10 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.commons.beanutils.BeanUtils; -import java.lang.reflect.InvocationTargetException; import java.util.Date; import lombok.Data; +import lombok.SneakyThrows; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; @@ -107,7 +107,8 @@ public class TaskUpdateRequest { * @param taskDefinition exists task definition object * @return task definition */ - public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException { + @SneakyThrows + public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) { TaskDefinition taskDefinitionDeepCopy = (TaskDefinition) BeanUtils.cloneBean(taskDefinition); assert taskDefinitionDeepCopy != null; if (this.name != null) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 2bf84a0bcc..e6893b04fb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -238,9 +238,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private DataSourceMapper dataSourceMapper; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private WorkFlowLineageService workFlowLineageService; @@ -424,7 +421,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson); } for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinitionLog.getTaskType()) .taskParams(taskDefinitionLog.getTaskParams()) .dependence(taskDefinitionLog.getDependence()) @@ -1618,7 +1615,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // check whether the process definition json is normal for (TaskNode taskNode : taskNodes) { - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskNode.getType()) .taskParams(taskNode.getTaskParams()) .dependence(taskNode.getDependence()) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 36cc986607..06e9fd95db 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -70,11 +70,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper; -import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; @@ -177,18 +175,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired UsersService usersService; - @Autowired - private TenantMapper tenantMapper; - @Autowired TaskDefinitionMapper taskDefinitionMapper; - @Autowired - private TaskPluginManager taskPluginManager; - - @Autowired - private ScheduleMapper scheduleMapper; - @Autowired private RelationSubWorkflowMapper relationSubWorkflowMapper; @@ -725,7 +714,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinitionLog.getTaskType()) .taskParams(taskDefinitionLog.getTaskParams()) .dependence(taskDefinitionLog.getDependence()) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 71e4d04d6f..2887606a1d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -75,7 +75,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -135,9 +134,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessService processService; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private ProcessDefinitionService processDefinitionService; @@ -147,8 +143,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * create task definition * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param taskDefinitionJson task definition json */ @Transactional @@ -171,7 +167,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinitionLog.getTaskType()) .taskParams(taskDefinitionLog.getTaskParams()) .dependence(taskDefinitionLog.getDependence()) @@ -212,7 +208,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Project project = projectMapper.queryByCode(taskDefinition.getProjectCode()); projectService.checkProjectAndAuthThrowException(user, project, permissions); - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinition.getTaskType()) .taskParams(taskDefinition.getTaskParams()) .dependence(taskDefinition.getDependence()) @@ -242,7 +238,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * Create resource task definition * - * @param loginUser login user + * @param loginUser login user * @param taskCreateRequest task definition json * @return new TaskDefinition have created */ @@ -286,11 +282,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * create single task definition that binds the workflow * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processDefinitionCode process definition code * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma + * @param upstreamCodes upstream task codes, sep comma * @return create result code */ @Transactional @@ -325,7 +321,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); return result; } - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinition.getTaskType()) .taskParams(taskDefinition.getTaskParams()) .dependence(taskDefinition.getDependence()) @@ -412,10 +408,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * query task definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @param processCode process code - * @param taskName task name + * @param taskName task name */ @Override public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, @@ -473,12 +469,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * Delete resource task definition by code - * + *

* Only task release state offline and no downstream tasks can be deleted, will also remove the exists * task relation [upstreamTaskCode, taskCode] * * @param loginUser login user - * @param taskCode task code + * @param taskCode task code */ @Transactional @Override @@ -546,9 +542,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * update task definition * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task code + * @param loginUser login user + * @param projectCode project code + * @param taskCode task code * @param taskDefinitionJsonObj task definition json object */ @Transactional @@ -604,8 +600,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * update task definition * - * @param loginUser login user - * @param taskCode task code + * @param loginUser login user + * @param taskCode task code * @param taskUpdateRequest task definition json object * @return new TaskDefinition have updated */ @@ -619,13 +615,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode); } - TaskDefinition taskDefinitionUpdate; - try { - taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal); - } catch (InvocationTargetException | IllegalAccessException | InstantiationException - | NoSuchMethodException e) { - throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, taskUpdateRequest.toString()); - } + TaskDefinition taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal); + this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE); this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate); @@ -656,7 +647,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * Get resource task definition by code * * @param loginUser login user - * @param taskCode task code + * @param taskCode task code * @return TaskDefinition */ @Override @@ -674,7 +665,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * Get resource task definition according to query parameter * - * @param loginUser login user + * @param loginUser login user * @param taskFilterRequest taskFilterRequest object you want to filter the resource task definitions * @return TaskDefinitions of page */ @@ -741,7 +732,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); return null; } - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() + if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinitionToUpdate.getTaskType()) .taskParams(taskDefinitionToUpdate.getTaskParams()) .dependence(taskDefinitionToUpdate.getDependence()) @@ -846,11 +837,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * update task definition and upstream * - * @param loginUser login user - * @param projectCode project code - * @param taskCode task definition code + * @param loginUser login user + * @param projectCode project code + * @param taskCode task definition code * @param taskDefinitionJsonObj task definition json object - * @param upstreamCodes upstream task codes, sep comma + * @param upstreamCodes upstream task codes, sep comma * @return update result code */ @Override @@ -1019,10 +1010,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * switch task definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param taskCode task code - * @param version the version user want to switch + * @param taskCode task code + * @param version the version user want to switch */ @Transactional @Override @@ -1277,9 +1268,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe /** * release task definition * - * @param loginUser login user - * @param projectCode project code - * @param code task definition code + * @param loginUser login user + * @param projectCode project code + * @param code task definition code * @param releaseState releaseState * @return update result code */ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 9fb4d83052..7d9a4f9338 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -84,6 +84,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -142,9 +143,6 @@ public class ProcessInstanceServiceTest { @Mock TaskDefinitionMapper taskDefinitionMapper; - @Mock - TaskPluginManager taskPluginManager; - @Mock ScheduleMapper scheduleMapper; @@ -625,21 +623,27 @@ public class ProcessInstanceServiceTest { List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); when(processDefinitionService.checkProcessNodeList(taskRelationJson, taskDefinitionLogs)).thenReturn(result); putMsg(result, Status.SUCCESS, projectCode); - when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - Map processInstanceFinishRes = - processInstanceService.updateProcessInstance(loginUser, projectCode, 1, - taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", true, "", "", 0); - Assertions.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS)); - // success - when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); - putMsg(result, Status.SUCCESS, projectCode); - - when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE)) - .thenReturn(1); - Map successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, - taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", Boolean.FALSE, "", "", 0); - Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + Map processInstanceFinishRes = + processInstanceService.updateProcessInstance(loginUser, projectCode, 1, + taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", true, "", "", 0); + Assertions.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS)); + + // success + when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + putMsg(result, Status.SUCCESS, projectCode); + + when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE)) + .thenReturn(1); + Map successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, + taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", Boolean.FALSE, "", "", 0); + Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } } @Test diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index e77eab8029..ff8ca6ba2c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -17,13 +17,19 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest; @@ -75,13 +81,17 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class TaskDefinitionServiceImplTest { @InjectMocks @@ -114,9 +124,6 @@ public class TaskDefinitionServiceImplTest { @Mock private ProcessTaskRelationMapper processTaskRelationMapper; - @Mock - private TaskPluginManager taskPluginManager; - @Mock private ProcessTaskRelationService processTaskRelationService; @@ -155,61 +162,73 @@ public class TaskDefinitionServiceImplTest { @Test public void createTaskDefinition() { - Project project = getProject(); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - - Map result = new HashMap<>(); - Mockito.when(projectService.hasProjectAndWritePerm(user, project, result)) - .thenReturn(true); - Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + Project project = getProject(); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + + Map result = new HashMap<>(); + when(projectService.hasProjectAndWritePerm(user, project, result)) + .thenReturn(true); + + String createTaskDefinitionJson = + "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; + Map relation = taskDefinitionService + .createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson); + assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); - String createTaskDefinitionJson = - "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" - + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," - + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" - + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," - + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," - + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," - + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; - Map relation = taskDefinitionService - .createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson); - Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + } } @Test public void updateTaskDefinition() { - String taskDefinitionJson = getTaskDefinitionJson();; - - Project project = getProject(); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, PROJECT_CODE); - Mockito.when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); - - Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE); - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition()); - Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1); - Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition()); - Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1); - Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); - Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0)) - .thenReturn(getProcessTaskRelationList2()); - Mockito.when(processTaskRelationMapper - .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1); - result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson); - Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - // failure - Mockito.when(processTaskRelationMapper - .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson)); - Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(), - ((ServiceException) exception).getCode()); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + String taskDefinitionJson = getTaskDefinitionJson(); + + Project project = getProject(); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, PROJECT_CODE); + when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); + + when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE); + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition()); + when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); + when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); + when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1); + when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition()); + when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1); + when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1); + when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); + when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0)) + .thenReturn(getProcessTaskRelationList2()); + when(processTaskRelationMapper + .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1); + result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson); + assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + // failure + when(processTaskRelationMapper + .updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2); + exception = Assertions.assertThrows(ServiceException.class, + () -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, + taskDefinitionJson)); + assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(), + ((ServiceException) exception).getCode()); + } } @@ -217,72 +236,72 @@ public class TaskDefinitionServiceImplTest { public void queryTaskDefinitionByName() { String taskName = "task"; Project project = getProject(); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, PROJECT_CODE); - Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) + when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) .thenReturn(result); - Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName)) + when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName)) .thenReturn(new TaskDefinition()); Map relation = taskDefinitionService .queryTaskDefinitionByName(user, PROJECT_CODE, PROCESS_DEFINITION_CODE, taskName); - Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } @Test public void deleteTaskDefinitionByCode() { Project project = getProject(); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); // error task definition not find exception = Assertions.assertThrows(ServiceException.class, () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); - Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // error delete single task definition object - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); - Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0); - Mockito.when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); + when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0); + when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); exception = Assertions.assertThrows(ServiceException.class, () -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); - Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(), + assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(), ((ServiceException) exception).getCode()); // success - Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, + doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_DELETE); - Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>()); - Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1); + when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>()); + when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1); Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE)); } @Test public void switchVersion() { Project project = getProject(); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS, PROJECT_CODE); - Mockito.when( + when( projectService.checkProjectAndAuth(user, project, PROJECT_CODE, WORKFLOW_SWITCH_TO_THIS_VERSION)) - .thenReturn(result); + .thenReturn(result); - Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION)) + when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION)) .thenReturn(new TaskDefinitionLog()); TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(PROJECT_CODE); - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)) + when(taskDefinitionMapper.queryByCode(TASK_CODE)) .thenReturn(taskDefinition); - Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1); + when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1); Map relation = taskDefinitionService .switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION); - Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } private void putMsg(Map result, Status status, Object... statusParams) { @@ -331,7 +350,7 @@ public class TaskDefinitionServiceImplTest { @Test public void genTaskCodeList() { Map genTaskCodeList = taskDefinitionService.genTaskCodeList(10); - Assertions.assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS)); + assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS)); } @Test @@ -348,31 +367,31 @@ public class TaskDefinitionServiceImplTest { taskMainInfo.setUpstreamTaskName("4"); taskMainInfoIPage.setRecords(Collections.singletonList(taskMainInfo)); taskMainInfoIPage.setTotal(10L); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); + when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION)) .thenReturn(checkResult); - Mockito.when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(), + when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(), Mockito.isNull(), Mockito.anyString(), Mockito.isNull())) - .thenReturn(taskMainInfoIPage); - Mockito.when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L))) + .thenReturn(taskMainInfoIPage); + when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L))) .thenReturn(Collections.singletonList(taskMainInfo)); Result result = taskDefinitionService.queryTaskDefinitionListPaging(user, PROJECT_CODE, null, null, null, pageNo, pageSize); - Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); + assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); } @Test public void testReleaseTaskDefinition() { - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); Project project = getProject(); // check task dose not exist Map result = new HashMap<>(); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, TASK_CODE); - Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result); + when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result); Map map = taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); - Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS)); + assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS)); // process definition offline putMsg(result, Status.SUCCESS); @@ -384,23 +403,23 @@ public class TaskDefinitionServiceImplTest { "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}"; taskDefinition.setTaskParams(params); taskDefinition.setTaskType("SHELL"); - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); - Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion())) + when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion())) .thenReturn(taskDefinitionLog); Map offlineTaskResult = taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); - Assertions.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS)); + assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS)); // process definition online, resource exist Map onlineTaskResult = taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.ONLINE); - Assertions.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS)); + assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS)); // release error code Map failResult = taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.getEnum(2)); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); + assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); } @Test @@ -410,133 +429,127 @@ public class TaskDefinitionServiceImplTest { taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE); // error process definition not find - exception = Assertions.assertThrows(ServiceException.class, + assertThrowsServiceException(Status.PROCESS_DEFINE_NOT_EXIST, () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // error project not find - Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); - Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE); - exception = Assertions.assertThrows(ServiceException.class, + assertThrowsServiceException(Status.PROJECT_NOT_EXIST, () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // error task definition taskCreateRequest.setTaskParams(TASK_PARAMETER); - Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), - TASK_DEFINITION_CREATE); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), - ((ServiceException) exception).getCode()); - - // error create task definition object - Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0); - exception = Assertions.assertThrows(ServiceException.class, + doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE); + assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR.getCode(), - ((ServiceException) exception).getCode()); - // error sync to task definition log - Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); - Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(), - ((ServiceException) exception).getCode()); - - // success - Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); - // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - Mockito.when( - processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), - isA(Boolean.class), - isA(TaskRelationUpdateUpstreamRequest.class))) - .thenReturn(getProcessTaskRelationList()); - Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), - isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); - Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + + // error create task definition object + when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0); + assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_ERROR, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + + // error sync to task definition log + when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1); + when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); + assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, + () -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + + // success + when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); + // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService + when( + processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), + isA(Boolean.class), + isA(TaskRelationUpdateUpstreamRequest.class))) + .thenReturn(getProcessTaskRelationList()); + when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), + isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); + assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); + } } @Test public void testUpdateTaskDefinitionV2() { TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest(); + TaskDefinition taskDefinition = getTaskDefinition(); + Project project = getProject(); // error task definition not exists - exception = Assertions.assertThrows(ServiceException.class, + assertThrowsServiceException(Status.TASK_DEFINITION_NOT_EXISTS, () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.TASK_DEFINITION_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode()); // error project not find - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); - Mockito.when(projectMapper.queryByCode(isA(Long.class))).thenReturn(getProject()); - Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_UPDATE); - exception = Assertions.assertThrows(ServiceException.class, + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); + when(projectMapper.queryByCode(isA(Long.class))).thenReturn(project); + doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) + .checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE); + assertThrowsServiceException(Status.PROJECT_NOT_EXIST, () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // error task definition - Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), - TASK_DEFINITION_UPDATE); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), - ((ServiceException) exception).getCode()); - - // error task definition already online - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(), - ((ServiceException) exception).getCode()); - - // error task definition nothing update - Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(false); - Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.TASK_DEFINITION_NOT_CHANGE.getCode(), ((ServiceException) exception).getCode()); - - // error task definition version invalid - taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH)); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ((ServiceException) exception).getCode()); - - // error task definition update effect number - Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION); - Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.UPDATE_TASK_DEFINITION_ERROR.getCode(), - ((ServiceException) exception).getCode()); - - // error task definition log insert - Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); - exception = Assertions.assertThrows(ServiceException.class, - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); - Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(), - ((ServiceException) exception).getCode()); - - // success - Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); - // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService - Mockito.when( - processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), - isA(Boolean.class), - isA(TaskRelationUpdateUpstreamRequest.class))) - .thenReturn(getProcessTaskRelationList()); - Assertions.assertDoesNotThrow( - () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE); + + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(false); + assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + } - TaskDefinition taskDefinition = - taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest); - Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion()); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + // error task definition nothing update + when(processService.isTaskOnline(TASK_CODE)).thenReturn(false); + assertThrowsServiceException(Status.TASK_DEFINITION_NOT_CHANGE, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + // error task definition version invalid + taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH)); + assertThrowsServiceException(Status.DATA_IS_NOT_VALID, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + // error task definition update effect number + when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION); + when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0); + assertThrowsServiceException(Status.UPDATE_TASK_DEFINITION_ERROR, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + // error task definition log insert + when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1); + when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0); + assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR, + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + // success + when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); + // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService + when( + processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class), + isA(Boolean.class), + isA(TaskRelationUpdateUpstreamRequest.class))) + .thenReturn(getProcessTaskRelationList()); + Assertions.assertDoesNotThrow( + () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); + + taskDefinition = + taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest); + assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion()); + } } @Test @@ -549,28 +562,28 @@ public class TaskDefinitionServiceImplTest { ArrayList taskDefinitionLogs = new ArrayList<>(); taskDefinitionLogs.add(taskDefinitionLog); Integer version = 1; - Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition); + when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition); // saveProcessDefine - Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version); - Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); - Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); + when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version); + when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); + when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1); int insertVersion = processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); - Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)) + when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)) .thenReturn(insertVersion); - Assertions.assertEquals(insertVersion, version + 1); + assertEquals(insertVersion, version + 1); // saveTaskRelation List processTaskRelationLogList = getProcessTaskRelationLogList(); - Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode()))) + when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode()))) .thenReturn(processTaskRelationList); - Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1); - Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1); + when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1); + when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1); int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs, Boolean.TRUE); - Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult); + assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult); Assertions.assertDoesNotThrow( () -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList, taskDefinitionLogs)); @@ -581,55 +594,60 @@ public class TaskDefinitionServiceImplTest { // error task definition not exists exception = Assertions.assertThrows(ServiceException.class, () -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); - Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); + assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // error task definition not exists - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); - Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService) + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition()); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService) .checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION); exception = Assertions.assertThrows(ServiceException.class, () -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); - Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(), + assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(), ((ServiceException) exception).getCode()); // success - Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION); Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); } @Test public void testUpdateTaskWithUpstream() { - - String taskDefinitionJson = getTaskDefinitionJson(); - TaskDefinition taskDefinition = getTaskDefinition(); - taskDefinition.setFlag(Flag.NO); - TaskDefinition taskDefinitionSecond = getTaskDefinition(); - taskDefinitionSecond.setCode(5); - - user.setUserType(UserType.ADMIN_USER); - Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); - Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true); - Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); - Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true); - Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); - Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1); - Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1); - - Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet())) - .thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond)); - - Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE)) - .thenReturn(getProcessTaskRelationListV2()); - Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition()); - Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1); - Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1); - Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2); - // success - Map successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE, - taskDefinitionJson, UPSTREAM_CODE); - Assertions.assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS)); - user.setUserType(UserType.GENERAL_USER); + try ( + MockedStatic taskPluginManagerMockedStatic = + Mockito.mockStatic(TaskPluginManager.class)) { + taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any())) + .thenReturn(true); + String taskDefinitionJson = getTaskDefinitionJson(); + TaskDefinition taskDefinition = getTaskDefinition(); + taskDefinition.setFlag(Flag.NO); + TaskDefinition taskDefinitionSecond = getTaskDefinition(); + taskDefinitionSecond.setCode(5); + + user.setUserType(UserType.ADMIN_USER); + when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); + when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true); + when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); + when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); + when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1); + when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1); + + when(taskDefinitionMapper.queryByCodeList(Mockito.anySet())) + .thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond)); + + when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE)) + .thenReturn(getProcessTaskRelationListV2()); + when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)) + .thenReturn(getProcessDefinition()); + when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1); + when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1); + when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2); + // success + Map successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE, + taskDefinitionJson, UPSTREAM_CODE); + assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS)); + user.setUserType(UserType.GENERAL_USER); + } } private String getTaskDefinitionJson() { diff --git a/dolphinscheduler-api/src/test/resources/logback-spring.xml b/dolphinscheduler-api/src/test/resources/logback-spring.xml new file mode 100644 index 0000000000..9159c3b02d --- /dev/null +++ b/dolphinscheduler-api/src/test/resources/logback-spring.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n + + UTF-8 + + + + + ${log.base}/dolphinscheduler-api.log + + ${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log + 168 + 64MB + 50GB + true + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n + + UTF-8 + + + + + + + + + + + + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/CommonConfiguration.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/CommonConfiguration.java new file mode 100644 index 0000000000..5411e4cbc1 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/CommonConfiguration.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ComponentScan("org.apache.dolphinscheduler.common") +public class CommonConfiguration { +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java index 25d8024474..8aab70f304 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java @@ -40,7 +40,6 @@ public class RemoteLogUtils { @Autowired private RemoteLogService autowiredRemoteLogService; - @PostConstruct private void init() { remoteLogService = autowiredRemoteLogService; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/HttpProperty.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/HttpProperty.java deleted file mode 100644 index 11786fd5a3..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/HttpProperty.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.process; - -import org.apache.dolphinscheduler.common.enums.HttpParametersType; - -import java.util.Objects; - -public class HttpProperty { - - /** - * key - */ - private String prop; - - /** - * httpParametersType - */ - private HttpParametersType httpParametersType; - - /** - * value - */ - private String value; - - public HttpProperty() { - } - - public HttpProperty(String prop, HttpParametersType httpParametersType, String value) { - this.prop = prop; - this.httpParametersType = httpParametersType; - this.value = value; - } - - /** - * getter method - * - * @return the prop - * @see HttpProperty#prop - */ - public String getProp() { - return prop; - } - - /** - * setter method - * - * @param prop the prop to set - * @see HttpProperty#prop - */ - public void setProp(String prop) { - this.prop = prop; - } - - /** - * getter method - * - * @return the value - * @see HttpProperty#value - */ - public String getValue() { - return value; - } - - /** - * setter method - * - * @param value the value to set - * @see HttpProperty#value - */ - public void setValue(String value) { - this.value = value; - } - - public HttpParametersType getHttpParametersType() { - return httpParametersType; - } - - public void setHttpParametersType(HttpParametersType httpParametersType) { - this.httpParametersType = httpParametersType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - HttpProperty property = (HttpProperty) o; - return Objects.equals(prop, property.prop) - && Objects.equals(value, property.value); - } - - @Override - public int hashCode() { - return Objects.hash(prop, value); - } - - @Override - public String toString() { - return "HttpProperty{" - + "prop='" + prop + '\'' - + ", httpParametersType=" + httpParametersType - + ", value='" + value + '\'' - + '}'; - } -} diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-api/src/main/java/org/apache/dolphinscheduler/dao/plugin/api/DatabaseEnvironmentCondition.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-api/src/main/java/org/apache/dolphinscheduler/dao/plugin/api/DatabaseEnvironmentCondition.java new file mode 100644 index 0000000000..32fb807a9b --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-api/src/main/java/org/apache/dolphinscheduler/dao/plugin/api/DatabaseEnvironmentCondition.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.plugin.api; + +import java.util.Arrays; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; + +public class DatabaseEnvironmentCondition implements Condition { + + private final String profile; + + public DatabaseEnvironmentCondition(String profile) { + this.profile = profile; + } + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + String[] activeProfiles = context.getEnvironment().getActiveProfiles(); + return Arrays.asList(activeProfiles).contains(profile); + } +} diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginConfiguration.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginAutoConfiguration.java similarity index 88% rename from dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginConfiguration.java rename to dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginAutoConfiguration.java index 9aea94f77d..f496679ed5 100644 --- a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginConfiguration.java +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginAutoConfiguration.java @@ -29,14 +29,14 @@ import org.apache.dolphinscheduler.dao.plugin.h2.monitor.H2Monitor; import javax.sql.DataSource; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import com.baomidou.mybatisplus.annotation.DbType; -@Profile("h2") -@Configuration -public class H2DaoPluginConfiguration implements DaoPluginConfiguration { +@Conditional(H2DatabaseEnvironmentCondition.class) +@Configuration(proxyBeanMethods = false) +public class H2DaoPluginAutoConfiguration implements DaoPluginConfiguration { @Autowired private DataSource dataSource; diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DatabaseEnvironmentCondition.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DatabaseEnvironmentCondition.java new file mode 100644 index 0000000000..894f38bd20 --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DatabaseEnvironmentCondition.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.plugin.h2; + +import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition; + +public class H2DatabaseEnvironmentCondition extends DatabaseEnvironmentCondition { + + public H2DatabaseEnvironmentCondition() { + super("h2"); + } + +} diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/resources/META-INF/spring.factories b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..c899dfb43f --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.dao.plugin.h2.H2DaoPluginAutoConfiguration diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginConfiguration.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginAutoConfiguration.java similarity index 88% rename from dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginConfiguration.java rename to dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginAutoConfiguration.java index 8b37fca67b..5fb3a350ae 100644 --- a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginConfiguration.java +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginAutoConfiguration.java @@ -28,14 +28,14 @@ import org.apache.dolphinscheduler.dao.plugin.mysql.monitor.MysqlMonitor; import javax.sql.DataSource; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import com.baomidou.mybatisplus.annotation.DbType; -@Profile("mysql") -@Configuration -public class MysqlDaoPluginConfiguration implements DaoPluginConfiguration { +@Configuration(proxyBeanMethods = false) +@Conditional(MysqlDatabaseEnvironmentCondition.class) +public class MysqlDaoPluginAutoConfiguration implements DaoPluginConfiguration { @Autowired private DataSource dataSource; diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDatabaseEnvironmentCondition.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDatabaseEnvironmentCondition.java new file mode 100644 index 0000000000..2136e1354e --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDatabaseEnvironmentCondition.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.plugin.mysql; + +import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition; + +public class MysqlDatabaseEnvironmentCondition extends DatabaseEnvironmentCondition { + + public MysqlDatabaseEnvironmentCondition() { + super("mysql"); + } + +} diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/resources/META-INF/spring.factories b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..386c80c676 --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.dao.plugin.mysql.MysqlDaoPluginAutoConfiguration diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginConfiguration.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginAutoConfiguration.java similarity index 88% rename from dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginConfiguration.java rename to dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginAutoConfiguration.java index e57c84fab9..f0467bfd07 100644 --- a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginConfiguration.java +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginAutoConfiguration.java @@ -29,14 +29,14 @@ import org.apache.dolphinscheduler.dao.plugin.postgresql.monitor.PostgresqlMonit import javax.sql.DataSource; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import com.baomidou.mybatisplus.annotation.DbType; -@Profile("postgresql") -@Configuration -public class PostgresqlDaoPluginConfiguration implements DaoPluginConfiguration { +@Conditional(PostgresqlDatabaseEnvironmentCondition.class) +@Configuration(proxyBeanMethods = false) +public class PostgresqlDaoPluginAutoConfiguration implements DaoPluginConfiguration { @Autowired private DataSource dataSource; diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDatabaseEnvironmentCondition.java b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDatabaseEnvironmentCondition.java new file mode 100644 index 0000000000..2c71ea8442 --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDatabaseEnvironmentCondition.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.plugin.postgresql; + +import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition; + +public class PostgresqlDatabaseEnvironmentCondition extends DatabaseEnvironmentCondition { + + public PostgresqlDatabaseEnvironmentCondition() { + super("postgresql"); + } + +} diff --git a/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/resources/META-INF/spring.factories b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..fd6a5f07b9 --- /dev/null +++ b/dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.dao.plugin.postgresql.PostgresqlDaoPluginAutoConfiguration diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java index 985f5b56b4..e089c8086b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java @@ -40,8 +40,8 @@ import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; @Configuration +@ComponentScan("org.apache.dolphinscheduler.dao") @EnableAutoConfiguration -@ComponentScan({"org.apache.dolphinscheduler.dao.plugin"}) @MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory") public class DaoConfiguration { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java index 231c947f65..6f14eb436e 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java @@ -22,7 +22,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; -import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; @ExtendWith(MockitoExtension.class) @@ -30,6 +29,5 @@ import org.springframework.transaction.annotation.Transactional; @SpringBootApplication(scanBasePackageClasses = DaoConfiguration.class) @Transactional @Rollback -@EnableTransactionManagement public abstract class BaseDaoTest { } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java index c0a841c1a0..fe8545f3e2 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java @@ -34,7 +34,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; import org.springframework.test.context.ActiveProfiles; -import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.Transactional; @ActiveProfiles(ProfileType.H2) @@ -43,7 +42,6 @@ import org.springframework.transaction.annotation.Transactional; @SpringBootTest(classes = DaoConfiguration.class) @Transactional @Rollback -@EnableTransactionManagement public class AlertDaoTest { @Autowired diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index fd262a8c6e..7988570135 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -17,15 +17,18 @@ package org.apache.dolphinscheduler.server.master; +import org.apache.dolphinscheduler.common.CommonConfiguration; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.dao.DaoConfiguration; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration; +import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; +import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; @@ -35,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; +import org.apache.dolphinscheduler.service.ServiceConfiguration; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import javax.annotation.PostConstruct; @@ -45,18 +49,15 @@ import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cache.annotation.EnableCaching; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; -import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.context.annotation.Import; -@SpringBootApplication -@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class) -}) -@EnableTransactionManagement -@EnableCaching @Slf4j +@Import({DaoConfiguration.class, + ServiceConfiguration.class, + CommonConfiguration.class, + StorageConfiguration.class, + RegistryConfiguration.class}) +@SpringBootApplication public class MasterServer implements IStoppable { @Autowired @@ -65,9 +66,6 @@ public class MasterServer implements IStoppable { @Autowired private MasterRegistryClient masterRegistryClient; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private MasterSchedulerBootstrap masterSchedulerBootstrap; @@ -109,7 +107,7 @@ public class MasterServer implements IStoppable { this.masterRPCServer.start(); // install task plugin - this.taskPluginManager.loadPlugin(); + TaskPluginManager.loadPlugin(); this.masterSlotManager.start(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java index ab1806ff67..b3b0cf67ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java @@ -90,9 +90,6 @@ public class TaskExecutionContextFactory { @Autowired private ProcessService processService; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private CuringParamsService curingParamsService; @@ -106,14 +103,14 @@ public class TaskExecutionContextFactory { ProcessInstance workflowInstance = taskInstance.getProcessInstance(); ResourceParametersHelper resources = - Optional.ofNullable(taskPluginManager.getTaskChannel(taskInstance.getTaskType())) + Optional.ofNullable(TaskPluginManager.getTaskChannel(taskInstance.getTaskType())) .map(taskChannel -> taskChannel.getResources(taskInstance.getTaskParams())) .orElse(null); setTaskResourceInfo(resources); Map businessParamsMap = curingParamsService.preBuildBusinessParams(workflowInstance); - AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder() + AbstractParameters baseParam = TaskPluginManager.getParameters(ParametersNode.builder() .taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build()); Map propertyMap = curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance); diff --git a/dolphinscheduler-master/src/test/resources/logback.xml b/dolphinscheduler-master/src/test/resources/logback.xml index 4470a46391..286e35cd1f 100644 --- a/dolphinscheduler-master/src/test/resources/logback.xml +++ b/dolphinscheduler-master/src/test/resources/logback.xml @@ -65,7 +65,7 @@ - + diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterAutoConfiguration.java similarity index 87% rename from dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java rename to dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterAutoConfiguration.java index e3b140a578..fa8933d132 100644 --- a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterAutoConfiguration.java @@ -20,7 +20,8 @@ package org.apache.dolphinscheduler.meter; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.apache.dolphinscheduler.meter.metrics.DefaultMetricsProvider; + import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -42,11 +43,15 @@ import io.micrometer.core.instrument.MeterRegistry; * } * */ -@Configuration +@Configuration(proxyBeanMethods = false) @EnableAspectJAutoProxy -@EnableAutoConfiguration @ConditionalOnProperty(prefix = "metrics", name = "enabled", havingValue = "true") -public class MeterConfiguration { +public class MeterAutoConfiguration { + + @Bean + public DefaultMetricsProvider metricsProvider(MeterRegistry meterRegistry) { + return new DefaultMetricsProvider(meterRegistry); + } @Bean public TimedAspect timedAspect(MeterRegistry registry) { diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java index f1240a0541..e293e44a8d 100644 --- a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java @@ -19,11 +19,8 @@ package org.apache.dolphinscheduler.meter.metrics; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.springframework.stereotype.Component; - import io.micrometer.core.instrument.MeterRegistry; -@Component public class DefaultMetricsProvider implements MetricsProvider { private final MeterRegistry meterRegistry; diff --git a/dolphinscheduler-meter/src/main/resources/META-INF/spring.factories b/dolphinscheduler-meter/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..77bc56d86e --- /dev/null +++ b/dolphinscheduler-meter/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.meter.MeterAutoConfiguration diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryConfiguration.java new file mode 100644 index 0000000000..bae3694964 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryConfiguration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RegistryConfiguration { + + @Bean + @ConditionalOnMissingBean + public RegistryClient registryClient(Registry registry) { + return new RegistryClient(registry); + } + +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java index 1d1397db54..6833a6607b 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java @@ -41,8 +41,6 @@ import javax.net.ssl.SSLException; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import com.google.common.base.Splitter; @@ -68,8 +66,6 @@ import io.netty.handler.ssl.SslContext; * This is one of the implementation of {@link Registry}, with this implementation, you need to rely on Etcd cluster to * store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry. */ -@Component -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd") @Slf4j public class EtcdRegistry implements Registry { @@ -86,6 +82,7 @@ public class EtcdRegistry implements Registry { private final Map watcherMap = new ConcurrentHashMap<>(); private static final long TIME_TO_LIVE_SECONDS = 30L; + public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLException { ClientBuilder clientBuilder = Client.builder() .endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints()))) @@ -141,8 +138,7 @@ public class EtcdRegistry implements Registry { } /** - * - * @param path The prefix of the key being listened to + * @param path The prefix of the key being listened to * @param listener * @return if subcribe Returns true if no exception was thrown */ @@ -165,8 +161,8 @@ public class EtcdRegistry implements Registry { } /** - * @throws throws an exception if the unsubscribe path does not exist * @param path The prefix of the key being listened to + * @throws throws an exception if the unsubscribe path does not exist */ @Override public void unsubscribe(String path) { @@ -184,7 +180,6 @@ public class EtcdRegistry implements Registry { } /** - * * @return Returns the value corresponding to the key * @throws throws an exception if the key does not exist */ @@ -202,7 +197,6 @@ public class EtcdRegistry implements Registry { } /** - * * @param deleteOnDisconnect Does the put data disappear when the client disconnects */ @Override diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryAutoConfiguration.java new file mode 100644 index 0000000000..1038d312ff --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryAutoConfiguration.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.etcd; + +import org.apache.dolphinscheduler.registry.api.Registry; + +import javax.net.ssl.SSLException; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@ComponentScan +@Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd") +public class EtcdRegistryAutoConfiguration { + + public EtcdRegistryAutoConfiguration() { + log.info("Load EtcdRegistryAutoConfiguration"); + } + + @Bean + @ConditionalOnMissingBean(value = Registry.class) + public EtcdRegistry etcdRegistry(EtcdRegistryProperties etcdRegistryProperties) throws SSLException { + return new EtcdRegistry(etcdRegistryProperties); + } + +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java index faded2a506..babb6dea76 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java @@ -21,13 +21,11 @@ import java.time.Duration; import lombok.Data; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Data @Configuration -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd") @ConfigurationProperties(prefix = "registry") public class EtcdRegistryProperties { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/resources/META-INF/spring.factories b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..689817bb96 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.plugin.registry.etcd.EtcdRegistryAutoConfiguration diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java index 09211f99fb..f21ce0d67c 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration; @@ -37,6 +38,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.zaxxer.hikari.HikariDataSource; @Slf4j +@ComponentScan @Configuration(proxyBeanMethods = false) @MapperScan("org.apache.dolphinscheduler.plugin.registry.jdbc.mapper") @ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc") diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java index 7333c10f05..3f0c3ccb59 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java @@ -50,14 +50,11 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PostConstruct; import lombok.NonNull; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; +import lombok.extern.slf4j.Slf4j; import com.google.common.base.Strings; -@Component -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper") +@Slf4j public final class ZookeeperRegistry implements Registry { private final ZookeeperRegistryProperties.ZookeeperProperties properties; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java new file mode 100644 index 0000000000..e8fc31327e --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.zookeeper; + +import org.apache.dolphinscheduler.registry.api.Registry; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@ComponentScan +@Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper") +public class ZookeeperRegistryAutoConfiguration { + + public ZookeeperRegistryAutoConfiguration() { + log.info("Load ZookeeperRegistryAutoConfiguration"); + } + + @Bean + @ConditionalOnMissingBean(value = Registry.class) + public ZookeeperRegistry zookeeperRegistry(ZookeeperRegistryProperties zookeeperRegistryProperties) { + return new ZookeeperRegistry(zookeeperRegistryProperties); + } + +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java index 42dbc5d256..b54ebc0b1a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java @@ -19,25 +19,19 @@ package org.apache.dolphinscheduler.plugin.registry.zookeeper; import java.time.Duration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import lombok.Data; + import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +@Data @Configuration -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper") @ConfigurationProperties(prefix = "registry") public class ZookeeperRegistryProperties { private ZookeeperProperties zookeeper = new ZookeeperProperties(); - public ZookeeperProperties getZookeeper() { - return zookeeper; - } - - public void setZookeeper(ZookeeperProperties zookeeper) { - this.zookeeper = zookeeper; - } - + @Data public static final class ZookeeperProperties { private String namespace; @@ -48,91 +42,13 @@ public class ZookeeperRegistryProperties { private Duration connectionTimeout = Duration.ofSeconds(9); private Duration blockUntilConnected = Duration.ofMillis(600); - public String getNamespace() { - return namespace; - } - - public void setNamespace(String namespace) { - this.namespace = namespace; - } - - public String getConnectString() { - return connectString; - } - - public void setConnectString(String connectString) { - this.connectString = connectString; - } - - public RetryPolicy getRetryPolicy() { - return retryPolicy; - } - - public void setRetryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; - } - - public String getDigest() { - return digest; - } - - public void setDigest(String digest) { - this.digest = digest; - } - - public Duration getSessionTimeout() { - return sessionTimeout; - } - - public void setSessionTimeout(Duration sessionTimeout) { - this.sessionTimeout = sessionTimeout; - } - - public Duration getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(Duration connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - - public Duration getBlockUntilConnected() { - return blockUntilConnected; - } - - public void setBlockUntilConnected(Duration blockUntilConnected) { - this.blockUntilConnected = blockUntilConnected; - } - + @Data public static final class RetryPolicy { private Duration baseSleepTime = Duration.ofMillis(60); private int maxRetries; private Duration maxSleep = Duration.ofMillis(300); - public Duration getBaseSleepTime() { - return baseSleepTime; - } - - public void setBaseSleepTime(Duration baseSleepTime) { - this.baseSleepTime = baseSleepTime; - } - - public int getMaxRetries() { - return maxRetries; - } - - public void setMaxRetries(int maxRetries) { - this.maxRetries = maxRetries; - } - - public Duration getMaxSleep() { - return maxSleep; - } - - public void setMaxSleep(Duration maxSleep) { - this.maxSleep = maxSleep; - } } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/resources/META-INF/spring.factories b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..821f1a70c1 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperRegistryAutoConfiguration diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerAutoConfiguration.java similarity index 97% rename from dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java rename to dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerAutoConfiguration.java index 07ff8af434..34fb782587 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerAutoConfiguration.java @@ -28,7 +28,7 @@ import org.springframework.context.annotation.Bean; @AutoConfiguration(after = {QuartzAutoConfiguration.class}) @ConditionalOnClass(value = Scheduler.class) -public class QuartzSchedulerConfiguration { +public class QuartzSchedulerAutoConfiguration { @Bean @ConditionalOnMissingBean diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/resources/META-INF/spring.factories b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..b34f896b84 --- /dev/null +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.apache.dolphinscheduler.scheduler.quartz.QuartzSchedulerAutoConfiguration diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/ServiceConfiguration.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/ServiceConfiguration.java new file mode 100644 index 0000000000..fa831a5b6b --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/ServiceConfiguration.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@ComponentScan("org.apache.dolphinscheduler.service") +@Configuration +public class ServiceConfiguration { +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 3c207ae982..635948fc86 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -111,7 +111,6 @@ import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcCli import org.apache.dolphinscheduler.extract.common.ILogService; import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; @@ -262,9 +261,6 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private WorkFlowLineageMapper workFlowLineageMapper; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private ClusterMapper clusterMapper; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java deleted file mode 100644 index e00212823a..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.constants.DataSourceConstants; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.StringUtils; - -import java.net.URL; -import java.nio.charset.StandardCharsets; - -import lombok.extern.slf4j.Slf4j; - -/** - * common utils - */ -@Slf4j -public class CommonUtils { - - private static final Base64 BASE64 = new Base64(); - - protected CommonUtils() { - throw new UnsupportedOperationException("Construct CommonUtils"); - } - - /** - * @return get the path of system environment variables - */ - public static String getSystemEnvPath() { - String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH); - if (StringUtils.isEmpty(envPath)) { - URL envDefaultPath = CommonUtils.class.getClassLoader().getResource(Constants.ENV_PATH); - - if (envDefaultPath != null) { - envPath = envDefaultPath.getPath(); - log.debug("env path :{}", envPath); - } else { - envPath = "/etc/profile"; - } - } - - return envPath; - } - - /** - * encode password - */ - public static String encodePassword(String password) { - if (StringUtils.isEmpty(password)) { - return StringUtils.EMPTY; - } - // if encryption is not turned on, return directly - boolean encryptionEnable = PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false); - if (!encryptionEnable) { - return password; - } - - // Using Base64 + salt to process password - String salt = PropertyUtils.getString(DataSourceConstants.DATASOURCE_ENCRYPTION_SALT, - DataSourceConstants.DATASOURCE_ENCRYPTION_SALT_DEFAULT); - String passwordWithSalt = salt + new String(BASE64.encode(password.getBytes(StandardCharsets.UTF_8))); - return new String(BASE64.encode(passwordWithSalt.getBytes(StandardCharsets.UTF_8))); - } - - /** - * decode password - */ - public static String decodePassword(String password) { - if (StringUtils.isEmpty(password)) { - return StringUtils.EMPTY; - } - - // if encryption is not turned on, return directly - boolean encryptionEnable = PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false); - if (!encryptionEnable) { - return password; - } - - // Using Base64 + salt to process password - String salt = PropertyUtils.getString(DataSourceConstants.DATASOURCE_ENCRYPTION_SALT, - DataSourceConstants.DATASOURCE_ENCRYPTION_SALT_DEFAULT); - String passwordWithSalt = new String(BASE64.decode(password), StandardCharsets.UTF_8); - if (!passwordWithSalt.startsWith(salt)) { - log.warn("There is a password and salt mismatch: {} ", password); - return password; - } - return new String(BASE64.decode(passwordWithSalt.substring(salt.length())), StandardCharsets.UTF_8); - } - -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 509f9a4cf1..5c998e1f9f 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -79,7 +79,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.service.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; @@ -102,8 +101,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * process service test @@ -112,7 +109,6 @@ import org.slf4j.LoggerFactory; @MockitoSettings(strictness = Strictness.LENIENT) public class ProcessServiceTest { - private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class); @InjectMocks private ProcessServiceImpl processService; @Mock @@ -667,7 +663,6 @@ public class ProcessServiceTest { taskDefinition.setVersion(1); taskDefinition.setCreateTime(new Date()); taskDefinition.setUpdateTime(new Date()); - when(taskPluginManager.getParameters(any())).thenReturn(null); when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition); when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java deleted file mode 100644 index cab280a702..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.utils.FileUtils; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * configuration test - */ -@ExtendWith(MockitoExtension.class) -public class CommonUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(CommonUtilsTest.class); - - @Test - public void getSystemEnvPath() { - String envPath; - envPath = CommonUtils.getSystemEnvPath(); - Assertions.assertEquals("/etc/profile", envPath); - } - - @Test - public void getDownloadFilename() { - logger.info(FileUtils.getDownloadFilename("a.txt")); - Assertions.assertTrue(true); - } - - @Test - public void getUploadFilename() { - logger.info(FileUtils.getUploadFilename("1234", "a.txt")); - Assertions.assertTrue(true); - } - - @Test - public void test() { - InetAddress ip; - try { - ip = InetAddress.getLocalHost(); - logger.info(ip.getHostAddress()); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - Assertions.assertTrue(true); - } - -} diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java index 14808ab629..a39d3c9a22 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java @@ -24,8 +24,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication @Slf4j +@SpringBootApplication public class StandaloneServer { public static void main(String[] args) throws Exception { diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 654113471d..5122eea2a1 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -329,4 +329,4 @@ spring: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 username: root - password: root@123 + password: root diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java index be3417466f..938aa77459 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java @@ -36,21 +36,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Component @Slf4j public class TaskPluginManager { - private final Map taskChannelFactoryMap = new HashMap<>(); - private final Map taskChannelMap = new HashMap<>(); + private static final Map taskChannelFactoryMap = new HashMap<>(); + private static final Map taskChannelMap = new HashMap<>(); - private final AtomicBoolean loadedFlag = new AtomicBoolean(false); + private static final AtomicBoolean loadedFlag = new AtomicBoolean(false); /** * Load task plugins from classpath. */ - public void loadPlugin() { + public static void loadPlugin() { if (!loadedFlag.compareAndSet(false, true)) { log.warn("The task plugin has already been loaded"); return; @@ -70,24 +67,24 @@ public class TaskPluginManager { } - public Map getTaskChannelMap() { + public static Map getTaskChannelMap() { return Collections.unmodifiableMap(taskChannelMap); } - public Map getTaskChannelFactoryMap() { + public static Map getTaskChannelFactoryMap() { return Collections.unmodifiableMap(taskChannelFactoryMap); } - public TaskChannel getTaskChannel(String type) { - return this.getTaskChannelMap().get(type); + public static TaskChannel getTaskChannel(String type) { + return getTaskChannelMap().get(type); } - public boolean checkTaskParameters(ParametersNode parametersNode) { - AbstractParameters abstractParameters = this.getParameters(parametersNode); + public static boolean checkTaskParameters(ParametersNode parametersNode) { + AbstractParameters abstractParameters = getParameters(parametersNode); return abstractParameters != null && abstractParameters.checkParameters(); } - public AbstractParameters getParameters(ParametersNode parametersNode) { + public static AbstractParameters getParameters(ParametersNode parametersNode) { String taskType = parametersNode.getTaskType(); if (Objects.isNull(taskType)) { return null; @@ -106,7 +103,7 @@ public class TaskPluginManager { case TaskConstants.TASK_TYPE_DYNAMIC: return JSONUtils.parseObject(parametersNode.getTaskParams(), DynamicParameters.class); default: - TaskChannel taskChannel = this.getTaskChannelMap().get(taskType); + TaskChannel taskChannel = getTaskChannelMap().get(taskType); if (Objects.isNull(taskChannel)) { return null; } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java index aa9553a308..0588b20dda 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java @@ -23,12 +23,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; -@ImportAutoConfiguration(DaoConfiguration.class) +@Import(DaoConfiguration.class) @SpringBootApplication public class UpgradeDolphinScheduler { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index b0866f7fd8..8dd842a8ed 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker; +import org.apache.dolphinscheduler.common.CommonConfiguration; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; @@ -24,11 +25,12 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration; +import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; +import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; @@ -47,24 +49,18 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; -import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.context.annotation.Import; -@SpringBootApplication -@EnableTransactionManagement -@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class) -}) @Slf4j +@Import({CommonConfiguration.class, + StorageConfiguration.class, + RegistryConfiguration.class}) +@SpringBootApplication public class WorkerServer implements IStoppable { @Autowired private WorkerRegistryClient workerRegistryClient; - @Autowired - private TaskPluginManager taskPluginManager; - @Autowired private WorkerRpcServer workerRpcServer; @@ -89,7 +85,7 @@ public class WorkerServer implements IStoppable { @PostConstruct public void run() { this.workerRpcServer.start(); - this.taskPluginManager.loadPlugin(); + TaskPluginManager.loadPlugin(); this.workerRegistryClient.setRegistryStoppable(this); this.workerRegistryClient.start(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java index 19421ee05e..ea44f1790f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; @@ -35,13 +34,11 @@ public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor { public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate, @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, - taskPluginManager, storageOperate, workerRegistryClient); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java index 0141a5cd17..20fa6a5e2a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; @@ -35,20 +34,17 @@ public class DefaultWorkerTaskExecutorFactory private final @NonNull TaskExecutionContext taskExecutionContext; private final @NonNull WorkerConfig workerConfig; private final @NonNull WorkerMessageSender workerMessageSender; - private final @NonNull TaskPluginManager taskPluginManager; private final @Nullable StorageOperate storageOperate; private final @NonNull WorkerRegistryClient workerRegistryClient; public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate, @NonNull WorkerRegistryClient workerRegistryClient) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; - this.taskPluginManager = taskPluginManager; this.storageOperate = storageOperate; this.workerRegistryClient = workerRegistryClient; } @@ -59,7 +55,6 @@ public class DefaultWorkerTaskExecutorFactory taskExecutionContext, workerConfig, workerMessageSender, - taskPluginManager, storageOperate, workerRegistryClient); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index f713605a48..41cef62028 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -75,7 +75,6 @@ public abstract class WorkerTaskExecutor implements Runnable { protected final TaskExecutionContext taskExecutionContext; protected final WorkerConfig workerConfig; protected final WorkerMessageSender workerMessageSender; - protected final TaskPluginManager taskPluginManager; protected final @Nullable StorageOperate storageOperate; protected final WorkerRegistryClient workerRegistryClient; @@ -85,13 +84,11 @@ public abstract class WorkerTaskExecutor implements Runnable { @NonNull TaskExecutionContext taskExecutionContext, @NonNull WorkerConfig workerConfig, @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, @Nullable StorageOperate storageOperate, @NonNull WorkerRegistryClient workerRegistryClient) { this.taskExecutionContext = taskExecutionContext; this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; - this.taskPluginManager = taskPluginManager; this.storageOperate = storageOperate; this.workerRegistryClient = workerRegistryClient; SensitiveDataConverter.addMaskPattern(K8S_CONFIG_REGEX); @@ -220,7 +217,7 @@ public abstract class WorkerTaskExecutor implements Runnable { log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath()); TaskChannel taskChannel = - Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType())) + Optional.ofNullable(TaskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType())) .orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType() + " task plugin not found, please check the task type is correct.")); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java index 599746818d..56f3207884 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; @@ -36,12 +35,6 @@ public class WorkerTaskExecutorFactoryBuilder { @Autowired private WorkerMessageSender workerMessageSender; - @Autowired - private TaskPluginManager taskPluginManager; - - @Autowired - private WorkerTaskExecutorThreadPool workerManager; - @Autowired(required = false) private StorageOperate storageOperate; @@ -51,14 +44,11 @@ public class WorkerTaskExecutorFactoryBuilder { public WorkerTaskExecutorFactoryBuilder( WorkerConfig workerConfig, WorkerMessageSender workerMessageSender, - TaskPluginManager taskPluginManager, WorkerTaskExecutorThreadPool workerManager, StorageOperate storageOperate, WorkerRegistryClient workerRegistryClient) { this.workerConfig = workerConfig; this.workerMessageSender = workerMessageSender; - this.taskPluginManager = taskPluginManager; - this.workerManager = workerManager; this.storageOperate = storageOperate; this.workerRegistryClient = workerRegistryClient; } @@ -67,7 +57,6 @@ public class WorkerTaskExecutorFactoryBuilder { return new DefaultWorkerTaskExecutorFactory(taskExecutionContext, workerConfig, workerMessageSender, - taskPluginManager, storageOperate, workerRegistryClient); } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java index 43ef6f87d0..e211fcdd1f 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; @@ -40,8 +39,6 @@ public class DefaultWorkerTaskExecutorTest { private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class); - private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class); - private StorageOperate storageOperate = Mockito.mock(StorageOperate.class); private WorkerRegistryClient workerRegistryClient = Mockito.mock(WorkerRegistryClient.class); @@ -58,7 +55,6 @@ public class DefaultWorkerTaskExecutorTest { taskExecutionContext, workerConfig, workerMessageSender, - taskPluginManager, storageOperate, workerRegistryClient); @@ -82,7 +78,6 @@ public class DefaultWorkerTaskExecutorTest { taskExecutionContext, workerConfig, workerMessageSender, - taskPluginManager, storageOperate, workerRegistryClient); diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java index 988f1f7fec..182ac6a1c2 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; @@ -68,7 +67,7 @@ class WorkerTaskExecutorThreadPoolTest { protected MockWorkerTaskExecutor(Runnable runnable) { super(TaskExecutionContext.builder().taskInstanceId((int) System.nanoTime()).build(), new WorkerConfig(), - new WorkerMessageSender(), new TaskPluginManager(), new StorageOperate() { + new WorkerMessageSender(), new StorageOperate() { @Override public void createTenantDirIfNotExists(String tenantCode) { diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java index 592340214f..f761dd61d6 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java @@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHost import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; @@ -70,8 +69,6 @@ public class TaskInstanceOperationFunctionTest { private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class); - private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class); - private WorkerTaskExecutorThreadPool workerManager = Mockito.mock(WorkerTaskExecutorThreadPool.class); private StorageOperate storageOperate = Mockito.mock(StorageOperate.class); @@ -94,7 +91,6 @@ public class TaskInstanceOperationFunctionTest { WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder( workerConfig, workerMessageSender, - taskPluginManager, workerManager, storageOperate, workerRegistryClient); @@ -189,7 +185,6 @@ public class TaskInstanceOperationFunctionTest { WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder( workerConfig, workerMessageSender, - taskPluginManager, workerManager, storageOperate, workerRegistryClient); diff --git a/dolphinscheduler-worker/src/test/resources/logback.xml b/dolphinscheduler-worker/src/test/resources/logback.xml index d63ea4a0b5..10fc24d974 100644 --- a/dolphinscheduler-worker/src/test/resources/logback.xml +++ b/dolphinscheduler-worker/src/test/resources/logback.xml @@ -22,7 +22,8 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - + [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 @@ -60,18 +61,15 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - + [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 - - - - - - + +