Browse Source

[DSIP-28] Donnot scan whole bean under classpath (#15874)

3.2.2-release-bak
Wenjun Ruan 7 months ago committed by GitHub
parent
commit
285c5a8eb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 13
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 26
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java
  4. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  6. 75
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  7. 38
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  8. 490
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  9. 57
      dolphinscheduler-api/src/test/resources/logback-spring.xml
  10. 26
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/CommonConfiguration.java
  11. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
  12. 124
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/HttpProperty.java
  13. 39
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-api/src/main/java/org/apache/dolphinscheduler/dao/plugin/api/DatabaseEnvironmentCondition.java
  14. 8
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginAutoConfiguration.java
  15. 28
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DatabaseEnvironmentCondition.java
  16. 19
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/resources/META-INF/spring.factories
  17. 8
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginAutoConfiguration.java
  18. 28
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDatabaseEnvironmentCondition.java
  19. 19
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/resources/META-INF/spring.factories
  20. 8
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginAutoConfiguration.java
  21. 28
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDatabaseEnvironmentCondition.java
  22. 19
      dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/resources/META-INF/spring.factories
  23. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
  24. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java
  25. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java
  26. 28
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  27. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
  28. 2
      dolphinscheduler-master/src/test/resources/logback.xml
  29. 13
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterAutoConfiguration.java
  30. 3
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java
  31. 19
      dolphinscheduler-meter/src/main/resources/META-INF/spring.factories
  32. 33
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryConfiguration.java
  33. 12
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
  34. 48
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryAutoConfiguration.java
  35. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java
  36. 19
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/resources/META-INF/spring.factories
  37. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java
  38. 7
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
  39. 46
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java
  40. 94
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
  41. 19
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/resources/META-INF/spring.factories
  42. 2
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerAutoConfiguration.java
  43. 19
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/resources/META-INF/spring.factories
  44. 26
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/ServiceConfiguration.java
  45. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  46. 108
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java
  47. 5
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  48. 71
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java
  49. 2
      dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
  50. 2
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  51. 27
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
  52. 4
      dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java
  53. 22
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  54. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
  55. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
  56. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
  57. 11
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
  58. 5
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
  59. 3
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
  60. 5
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java
  61. 14
      dolphinscheduler-worker/src/test/resources/logback.xml

13
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -23,11 +23,13 @@ import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -37,14 +39,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Import;
@Slf4j
@Import({CommonConfiguration.class,
DaoConfiguration.class,
RegistryConfiguration.class})
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class)
})
public class AlertServer {
@Autowired

26
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -18,13 +18,17 @@
package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.service.ServiceConfiguration;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
@ -38,21 +42,19 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Import;
import org.springframework.context.event.EventListener;
@Slf4j
@Import({DaoConfiguration.class,
CommonConfiguration.class,
ServiceConfiguration.class,
StorageConfiguration.class,
RegistryConfiguration.class})
@ServletComponentScan
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class)
})
@Slf4j
public class ApiApplicationServer {
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private PluginDao pluginDao;
@ -66,8 +68,8 @@ public class ApiApplicationServer {
public void run(ApplicationReadyEvent readyEvent) {
log.info("Received spring application context ready event will load taskPlugin and write to DB");
// install task plugin
taskPluginManager.loadPlugin();
for (Map.Entry<String, TaskChannelFactory> entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) {
TaskPluginManager.loadPlugin();
for (Map.Entry<String, TaskChannelFactory> entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
String taskPluginName = entry.getKey();
TaskChannelFactory taskChannelFactory = entry.getValue();
List<PluginParams> params = taskChannelFactory.getParams();

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskUpdateRequest.java

@ -25,10 +25,10 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.Date;
import lombok.Data;
import lombok.SneakyThrows;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
@ -107,7 +107,8 @@ public class TaskUpdateRequest {
* @param taskDefinition exists task definition object
* @return task definition
*/
public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) throws InvocationTargetException, IllegalAccessException, InstantiationException, NoSuchMethodException {
@SneakyThrows
public TaskDefinition mergeIntoTaskDefinition(TaskDefinition taskDefinition) {
TaskDefinition taskDefinitionDeepCopy = (TaskDefinition) BeanUtils.cloneBean(taskDefinition);
assert taskDefinitionDeepCopy != null;
if (this.name != null) {

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -238,9 +238,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkFlowLineageService workFlowLineageService;
@ -424,7 +421,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson);
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
@ -1618,7 +1615,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check whether the process definition json is normal
for (TaskNode taskNode : taskNodes) {
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskNode.getType())
.taskParams(taskNode.getTaskParams())
.dependence(taskNode.getDependence())

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -70,11 +70,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.RelationSubWorkflowMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
@ -177,18 +175,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
UsersService usersService;
@Autowired
private TenantMapper tenantMapper;
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ScheduleMapper scheduleMapper;
@Autowired
private RelationSubWorkflowMapper relationSubWorkflowMapper;
@ -725,7 +714,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())

75
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -75,7 +75,6 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -135,9 +134,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessService processService;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ProcessDefinitionService processDefinitionService;
@ -147,8 +143,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* create task definition
*
* @param loginUser login user
* @param projectCode project code
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionJson task definition json
*/
@Transactional
@ -171,7 +167,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
@ -212,7 +208,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
Project project = projectMapper.queryByCode(taskDefinition.getProjectCode());
projectService.checkProjectAndAuthThrowException(user, project, permissions);
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType())
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
@ -242,7 +238,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* Create resource task definition
*
* @param loginUser login user
* @param loginUser login user
* @param taskCreateRequest task definition json
* @return new TaskDefinition have created
*/
@ -286,11 +282,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* create single task definition that binds the workflow
*
* @param loginUser login user
* @param projectCode project code
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, sep comma
* @param upstreamCodes upstream task codes, sep comma
* @return create result code
*/
@Transactional
@ -325,7 +321,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return result;
}
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType())
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
@ -412,10 +408,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* query task definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param processCode process code
* @param taskName task name
* @param taskName task name
*/
@Override
public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode,
@ -473,12 +469,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* Delete resource task definition by code
*
* <p>
* Only task release state offline and no downstream tasks can be deleted, will also remove the exists
* task relation [upstreamTaskCode, taskCode]
*
* @param loginUser login user
* @param taskCode task code
* @param taskCode task code
*/
@Transactional
@Override
@ -546,9 +542,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* update task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param taskDefinitionJsonObj task definition json object
*/
@Transactional
@ -604,8 +600,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* update task definition
*
* @param loginUser login user
* @param taskCode task code
* @param loginUser login user
* @param taskCode task code
* @param taskUpdateRequest task definition json object
* @return new TaskDefinition have updated
*/
@ -619,13 +615,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
throw new ServiceException(Status.TASK_DEFINITION_NOT_EXISTS, taskCode);
}
TaskDefinition taskDefinitionUpdate;
try {
taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal);
} catch (InvocationTargetException | IllegalAccessException | InstantiationException
| NoSuchMethodException e) {
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, taskUpdateRequest.toString());
}
TaskDefinition taskDefinitionUpdate = taskUpdateRequest.mergeIntoTaskDefinition(taskDefinitionOriginal);
this.checkTaskDefinitionValid(loginUser, taskDefinitionUpdate, TASK_DEFINITION_UPDATE);
this.TaskDefinitionUpdateValid(taskDefinitionOriginal, taskDefinitionUpdate);
@ -656,7 +647,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* Get resource task definition by code
*
* @param loginUser login user
* @param taskCode task code
* @param taskCode task code
* @return TaskDefinition
*/
@Override
@ -674,7 +665,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* Get resource task definition according to query parameter
*
* @param loginUser login user
* @param loginUser login user
* @param taskFilterRequest taskFilterRequest object you want to filter the resource task definitions
* @return TaskDefinitions of page
*/
@ -741,7 +732,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return null;
}
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionToUpdate.getTaskType())
.taskParams(taskDefinitionToUpdate.getTaskParams())
.dependence(taskDefinitionToUpdate.getDependence())
@ -846,11 +837,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* update task definition and upstream
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task definition code
* @param loginUser login user
* @param projectCode project code
* @param taskCode task definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, sep comma
* @param upstreamCodes upstream task codes, sep comma
* @return update result code
*/
@Override
@ -1019,10 +1010,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* switch task definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param version the version user want to switch
* @param taskCode task code
* @param version the version user want to switch
*/
@Transactional
@Override
@ -1277,9 +1268,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
/**
* release task definition
*
* @param loginUser login user
* @param projectCode project code
* @param code task definition code
* @param loginUser login user
* @param projectCode project code
* @param code task definition code
* @param releaseState releaseState
* @return update result code
*/

