BoYiZhang 4 years ago
parent
commit
e4d76c5d8b
  1. 4
      .github/ISSUE_TEMPLATE/bug_report.md
  2. 7
      .github/ISSUE_TEMPLATE/feature_request.md
  3. 22
      .github/ISSUE_TEMPLATE/improvement_suggestion.md
  4. 8
      .github/ISSUE_TEMPLATE/question.md
  5. 22
      .github/ISSUE_TEMPLATE/test.md
  6. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  7. 174
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
  8. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  9. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
  10. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  11. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
  12. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
  13. 11
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
  14. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  15. 3
      dolphinscheduler-service/src/main/resources/zookeeper.properties

4
.github/ISSUE_TEMPLATE/bug_report.md

@ -1,7 +1,7 @@
---
name: Bug report
about: Create a report to help us improve
title: "[BUG] bug title "
title: "[Bug][Module Name] Bug title "
labels: bug
assignees: ''
@ -32,5 +32,5 @@ If applicable, add screenshots to help explain your problem.
**Additional context**
Add any other context about the problem here.
**Requirement or improvement
**Requirement or improvement**
- Please describe about your requirements or improvement suggestions.

7
.github/ISSUE_TEMPLATE/feature_request.md

@ -1,12 +1,17 @@
---
name: Feature request
about: Suggest an idea for this project
title: "[Feature]"
title: "[Feature][Module Name] Feature title"
labels: new feature
assignees: ''
---
*For better global communication, please give priority to using English description, thx! *
**Describe the feature**
A clear and concise description of what the feature is.
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

22
.github/ISSUE_TEMPLATE/improvement_suggestion.md

@ -0,0 +1,22 @@
---
name: Improvement suggestion
about: Improvement suggestion for this project
title: "[Improvement][Module Name] Improvement title"
labels: improvement
assignees: ''
---
*For better global communication, please give priority to using English description, thx! *
**Describe the question**
A clear and concise description of what the improvement is.
**What are the current deficiencies and the benefits of improvement**
- A clear and concise description of the current deficiencies and the benefits of this improvement.
**Which version of DolphinScheduler:**
-[1.1.0-preview]
**Describe alternatives you've considered**
A clear and concise description of any alternative improvement solutions you've considered.

8
.github/ISSUE_TEMPLATE/question.md

@ -1,7 +1,7 @@
---
name: question
about: have a question wanted to be help
title: "[QUESTION] question title"
name: Question
about: Have a question wanted to be help
title: "[Question] Question title"
labels: question
assignees: ''
@ -19,5 +19,5 @@ A clear and concise description of what the question is.
**Additional context**
Add any other context about the problem here.
**Requirement or improvement
**Requirement or improvement**
- Please describe about your requirements or improvement suggestions.

22
.github/ISSUE_TEMPLATE/test.md