38
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -84,6 +84,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
@ -142,9 +143,6 @@ public class ProcessInstanceServiceTest {
@Mock
TaskDefinitionMapper taskDefinitionMapper;
@Mock
TaskPluginManager taskPluginManager;
@Mock
ScheduleMapper scheduleMapper;
@ -625,21 +623,27 @@ public class ProcessInstanceServiceTest {
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
when(processDefinitionService.checkProcessNodeList(taskRelationJson, taskDefinitionLogs)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode);
when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Map<String, Object> processInstanceFinishRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", true, "", "", 0);
Assertions.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS));
// success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE))
.thenReturn(1);
Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", Boolean.FALSE, "", "", 0);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
Map<String, Object> processInstanceFinishRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", true, "", "", 0);
Assertions.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS));
// success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE))
.thenReturn(1);
Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
taskRelationJson, taskDefinitionJson, "2020-02-21 00:00:00", Boolean.FALSE, "", "", 0);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
}
@Test

490
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
import org.apache.dolphinscheduler.api.dto.task.TaskUpdateRequest;
@ -75,13 +81,17 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TaskDefinitionServiceImplTest {
@InjectMocks
@ -114,9 +124,6 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private TaskPluginManager taskPluginManager;
@Mock
private ProcessTaskRelationService processTaskRelationService;
@ -155,61 +162,73 @@ public class TaskDefinitionServiceImplTest {
@Test
public void createTaskDefinition() {
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
Mockito.when(projectService.hasProjectAndWritePerm(user, project, result))
.thenReturn(true);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
Project project = getProject();
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
when(projectService.hasProjectAndWritePerm(user, project, result))
.thenReturn(true);
String createTaskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson);
assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
String createTaskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(user, PROJECT_CODE, createTaskDefinitionJson);
Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
}
@Test
public void updateTaskDefinition() {
String taskDefinitionJson = getTaskDefinitionJson();;
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition());
Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0))
.thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// failure
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
((ServiceException) exception).getCode());
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
String taskDefinitionJson = getTaskDefinitionJson();
Project project = getProject();
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, PROJECT_CODE);
when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
when(processService.isTaskOnline(TASK_CODE)).thenReturn(Boolean.FALSE);
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition());
when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0))
.thenReturn(getProcessTaskRelationList2());
when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// failure
when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE,
taskDefinitionJson));
assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
}
@ -217,72 +236,72 @@ public class TaskDefinitionServiceImplTest {
public void queryTaskDefinitionByName() {
String taskName = "task";
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
.thenReturn(result);
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName))
when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
.queryTaskDefinitionByName(user, PROJECT_CODE, PROCESS_DEFINITION_CODE, taskName);
Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void deleteTaskDefinitionByCode() {
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
// error task definition not find
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error delete single task definition object
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0);
Mockito.when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(0);
when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(),
assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
TASK_DEFINITION_DELETE);
Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>());
Mockito.when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1);
when(processTaskRelationMapper.queryDownstreamByTaskCode(TASK_CODE)).thenReturn(new ArrayList<>());
when(taskDefinitionMapper.deleteByCode(TASK_CODE)).thenReturn(1);
Assertions.assertDoesNotThrow(() -> taskDefinitionService.deleteTaskDefinitionByCode(user, TASK_CODE));
}
@Test
public void switchVersion() {
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, PROJECT_CODE);
Mockito.when(
when(
projectService.checkProjectAndAuth(user, project, PROJECT_CODE, WORKFLOW_SWITCH_TO_THIS_VERSION))
.thenReturn(result);
.thenReturn(result);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION))
when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, VERSION))
.thenReturn(new TaskDefinitionLog());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(PROJECT_CODE);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE))
when(taskDefinitionMapper.queryByCode(TASK_CODE))
.thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1);
when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION);
Assertions.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
@ -331,7 +350,7 @@ public class TaskDefinitionServiceImplTest {
@Test
public void genTaskCodeList() {
Map<String, Object> genTaskCodeList = taskDefinitionService.genTaskCodeList(10);
Assertions.assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS));
assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS));
}
@Test
@ -348,31 +367,31 @@ public class TaskDefinitionServiceImplTest {
taskMainInfo.setUpstreamTaskName("4");
taskMainInfoIPage.setRecords(Collections.singletonList(taskMainInfo));
taskMainInfoIPage.setTotal(10L);
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, TASK_DEFINITION))
.thenReturn(checkResult);
Mockito.when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(),
when(taskDefinitionMapper.queryDefineListPaging(Mockito.any(Page.class), Mockito.anyLong(),
Mockito.isNull(), Mockito.anyString(), Mockito.isNull()))
.thenReturn(taskMainInfoIPage);
Mockito.when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L)))
.thenReturn(taskMainInfoIPage);
when(taskDefinitionMapper.queryDefineListByCodeList(PROJECT_CODE, Collections.singletonList(3L)))
.thenReturn(Collections.singletonList(taskMainInfo));
Result result = taskDefinitionService.queryTaskDefinitionListPaging(user, PROJECT_CODE,
null, null, null, pageNo, pageSize);
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testReleaseTaskDefinition() {
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Project project = getProject();
// check task dose not exist
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, TASK_CODE);
Mockito.when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result);
when(projectService.checkProjectAndAuth(user, project, PROJECT_CODE, null)).thenReturn(result);
Map<String, Object> map =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE);
Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS));
assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS));
// process definition offline
putMsg(result, Status.SUCCESS);
@ -384,23 +403,23 @@ public class TaskDefinitionServiceImplTest {
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";
taskDefinition.setTaskParams(params);
taskDefinition.setTaskType("SHELL");
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion()))
when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion()))
.thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE);
Assertions.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS));
assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS));
// process definition online, resource exist
Map<String, Object> onlineTaskResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.ONLINE);
Assertions.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS));
assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS));
// release error code
Map<String, Object> failResult =
taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.getEnum(2));
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
@Test
@ -410,133 +429,127 @@ public class TaskDefinitionServiceImplTest {
taskCreateRequest.setWorkflowCode(PROCESS_DEFINITION_CODE);
// error process definition not find
exception = Assertions.assertThrows(ServiceException.class,
assertThrowsServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error project not find
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE);
exception = Assertions.assertThrows(ServiceException.class,
assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition
taskCreateRequest.setTaskParams(TASK_PARAMETER);
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(),
TASK_DEFINITION_CREATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error create task definition object
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_CREATE);
assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR.getCode(),
((ServiceException) exception).getCode());
// error sync to task definition log
Mockito.when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
// error create task definition object
when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_ERROR,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// error sync to task definition log
when(taskDefinitionMapper.insert(isA(TaskDefinition.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR,
() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
// success
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
}
}
@Test
public void testUpdateTaskDefinitionV2() {
TaskUpdateRequest taskUpdateRequest = new TaskUpdateRequest();
TaskDefinition taskDefinition = getTaskDefinition();
Project project = getProject();
// error task definition not exists
exception = Assertions.assertThrows(ServiceException.class,
assertThrowsServiceException(Status.TASK_DEFINITION_NOT_EXISTS,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.TASK_DEFINITION_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
// error project not find
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(projectMapper.queryByCode(isA(Long.class))).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION_UPDATE);
exception = Assertions.assertThrows(ServiceException.class,
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
when(projectMapper.queryByCode(isA(Long.class))).thenReturn(project);
doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE);
assertThrowsServiceException(Status.PROJECT_NOT_EXIST,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(),
TASK_DEFINITION_UPDATE);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error task definition already online
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID.getCode(),
((ServiceException) exception).getCode());
// error task definition nothing update
Mockito.when(processService.isTaskOnline(TASK_CODE)).thenReturn(false);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.TASK_DEFINITION_NOT_CHANGE.getCode(), ((ServiceException) exception).getCode());
// error task definition version invalid
taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH));
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ((ServiceException) exception).getCode());
// error task definition update effect number
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION);
Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.UPDATE_TASK_DEFINITION_ERROR.getCode(),
((ServiceException) exception).getCode());
// error task definition log insert
Mockito.when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
Assertions.assertEquals(Status.CREATE_TASK_DEFINITION_LOG_ERROR.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, TASK_DEFINITION_UPDATE);
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(false);
assertThrowsServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
}
TaskDefinition taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
// error task definition nothing update
when(processService.isTaskOnline(TASK_CODE)).thenReturn(false);
assertThrowsServiceException(Status.TASK_DEFINITION_NOT_CHANGE,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition version invalid
taskUpdateRequest.setTaskPriority(String.valueOf(Priority.HIGH));
assertThrowsServiceException(Status.DATA_IS_NOT_VALID,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition update effect number
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(VERSION);
when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(0);
assertThrowsServiceException(Status.UPDATE_TASK_DEFINITION_ERROR,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// error task definition log insert
when(taskDefinitionMapper.updateById(isA(TaskDefinition.class))).thenReturn(1);
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(0);
assertThrowsServiceException(Status.CREATE_TASK_DEFINITION_LOG_ERROR,
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
// success
when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
when(
processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
}
}
@Test
@ -549,28 +562,28 @@ public class TaskDefinitionServiceImplTest {
ArrayList<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
taskDefinitionLogs.add(taskDefinitionLog);
Integer version = 1;
Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition);
when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition);
// saveProcessDefine
Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version);
Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version);
when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
int insertVersion =
processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE))
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE))
.thenReturn(insertVersion);
Assertions.assertEquals(insertVersion, version + 1);
assertEquals(insertVersion, version + 1);
// saveTaskRelation
List<ProcessTaskRelationLog> processTaskRelationLogList = getProcessTaskRelationLogList();
Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode())))
when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode())))
.thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs,
Boolean.TRUE);
Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult);
assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult);
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList,
taskDefinitionLogs));
@ -581,55 +594,60 @@ public class TaskDefinitionServiceImplTest {
// error task definition not exists
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
Assertions.assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
assertEquals(Status.TASK_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// error task definition not exists
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(getTaskDefinition());
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
.checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
Assertions.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM.getCode(),
((ServiceException) exception).getCode());
// success
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION);
doNothing().when(projectService).checkProjectAndAuthThrowException(user, getProject(), TASK_DEFINITION);
Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
}
@Test
public void testUpdateTaskWithUpstream() {
String taskDefinitionJson = getTaskDefinitionJson();
TaskDefinition taskDefinition = getTaskDefinition();
taskDefinition.setFlag(Flag.NO);
TaskDefinition taskDefinitionSecond = getTaskDefinition();
taskDefinitionSecond.setCode(5);
user.setUserType(UserType.ADMIN_USER);
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet()))
.thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond));
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE))
.thenReturn(getProcessTaskRelationListV2());
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1);
Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2);
// success
Map<String, Object> successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE,
taskDefinitionJson, UPSTREAM_CODE);
Assertions.assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS));
user.setUserType(UserType.GENERAL_USER);
try (
MockedStatic<TaskPluginManager> taskPluginManagerMockedStatic =
Mockito.mockStatic(TaskPluginManager.class)) {
taskPluginManagerMockedStatic.when(() -> TaskPluginManager.checkTaskParameters(Mockito.any()))
.thenReturn(true);
String taskDefinitionJson = getTaskDefinitionJson();
TaskDefinition taskDefinition = getTaskDefinition();
taskDefinition.setFlag(Flag.NO);
TaskDefinition taskDefinitionSecond = getTaskDefinition();
taskDefinitionSecond.setCode(5);
user.setUserType(UserType.ADMIN_USER);
when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true);
when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1);
when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
when(taskDefinitionMapper.queryByCodeList(Mockito.anySet()))
.thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond));
when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE))
.thenReturn(getProcessTaskRelationListV2());
when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE))
.thenReturn(getProcessDefinition());
when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1);
when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1);
when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2);
// success
Map<String, Object> successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE,
taskDefinitionJson, UPSTREAM_CODE);
assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS));
user.setUserType(UserType.GENERAL_USER);
}
}
private String getTaskDefinitionJson() {

57
dolphinscheduler-api/src/test/resources/logback-spring.xml

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<property name="log.base" value="logs"/>
<property scope="context" name="log.base.ctx" value="${log.base}" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="APILOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.base}/dolphinscheduler-api.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-api.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
<totalSizeCap>50GB</totalSizeCap>
<cleanHistoryOnStart>true</cleanHistoryOnStart>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{10}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/>
<root level="ERROR">
<appender-ref ref="STDOUT"/>
<appender-ref ref="APILOGFILE"/>
</root>
</configuration>

26
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/CommonConfiguration.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("org.apache.dolphinscheduler.common")
public class CommonConfiguration {
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java

@ -40,7 +40,6 @@ public class RemoteLogUtils {
@Autowired
private RemoteLogService autowiredRemoteLogService;
@PostConstruct
private void init() {
remoteLogService = autowiredRemoteLogService;

124
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/HttpProperty.java

@ -1,124 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.process;
import org.apache.dolphinscheduler.common.enums.HttpParametersType;
import java.util.Objects;
public class HttpProperty {
/**
* key
*/
private String prop;
/**
* httpParametersType
*/
private HttpParametersType httpParametersType;
/**
* value
*/
private String value;
public HttpProperty() {
}
public HttpProperty(String prop, HttpParametersType httpParametersType, String value) {
this.prop = prop;
this.httpParametersType = httpParametersType;
this.value = value;
}
/**
* getter method
*
* @return the prop
* @see HttpProperty#prop
*/
public String getProp() {
return prop;
}
/**
* setter method
*
* @param prop the prop to set
* @see HttpProperty#prop
*/
public void setProp(String prop) {
this.prop = prop;
}
/**
* getter method
*
* @return the value
* @see HttpProperty#value
*/
public String getValue() {
return value;
}
/**
* setter method
*
* @param value the value to set
* @see HttpProperty#value
*/
public void setValue(String value) {
this.value = value;
}
public HttpParametersType getHttpParametersType() {
return httpParametersType;
}
public void setHttpParametersType(HttpParametersType httpParametersType) {
this.httpParametersType = httpParametersType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HttpProperty property = (HttpProperty) o;
return Objects.equals(prop, property.prop)
&& Objects.equals(value, property.value);
}
@Override
public int hashCode() {
return Objects.hash(prop, value);
}
@Override
public String toString() {
return "HttpProperty{"
+ "prop='" + prop + '\''
+ ", httpParametersType=" + httpParametersType
+ ", value='" + value + '\''
+ '}';
}
}

39
dolphinscheduler-dao-plugin/dolphinscheduler-dao-api/src/main/java/org/apache/dolphinscheduler/dao/plugin/api/DatabaseEnvironmentCondition.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.plugin.api;
import java.util.Arrays;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
public class DatabaseEnvironmentCondition implements Condition {
private final String profile;
public DatabaseEnvironmentCondition(String profile) {
this.profile = profile;
}
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String[] activeProfiles = context.getEnvironment().getActiveProfiles();
return Arrays.asList(activeProfiles).contains(profile);
}
}

8
dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginConfiguration.java → dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DaoPluginAutoConfiguration.java

@ -29,14 +29,14 @@ import org.apache.dolphinscheduler.dao.plugin.h2.monitor.H2Monitor;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import com.baomidou.mybatisplus.annotation.DbType;
@Profile("h2")
@Configuration
public class H2DaoPluginConfiguration implements DaoPluginConfiguration {
@Conditional(H2DatabaseEnvironmentCondition.class)
@Configuration(proxyBeanMethods = false)
public class H2DaoPluginAutoConfiguration implements DaoPluginConfiguration {
@Autowired
private DataSource dataSource;

28
dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/java/org/apache/dolphinscheduler/dao/plugin/h2/H2DatabaseEnvironmentCondition.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.plugin.h2;
import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition;
public class H2DatabaseEnvironmentCondition extends DatabaseEnvironmentCondition {
public H2DatabaseEnvironmentCondition() {
super("h2");
}
}

19
dolphinscheduler-dao-plugin/dolphinscheduler-dao-h2/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.dao.plugin.h2.H2DaoPluginAutoConfiguration

8
dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginConfiguration.java → dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDaoPluginAutoConfiguration.java

@ -28,14 +28,14 @@ import org.apache.dolphinscheduler.dao.plugin.mysql.monitor.MysqlMonitor;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import com.baomidou.mybatisplus.annotation.DbType;
@Profile("mysql")
@Configuration
public class MysqlDaoPluginConfiguration implements DaoPluginConfiguration {
@Configuration(proxyBeanMethods = false)
@Conditional(MysqlDatabaseEnvironmentCondition.class)
public class MysqlDaoPluginAutoConfiguration implements DaoPluginConfiguration {
@Autowired
private DataSource dataSource;

28
dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/java/org/apache/dolphinscheduler/dao/plugin/mysql/MysqlDatabaseEnvironmentCondition.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.plugin.mysql;
import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition;
public class MysqlDatabaseEnvironmentCondition extends DatabaseEnvironmentCondition {
public MysqlDatabaseEnvironmentCondition() {
super("mysql");
}
}

19
dolphinscheduler-dao-plugin/dolphinscheduler-dao-mysql/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.dao.plugin.mysql.MysqlDaoPluginAutoConfiguration

8
dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginConfiguration.java → dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDaoPluginAutoConfiguration.java

@ -29,14 +29,14 @@ import org.apache.dolphinscheduler.dao.plugin.postgresql.monitor.PostgresqlMonit
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import com.baomidou.mybatisplus.annotation.DbType;
@Profile("postgresql")
@Configuration
public class PostgresqlDaoPluginConfiguration implements DaoPluginConfiguration {
@Conditional(PostgresqlDatabaseEnvironmentCondition.class)
@Configuration(proxyBeanMethods = false)
public class PostgresqlDaoPluginAutoConfiguration implements DaoPluginConfiguration {
@Autowired
private DataSource dataSource;

28
dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/java/org/apache/dolphinscheduler/dao/plugin/postgresql/PostgresqlDatabaseEnvironmentCondition.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.plugin.postgresql;
import org.apache.dolphinscheduler.dao.plugin.api.DatabaseEnvironmentCondition;
public class PostgresqlDatabaseEnvironmentCondition extends DatabaseEnvironmentCondition {
public PostgresqlDatabaseEnvironmentCondition() {
super("postgresql");
}
}

19
dolphinscheduler-dao-plugin/dolphinscheduler-dao-postgresql/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.dao.plugin.postgresql.PostgresqlDaoPluginAutoConfiguration

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java

@ -40,8 +40,8 @@ import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
@Configuration
@ComponentScan("org.apache.dolphinscheduler.dao")
@EnableAutoConfiguration
@ComponentScan({"org.apache.dolphinscheduler.dao.plugin"})
@MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
public class DaoConfiguration {

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/BaseDaoTest.java

@ -22,7 +22,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
@ExtendWith(MockitoExtension.class)
@ -30,6 +29,5 @@ import org.springframework.transaction.annotation.Transactional;
@SpringBootApplication(scanBasePackageClasses = DaoConfiguration.class)
@Transactional
@Rollback
@EnableTransactionManagement
public abstract class BaseDaoTest {
}

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java

@ -34,7 +34,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
@ActiveProfiles(ProfileType.H2)
@ -43,7 +42,6 @@ import org.springframework.transaction.annotation.Transactional;
@SpringBootTest(classes = DaoConfiguration.class)
@Transactional
@Rollback
@EnableTransactionManagement
public class AlertDaoTest {
@Autowired

28
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -17,15 +17,18 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
@ -35,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator;
import org.apache.dolphinscheduler.service.ServiceConfiguration;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import javax.annotation.PostConstruct;
@ -45,18 +49,15 @@ import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class)
})
@EnableTransactionManagement
@EnableCaching
@Slf4j
@Import({DaoConfiguration.class,
ServiceConfiguration.class,
CommonConfiguration.class,
StorageConfiguration.class,
RegistryConfiguration.class})
@SpringBootApplication
public class MasterServer implements IStoppable {
@Autowired
@ -65,9 +66,6 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterRegistryClient masterRegistryClient;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private MasterSchedulerBootstrap masterSchedulerBootstrap;
@ -109,7 +107,7 @@ public class MasterServer implements IStoppable {
this.masterRPCServer.start();
// install task plugin
this.taskPluginManager.loadPlugin();
TaskPluginManager.loadPlugin();
this.masterSlotManager.start();

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java

@ -90,9 +90,6 @@ public class TaskExecutionContextFactory {
@Autowired
private ProcessService processService;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private CuringParamsService curingParamsService;
@ -106,14 +103,14 @@ public class TaskExecutionContextFactory {
ProcessInstance workflowInstance = taskInstance.getProcessInstance();
ResourceParametersHelper resources =
Optional.ofNullable(taskPluginManager.getTaskChannel(taskInstance.getTaskType()))
Optional.ofNullable(TaskPluginManager.getTaskChannel(taskInstance.getTaskType()))
.map(taskChannel -> taskChannel.getResources(taskInstance.getTaskParams()))
.orElse(null);
setTaskResourceInfo(resources);
Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(workflowInstance);
AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder()
AbstractParameters baseParam = TaskPluginManager.getParameters(ParametersNode.builder()
.taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
Map<String, Property> propertyMap =
curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance);

2
dolphinscheduler-master/src/test/resources/logback.xml

@ -65,7 +65,7 @@
</encoder>
</appender>
<root level="INFO">
<root level="ERROR">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

13
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java → dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterAutoConfiguration.java

@ -20,7 +20,8 @@
package org.apache.dolphinscheduler.meter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.DefaultMetricsProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -42,11 +43,15 @@ import io.micrometer.core.instrument.MeterRegistry;
* }
* </pre>
*/
@Configuration
@Configuration(proxyBeanMethods = false)
@EnableAspectJAutoProxy
@EnableAutoConfiguration
@ConditionalOnProperty(prefix = "metrics", name = "enabled", havingValue = "true")
public class MeterConfiguration {
public class MeterAutoConfiguration {
@Bean
public DefaultMetricsProvider metricsProvider(MeterRegistry meterRegistry) {
return new DefaultMetricsProvider(meterRegistry);
}
@Bean
public TimedAspect timedAspect(MeterRegistry registry) {

3
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/metrics/DefaultMetricsProvider.java

@ -19,11 +19,8 @@ package org.apache.dolphinscheduler.meter.metrics;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.springframework.stereotype.Component;
import io.micrometer.core.instrument.MeterRegistry;
@Component
public class DefaultMetricsProvider implements MetricsProvider {
private final MeterRegistry meterRegistry;

19
dolphinscheduler-meter/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.meter.MeterAutoConfiguration

33
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryConfiguration.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RegistryConfiguration {
@Bean
@ConditionalOnMissingBean
public RegistryClient registryClient(Registry registry) {
return new RegistryClient(registry);
}
}

12
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

@ -41,8 +41,6 @@ import javax.net.ssl.SSLException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import com.google.common.base.Splitter;
@ -68,8 +66,6 @@ import io.netty.handler.ssl.SslContext;
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on Etcd cluster to
* store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry.
*/
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")
@Slf4j
public class EtcdRegistry implements Registry {
@ -86,6 +82,7 @@ public class EtcdRegistry implements Registry {
private final Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
private static final long TIME_TO_LIVE_SECONDS = 30L;
public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLException {
ClientBuilder clientBuilder = Client.builder()
.endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints())))
@ -141,8 +138,7 @@ public class EtcdRegistry implements Registry {
}
/**
*
* @param path The prefix of the key being listened to
* @param path The prefix of the key being listened to
* @param listener
* @return if subcribe Returns true if no exception was thrown
*/
@ -165,8 +161,8 @@ public class EtcdRegistry implements Registry {
}
/**
* @throws throws an exception if the unsubscribe path does not exist
* @param path The prefix of the key being listened to
* @throws throws an exception if the unsubscribe path does not exist
*/
@Override
public void unsubscribe(String path) {
@ -184,7 +180,6 @@ public class EtcdRegistry implements Registry {
}
/**
*
* @return Returns the value corresponding to the key
* @throws throws an exception if the key does not exist
*/
@ -202,7 +197,6 @@ public class EtcdRegistry implements Registry {
}
/**
*
* @param deleteOnDisconnect Does the put data disappear when the client disconnects
*/
@Override

48
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryAutoConfiguration.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.etcd;
import org.apache.dolphinscheduler.registry.api.Registry;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Slf4j
@ComponentScan
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")
public class EtcdRegistryAutoConfiguration {
public EtcdRegistryAutoConfiguration() {
log.info("Load EtcdRegistryAutoConfiguration");
}
@Bean
@ConditionalOnMissingBean(value = Registry.class)
public EtcdRegistry etcdRegistry(EtcdRegistryProperties etcdRegistryProperties) throws SSLException {
return new EtcdRegistry(etcdRegistryProperties);
}
}

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java

@ -21,13 +21,11 @@ import java.time.Duration;
import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "etcd")
@ConfigurationProperties(prefix = "registry")
public class EtcdRegistryProperties {

19
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.plugin.registry.etcd.EtcdRegistryAutoConfiguration

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java

@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
@ -37,6 +38,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.zaxxer.hikari.HikariDataSource;
@Slf4j
@ComponentScan
@Configuration(proxyBeanMethods = false)
@MapperScan("org.apache.dolphinscheduler.plugin.registry.jdbc.mapper")
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")

7
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

@ -50,14 +50,11 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Strings;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
@Slf4j
public final class ZookeeperRegistry implements Registry {
private final ZookeeperRegistryProperties.ZookeeperProperties properties;

46
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.registry.api.Registry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Slf4j
@ComponentScan
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
public class ZookeeperRegistryAutoConfiguration {
public ZookeeperRegistryAutoConfiguration() {
log.info("Load ZookeeperRegistryAutoConfiguration");
}
@Bean
@ConditionalOnMissingBean(value = Registry.class)
public ZookeeperRegistry zookeeperRegistry(ZookeeperRegistryProperties zookeeperRegistryProperties) {
return new ZookeeperRegistry(zookeeperRegistryProperties);
}
}

94
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java

@ -19,25 +19,19 @@ package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import java.time.Duration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
@ConfigurationProperties(prefix = "registry")
public class ZookeeperRegistryProperties {
private ZookeeperProperties zookeeper = new ZookeeperProperties();
public ZookeeperProperties getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZookeeperProperties zookeeper) {
this.zookeeper = zookeeper;
}
@Data
public static final class ZookeeperProperties {
private String namespace;
@ -48,91 +42,13 @@ public class ZookeeperRegistryProperties {
private Duration connectionTimeout = Duration.ofSeconds(9);
private Duration blockUntilConnected = Duration.ofMillis(600);
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getConnectString() {
return connectString;
}
public void setConnectString(String connectString) {
this.connectString = connectString;
}
public RetryPolicy getRetryPolicy() {
return retryPolicy;
}
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public String getDigest() {
return digest;
}
public void setDigest(String digest) {
this.digest = digest;
}
public Duration getSessionTimeout() {
return sessionTimeout;
}
public void setSessionTimeout(Duration sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
public Duration getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public Duration getBlockUntilConnected() {
return blockUntilConnected;
}
public void setBlockUntilConnected(Duration blockUntilConnected) {
this.blockUntilConnected = blockUntilConnected;
}
@Data
public static final class RetryPolicy {
private Duration baseSleepTime = Duration.ofMillis(60);
private int maxRetries;
private Duration maxSleep = Duration.ofMillis(300);
public Duration getBaseSleepTime() {
return baseSleepTime;
}
public void setBaseSleepTime(Duration baseSleepTime) {
this.baseSleepTime = baseSleepTime;
}
public int getMaxRetries() {
return maxRetries;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
public Duration getMaxSleep() {
return maxSleep;
}
public void setMaxSleep(Duration maxSleep) {
this.maxSleep = maxSleep;
}
}
}

19
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperRegistryAutoConfiguration

2
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerConfiguration.java → dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/QuartzSchedulerAutoConfiguration.java

@ -28,7 +28,7 @@ import org.springframework.context.annotation.Bean;
@AutoConfiguration(after = {QuartzAutoConfiguration.class})
@ConditionalOnClass(value = Scheduler.class)
public class QuartzSchedulerConfiguration {
public class QuartzSchedulerAutoConfiguration {
@Bean
@ConditionalOnMissingBean

19
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/resources/META-INF/spring.factories

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.dolphinscheduler.scheduler.quartz.QuartzSchedulerAutoConfiguration

26
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/ServiceConfiguration.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@ComponentScan("org.apache.dolphinscheduler.service")
@Configuration
public class ServiceConfiguration {
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -111,7 +111,6 @@ import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcCli
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
@ -262,9 +261,6 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ClusterMapper clusterMapper;

108
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
/**
* common utils
*/
@Slf4j
public class CommonUtils {
private static final Base64 BASE64 = new Base64();
protected CommonUtils() {
throw new UnsupportedOperationException("Construct CommonUtils");
}
/**
* @return get the path of system environment variables
*/
public static String getSystemEnvPath() {
String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH);
if (StringUtils.isEmpty(envPath)) {
URL envDefaultPath = CommonUtils.class.getClassLoader().getResource(Constants.ENV_PATH);
if (envDefaultPath != null) {
envPath = envDefaultPath.getPath();
log.debug("env path :{}", envPath);
} else {
envPath = "/etc/profile";
}
}
return envPath;
}
/**
* encode password
*/
public static String encodePassword(String password) {
if (StringUtils.isEmpty(password)) {
return StringUtils.EMPTY;
}
// if encryption is not turned on, return directly
boolean encryptionEnable = PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false);
if (!encryptionEnable) {
return password;
}
// Using Base64 + salt to process password
String salt = PropertyUtils.getString(DataSourceConstants.DATASOURCE_ENCRYPTION_SALT,
DataSourceConstants.DATASOURCE_ENCRYPTION_SALT_DEFAULT);
String passwordWithSalt = salt + new String(BASE64.encode(password.getBytes(StandardCharsets.UTF_8)));
return new String(BASE64.encode(passwordWithSalt.getBytes(StandardCharsets.UTF_8)));
}
/**
* decode password
*/
public static String decodePassword(String password) {
if (StringUtils.isEmpty(password)) {
return StringUtils.EMPTY;
}
// if encryption is not turned on, return directly
boolean encryptionEnable = PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false);
if (!encryptionEnable) {
return password;
}
// Using Base64 + salt to process password
String salt = PropertyUtils.getString(DataSourceConstants.DATASOURCE_ENCRYPTION_SALT,
DataSourceConstants.DATASOURCE_ENCRYPTION_SALT_DEFAULT);
String passwordWithSalt = new String(BASE64.decode(password), StandardCharsets.UTF_8);
if (!passwordWithSalt.startsWith(salt)) {
log.warn("There is a password and salt mismatch: {} ", password);
return password;
}
return new String(BASE64.decode(passwordWithSalt.substring(salt.length())), StandardCharsets.UTF_8);
}
}

5
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -79,7 +79,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@ -102,8 +101,6 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* process service test
@ -112,7 +109,6 @@ import org.slf4j.LoggerFactory;
@MockitoSettings(strictness = Strictness.LENIENT)
public class ProcessServiceTest {
private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class);
@InjectMocks
private ProcessServiceImpl processService;
@Mock
@ -667,7 +663,6 @@ public class ProcessServiceTest {
taskDefinition.setVersion(1);
taskDefinition.setCreateTime(new Date());
taskDefinition.setUpdateTime(new Date());
when(taskPluginManager.getParameters(any())).thenReturn(null);
when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(),
taskDefinition.getVersion())).thenReturn(taskDefinition);
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);

71
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* configuration test
*/
@ExtendWith(MockitoExtension.class)
public class CommonUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(CommonUtilsTest.class);
@Test
public void getSystemEnvPath() {
String envPath;
envPath = CommonUtils.getSystemEnvPath();
Assertions.assertEquals("/etc/profile", envPath);
}
@Test
public void getDownloadFilename() {
logger.info(FileUtils.getDownloadFilename("a.txt"));
Assertions.assertTrue(true);
}
@Test
public void getUploadFilename() {
logger.info(FileUtils.getUploadFilename("1234", "a.txt"));
Assertions.assertTrue(true);
}
@Test
public void test() {
InetAddress ip;
try {
ip = InetAddress.getLocalHost();
logger.info(ip.getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
Assertions.assertTrue(true);
}
}

2
dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java

@ -24,8 +24,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@Slf4j
@SpringBootApplication
public class StandaloneServer {
public static void main(String[] args) throws Exception {

2
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -329,4 +329,4 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
username: root
password: root@123
password: root

27
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java

@ -36,21 +36,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TaskPluginManager {
private final Map<String, TaskChannelFactory> taskChannelFactoryMap = new HashMap<>();
private final Map<String, TaskChannel> taskChannelMap = new HashMap<>();
private static final Map<String, TaskChannelFactory> taskChannelFactoryMap = new HashMap<>();
private static final Map<String, TaskChannel> taskChannelMap = new HashMap<>();
private final AtomicBoolean loadedFlag = new AtomicBoolean(false);
private static final AtomicBoolean loadedFlag = new AtomicBoolean(false);
/**
* Load task plugins from classpath.
*/
public void loadPlugin() {
public static void loadPlugin() {
if (!loadedFlag.compareAndSet(false, true)) {
log.warn("The task plugin has already been loaded");
return;
@ -70,24 +67,24 @@ public class TaskPluginManager {
}
public Map<String, TaskChannel> getTaskChannelMap() {
public static Map<String, TaskChannel> getTaskChannelMap() {
return Collections.unmodifiableMap(taskChannelMap);
}
public Map<String, TaskChannelFactory> getTaskChannelFactoryMap() {
public static Map<String, TaskChannelFactory> getTaskChannelFactoryMap() {
return Collections.unmodifiableMap(taskChannelFactoryMap);
}
public TaskChannel getTaskChannel(String type) {
return this.getTaskChannelMap().get(type);
public static TaskChannel getTaskChannel(String type) {
return getTaskChannelMap().get(type);
}
public boolean checkTaskParameters(ParametersNode parametersNode) {
AbstractParameters abstractParameters = this.getParameters(parametersNode);
public static boolean checkTaskParameters(ParametersNode parametersNode) {
AbstractParameters abstractParameters = getParameters(parametersNode);
return abstractParameters != null && abstractParameters.checkParameters();
}
public AbstractParameters getParameters(ParametersNode parametersNode) {
public static AbstractParameters getParameters(ParametersNode parametersNode) {
String taskType = parametersNode.getTaskType();
if (Objects.isNull(taskType)) {
return null;
@ -106,7 +103,7 @@ public class TaskPluginManager {
case TaskConstants.TASK_TYPE_DYNAMIC:
return JSONUtils.parseObject(parametersNode.getTaskParams(), DynamicParameters.class);
default:
TaskChannel taskChannel = this.getTaskChannelMap().get(taskType);
TaskChannel taskChannel = getTaskChannelMap().get(taskType);
if (Objects.isNull(taskChannel)) {
return null;
}

4
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java

@ -23,12 +23,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@ImportAutoConfiguration(DaoConfiguration.class)
@Import(DaoConfiguration.class)
@SpringBootApplication
public class UpgradeDolphinScheduler {

22
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@ -24,11 +25,12 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryAutoConfiguration;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@ -47,24 +49,18 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = JdbcRegistryAutoConfiguration.class)
})
@Slf4j
@Import({CommonConfiguration.class,
StorageConfiguration.class,
RegistryConfiguration.class})
@SpringBootApplication
public class WorkerServer implements IStoppable {
@Autowired
private WorkerRegistryClient workerRegistryClient;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkerRpcServer workerRpcServer;
@ -89,7 +85,7 @@ public class WorkerServer implements IStoppable {
@PostConstruct
public void run() {
this.workerRpcServer.start();
this.taskPluginManager.loadPlugin();
TaskPluginManager.loadPlugin();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@ -35,13 +34,11 @@ public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor {
public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient);
}

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@ -35,20 +34,17 @@ public class DefaultWorkerTaskExecutorFactory
private final @NonNull TaskExecutionContext taskExecutionContext;
private final @NonNull WorkerConfig workerConfig;
private final @NonNull WorkerMessageSender workerMessageSender;
private final @NonNull TaskPluginManager taskPluginManager;
private final @Nullable StorageOperate storageOperate;
private final @NonNull WorkerRegistryClient workerRegistryClient;
public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
this.workerRegistryClient = workerRegistryClient;
}
@ -59,7 +55,6 @@ public class DefaultWorkerTaskExecutorFactory
taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient);
}

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java

@ -75,7 +75,6 @@ public abstract class WorkerTaskExecutor implements Runnable {
protected final TaskExecutionContext taskExecutionContext;
protected final WorkerConfig workerConfig;
protected final WorkerMessageSender workerMessageSender;
protected final TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected final WorkerRegistryClient workerRegistryClient;
@ -85,13 +84,11 @@ public abstract class WorkerTaskExecutor implements Runnable {
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
this.workerRegistryClient = workerRegistryClient;
SensitiveDataConverter.addMaskPattern(K8S_CONFIG_REGEX);
@ -220,7 +217,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());
TaskChannel taskChannel =
Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
Optional.ofNullable(TaskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
.orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType()
+ " task plugin not found, please check the task type is correct."));

11
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@ -36,12 +35,6 @@ public class WorkerTaskExecutorFactoryBuilder {
@Autowired
private WorkerMessageSender workerMessageSender;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkerTaskExecutorThreadPool workerManager;
@Autowired(required = false)
private StorageOperate storageOperate;
@ -51,14 +44,11 @@ public class WorkerTaskExecutorFactoryBuilder {
public WorkerTaskExecutorFactoryBuilder(
WorkerConfig workerConfig,
WorkerMessageSender workerMessageSender,
TaskPluginManager taskPluginManager,
WorkerTaskExecutorThreadPool workerManager,
StorageOperate storageOperate,
WorkerRegistryClient workerRegistryClient) {
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
this.taskPluginManager = taskPluginManager;
this.workerManager = workerManager;
this.storageOperate = storageOperate;
this.workerRegistryClient = workerRegistryClient;
}
@ -67,7 +57,6 @@ public class WorkerTaskExecutorFactoryBuilder {
return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient);
}

5
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@ -40,8 +39,6 @@ public class DefaultWorkerTaskExecutorTest {
private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
private WorkerRegistryClient workerRegistryClient = Mockito.mock(WorkerRegistryClient.class);
@ -58,7 +55,6 @@ public class DefaultWorkerTaskExecutorTest {
taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient);
@ -82,7 +78,6 @@ public class DefaultWorkerTaskExecutorTest {
taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient);

3
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@ -68,7 +67,7 @@ class WorkerTaskExecutorThreadPoolTest {
protected MockWorkerTaskExecutor(Runnable runnable) {
super(TaskExecutionContext.builder().taskInstanceId((int) System.nanoTime()).build(), new WorkerConfig(),
new WorkerMessageSender(), new TaskPluginManager(), new StorageOperate() {
new WorkerMessageSender(), new StorageOperate() {
@Override
public void createTenantDirIfNotExists(String tenantCode) {

5
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHost
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
@ -70,8 +69,6 @@ public class TaskInstanceOperationFunctionTest {
private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
private WorkerTaskExecutorThreadPool workerManager = Mockito.mock(WorkerTaskExecutorThreadPool.class);
private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
@ -94,7 +91,6 @@ public class TaskInstanceOperationFunctionTest {
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder(
workerConfig,
workerMessageSender,
taskPluginManager,
workerManager,
storageOperate,
workerRegistryClient);
@ -189,7 +185,6 @@ public class TaskInstanceOperationFunctionTest {
WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder(
workerConfig,
workerMessageSender,
taskPluginManager,
workerManager,
storageOperate,
workerRegistryClient);

14
dolphinscheduler-worker/src/test/resources/logback.xml

@ -22,7 +22,8 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] -
[WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@ -60,18 +61,15 @@
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] -
[WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<if condition="${DOCKER:-false}">
<then>
<appender-ref ref="STDOUT"/>
</then>
</if>
<root level="ERROR">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</root>

Loading…
Cancel
Save