@ -0,0 +1,22 @@
---
name: Test
about: Test to enhance the robustness of this project
title: "[Test][Module Name] Test title"
labels: test
assignees: ''
---
*For better global communication, please give priority to using English description, thx! *
**Describe the question**
A clear and concise description of what the test part is.
**What are the current deficiencies and the benefits of changing or adding this test**
- A clear and concise description of the current deficiencies, the benefits of changing or adding this test, and the scope involved.
**Which version of DolphinScheduler:**
-[1.1.0-preview]
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions you've considered.

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@ -333,10 +334,9 @@ public class SchedulerService extends BaseService {
if(scheduleStatus == ReleaseState.ONLINE){
// check process definition release state
if(processDefinition.getReleaseState() != ReleaseState.ONLINE){
ProcessDefinition definition = processDefinitionMapper.selectById(scheduleObj.getProcessDefinitionId());
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, definition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
// check sub process definition release state
@ -380,7 +380,7 @@ public class SchedulerService extends BaseService {
switch (scheduleStatus) {
case ONLINE: {
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), id);
setSchedule(project.getId(), scheduleObj);
break;
}
case OFFLINE: {
@ -395,7 +395,7 @@ public class SchedulerService extends BaseService {
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new RuntimeException(result.get(Constants.MSG).toString());
throw new ServiceException(result.get(Constants.MSG).toString());
}
putMsg(result, Status.SUCCESS);
@ -472,15 +472,10 @@ public class SchedulerService extends BaseService {
return result;
}
public void setSchedule(int projectId, int scheduleId) throws RuntimeException{
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
public void setSchedule(int projectId, Schedule schedule) {
Schedule schedule = processService.querySchedule(scheduleId);
if (schedule == null) {
logger.warn("process schedule info not exists");
return;
}
int scheduleId = schedule.getId();
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
Date startDate = schedule.getStartTime();
Date endDate = schedule.getEndTime();
@ -502,7 +497,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
public static void deleteSchedule(int projectId, int scheduleId) throws RuntimeException{
public static void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
String jobName = QuartzExecutors.buildJobName(scheduleId);
@ -510,7 +505,7 @@ public class SchedulerService extends BaseService {
if(!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)){
logger.warn("set offline failure:projectId:{},scheduleId:{}",projectId,scheduleId);
throw new RuntimeException(String.format("set offline failure"));
throw new ServiceException("set offline failure");
}
}

174
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java

@ -16,43 +16,183 @@
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(QuartzExecutors.class)
public class SchedulerServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
@Autowired
@InjectMocks
private SchedulerService schedulerService;
@Autowired
private ExecutorService executorService;
@Mock
private MonitorService monitorService;
@Mock
private ProcessService processService;
@Mock
private ScheduleMapper scheduleMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectUserMapper projectUserMapper;
@Mock
private ProjectService projectService;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private QuartzExecutors quartzExecutors;
@Mock
private Scheduler scheduler;
@Before
public void setUp() {
quartzExecutors = PowerMockito.mock(QuartzExecutors.class);
PowerMockito.mockStatic(QuartzExecutors.class);
try {
PowerMockito.doReturn(quartzExecutors).when(QuartzExecutors.class, "getInstance");
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSetScheduleState(){
public void testSetScheduleState() {
String projectName = "test";
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setId(1);
Map<String, Object> result = new HashMap<String, Object>();
Project project = getProject(projectName);
ProcessDefinition processDefinition = new ProcessDefinition();
Schedule schedule = new Schedule();
schedule.setId(1);
schedule.setProcessDefinitionId(1);
schedule.setReleaseState(ReleaseState.OFFLINE);
List<Server> masterServers = new ArrayList<>();
masterServers.add(new Server());
Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
//hash no auth
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.ONLINE);
Mockito.when(projectService.hasProjectAndPerm(loginUser, project, result)).thenReturn(true);
//schedule not exists
result = schedulerService.setScheduleState(loginUser, projectName, 2, ReleaseState.ONLINE);
Assert.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
//SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
//PROCESS_DEFINE_NOT_EXIST
schedule.setProcessDefinitionId(2);
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
schedule.setProcessDefinitionId(1);
// PROCESS_DEFINE_NOT_RELEASE
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE, result.get(Constants.STATUS));
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
//MASTER_NOT_EXISTS
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.MASTER_NOT_EXISTS, result.get(Constants.STATUS));
//set master
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(masterServers);
//SUCCESS
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
//OFFLINE
Mockito.when(quartzExecutors.deleteJob(null, null)).thenReturn(true);
result = schedulerService.setScheduleState(loginUser, projectName, 1, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testDeleteSchedule() {
Mockito.when(quartzExecutors.deleteJob("1", "1")).thenReturn(true);
Mockito.when(quartzExecutors.buildJobGroupName(1)).thenReturn("1");
Mockito.when(quartzExecutors.buildJobName(1)).thenReturn("1");
boolean flag = true;
try {
schedulerService.deleteSchedule(1, 1);
}catch (Exception e){
flag = false;
}
Assert.assertTrue(flag);
}
private Project getProject(String name) {
Project project = new Project();
project.setName("project_test1");
project.setId(-1);
project.setName(name);
project.setUserId(1);
Map<String, Object> map = schedulerService.setScheduleState(loginUser, project.getName(), 44, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
return project;
}
}

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -786,6 +786,10 @@ public final class Constants {
*/
public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state";
/**
* com.amazonaws.services.s3.enableV4
*/
public static final String AWS_S3_V4 = "com.amazonaws.services.s3.enableV4";
/**
* loginUserFromKeytab user

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java

@ -70,7 +70,7 @@ public class CommonUtils {
* @return true if upload resource is HDFS and kerberos startup
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -110,7 +110,7 @@ public class HadoopUtils implements Closeable {
try {
configuration = new Configuration();
String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
String resourceStorageType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
if (resUploadType == ResUploadType.HDFS) {
@ -159,6 +159,7 @@ public class HadoopUtils implements Closeable {
}
}
} else if (resUploadType == ResUploadType.S3) {
System.setProperty(Constants.AWS_S3_V4, Constants.STRING_TRUE);
configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java

@ -72,7 +72,7 @@ public class PropertyUtils {
* @return judge whether resource upload startup
*/
public static Boolean getResUploadStartupState(){
String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3;
}
@ -87,6 +87,16 @@ public class PropertyUtils {
return properties.getProperty(key.trim());
}
/**
* get property value with upper case
*
* @param key property name
* @return property value with upper case
*/
public static String getUpperCaseString(String key) {
return properties.getProperty(key.trim()).toUpperCase();
}
/**
* get property value
*

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java

@ -55,19 +55,15 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
case MAGIC:
checkMagic(in.readByte());
checkpoint(State.COMMAND);
break;
case COMMAND:
commandHeader.setType(in.readByte());
checkpoint(State.OPAQUE);
break;
case OPAQUE:
commandHeader.setOpaque(in.readLong());
checkpoint(State.BODY_LENGTH);
break;
case BODY_LENGTH:
commandHeader.setBodyLength(in.readInt());
checkpoint(State.BODY);
break;
case BODY:
byte[] body = new byte[commandHeader.getBodyLength()];
in.readBytes(body);
@ -79,7 +75,6 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
out.add(packet);
//
checkpoint(State.MAGIC);
break;
default:
logger.warn("unknown decoder state {}", state());
}

11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java

@ -52,6 +52,9 @@ public class ZookeeperConfig {
@Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
private String dsRoot;
@Value("${zookeeper.max.wait.time:10000}")
private int maxWaitTime;
public String getServerList() {
return serverList;
}
@ -115,4 +118,12 @@ public class ZookeeperConfig {
public void setDsRoot(String dsRoot) {
this.dsRoot = dsRoot;
}
public int getMaxWaitTime() {
return maxWaitTime;
}
public void setMaxWaitTime(int maxWaitTime) {
this.maxWaitTime = maxWaitTime;
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -37,6 +37,7 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
@ -109,7 +110,9 @@ public class ZookeeperOperator implements InitializingBean {
zkClient = builder.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
if (!zkClient.blockUntilConnected(zookeeperConfig.getMaxWaitTime(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Connect zookeeper expire max wait time");
}
} catch (final Exception ex) {
throw new RuntimeException(ex);
}

3
dolphinscheduler-service/src/main/resources/zookeeper.properties

@ -26,4 +26,5 @@ zookeeper.quorum=localhost:2181
#zookeeper.connection.timeout=30000
#zookeeper.retry.base.sleep=100
#zookeeper.retry.max.sleep=30000
#zookeeper.retry.maxtime=10
#zookeeper.retry.maxtime=10
#zookeeper.max.wait.time=10000
Loading…
Cancel
Save