Browse Source

Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
22729669f9
  1. 4
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java
  2. 21
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java
  3. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  4. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  6. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  7. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  8. 180
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
  9. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  10. 24
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  11. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  12. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  13. 61
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
  14. 65
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
  15. 57
      dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
  16. BIN
      dolphinscheduler-ui.zip
  17. 30
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue

4
dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java

@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.spi.alert.ShowType;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.DataType;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
@ -65,10 +64,9 @@ public class EmailAlertChannelFactory implements AlertChannelFactory {
.build();
InputParam mailSmtpPort = InputParam.newBuilder(MailParamsConstants.NAME_MAIL_SMTP_PORT, MailParamsConstants.MAIL_SMTP_PORT)
.setValue(25)
.setValue("25")
.addValidate(Validate.newBuilder()
.setRequired(true)
.setType(DataType.NUMBER.getDataType())
.build())
.build();

21
dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java

@ -91,9 +91,8 @@ public class EmailAlertChannelTest {
InputParam mailSmtpPort = InputParam.newBuilder("serverPort", "smtp.port")
.addValidate(Validate.newBuilder()
.setRequired(true)
.setType(DataType.NUMBER.getDataType())
.build())
.setValue(25)
.setValue("25")
.build();
InputParam mailSender = InputParam.newBuilder("sender", "sender")
@ -102,10 +101,10 @@ public class EmailAlertChannelTest {
.build();
RadioParam enableSmtpAuth = RadioParam.newBuilder("enableSmtpAuth", "smtp.auth")
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.addParamsOptions(new ParamsOptions("YES", "true", false))
.addParamsOptions(new ParamsOptions("NO", "false", false))
.addValidate(Validate.newBuilder().setRequired(true).build())
.setValue(false)
.setValue("false")
.build();
InputParam mailUser = InputParam.newBuilder("user", "user")
@ -119,17 +118,17 @@ public class EmailAlertChannelTest {
.build();
RadioParam enableTls = RadioParam.newBuilder("starttlsEnable", "starttls.enable")
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.addParamsOptions(new ParamsOptions("YES", "true", false))
.addParamsOptions(new ParamsOptions("NO", "false", false))
.addValidate(Validate.newBuilder().setRequired(true).build())
.setValue(true)
.setValue("true")
.build();
RadioParam enableSsl = RadioParam.newBuilder("sslEnable", "smtp.ssl.enable")
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.addParamsOptions(new ParamsOptions("YES", "true", false))
.addParamsOptions(new ParamsOptions("NO", "false", false))
.addValidate(Validate.newBuilder().setRequired(true).build())
.setValue(true)
.setValue("true")
.build();
InputParam sslTrust = InputParam.newBuilder("smtpSslTrust", "smtp.ssl.trust")

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import javax.annotation.PostConstruct;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,8 +42,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct;
@ -73,12 +73,6 @@ public class MasterServer {
*/
private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
/**
* zk master client
*/
@ -117,9 +111,6 @@ public class MasterServer {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
// register
this.masterRegistry.registry();
// self tolerant
this.zkMasterClient.start();
@ -178,7 +169,6 @@ public class MasterServer {
//
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
try{

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -62,6 +62,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -124,12 +125,11 @@ public class TaskPriorityQueueConsumer extends Thread {
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
failedDispatchTasks.clear();
for (int i = 0; i < fetchTaskNum; i++) {
if (taskPriorityQueue.size() <= 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
continue;
}
// if not task , blocking here
TaskPriority taskPriority = taskPriorityQueue.take();
boolean dispatchResult = dispatch(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -97,7 +97,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void setTaskCache(TaskExecutionContext taskExecutionContext) {
TaskExecutionContext preTaskCache = new TaskExecutionContext();
preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache);
}
public TaskExecuteProcessor(AlertClientService alertClientService) {

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -85,6 +85,8 @@ public abstract class AbstractCommandExecutor {
*/
protected final List<String> logBuffer;
protected boolean logOutputIsScuccess = false;
/**
* SHELL result string
*/
@ -348,18 +350,13 @@ public abstract class AbstractCommandExecutor {
*/
private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(new Runnable() {
@Override
public void run() {
ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
getOutputLogService.submit(() -> {
BufferedReader inReader = null;
try {
inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
long lastFlushTime = System.currentTimeMillis();
logBuffer.add("welcome to use bigdata scheduling system...");
while ((line = inReader.readLine()) != null) {
if (line.startsWith("${setValue(")) {
varPool.append(line.substring("${setValue(".length(), line.length() - 2));
@ -367,15 +364,32 @@ public abstract class AbstractCommandExecutor {
} else {
logBuffer.add(line);
taskResultString = line;
lastFlushTime = flush(lastFlushTime);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
clear();
logOutputIsScuccess = true;
close(inReader);
}
});
getOutputLogService.shutdown();
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(() -> {
try {
long lastFlushTime = System.currentTimeMillis();
while (logBuffer.size() > 0 || !logOutputIsScuccess) {
if (logBuffer.size() > 0) {
lastFlushTime = flush(lastFlushTime);
} else {
Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
clear();
}
});
parseProcessOutputExecutorService.shutdown();

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
@ -66,6 +67,12 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private ProcessService processService;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
public void start() {
InterProcessMutex mutex = null;
@ -75,6 +82,9 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire();
// Master registry
masterRegistry.registry();
// init system znode
this.initSystemZNode();
@ -98,6 +108,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Override
public void close() {
super.close();
masterRegistry.unRegistry();
}
/**

180
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java

@ -16,39 +16,61 @@
*/
package org.apache.dolphinscheduler.server.worker.shell;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.common.utils.*;
import org.springframework.context.ApplicationContext;
import java.util.Date;
import java.util.List;
/**
* python shell command executor test
*/
@Ignore
@RunWith(PowerMockRunner.class)
@PrepareForTest(OSUtils.class)
@PowerMockIgnore({"javax.management.*"})
public class ShellCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class);
private ProcessService processService = null;
private ApplicationContext applicationContext;
@Before
public void before(){
processService = SpringApplicationContext.getBean(ProcessService.class);
public void before() {
applicationContext = PowerMockito.mock(ApplicationContext.class);
processService = PowerMockito.mock(ProcessService.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
}
@Ignore
@Test
public void test() throws Exception {
@ -63,19 +85,18 @@ public class ShellCommandExecutorTest {
taskProps.setTaskInstanceId(7657);
TaskInstance taskInstance = processService.findTaskInstanceById(7657);
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
taskProps.setTaskParams(taskNode.getParams());
// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
// taskProps.setTaskParams(taskNode.getParams());
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
// Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
// taskInstance.getProcessDefinitionId(),
// taskInstance.getProcessInstanceId(),
// taskInstance.getId()));
// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
@ -91,14 +112,141 @@ public class ShellCommandExecutorTest {
task.handle();
ExecutionStatus status = ExecutionStatus.SUCCESS;
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
status = ExecutionStatus.SUCCESS;
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
} else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL) {
status = ExecutionStatus.KILL;
}else {
} else {
status = ExecutionStatus.FAILURE;
}
logger.info(status.toString());
}
@Test
public void testParseProcessOutput() {
Class<AbstractCommandExecutor> shellCommandExecutorClass = AbstractCommandExecutor.class;
try {
Method method = shellCommandExecutorClass.getDeclaredMethod("parseProcessOutput", Process.class);
method.setAccessible(true);
Object[] arg1s = {new Process() {
@Override
public OutputStream getOutputStream() {
return new OutputStream() {
@Override
public void write(int b) throws IOException {
logger.info("unit test");
}
};
}
@Override
public InputStream getInputStream() {
return new InputStream() {
@Override
public int read() throws IOException {
return 0;
}
};
}
@Override
public InputStream getErrorStream() {
return null;
}
@Override
public int waitFor() throws InterruptedException {
return 0;
}
@Override
public int exitValue() {
return 0;
}
@Override
public void destroy() {
logger.info("unit test");
}
} };
method.invoke(new AbstractCommandExecutor(null, new TaskExecutionContext(), logger) {
@Override
protected String buildCommandFilePath() {
return null;
}
@Override
protected String commandInterpreter() {
return null;
}
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("unit test");
}
}, arg1s);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testFindAppId() {
Class<AbstractCommandExecutor> shellCommandExecutorClass = AbstractCommandExecutor.class;
try {
Method method = shellCommandExecutorClass.getDeclaredMethod("findAppId", new Class[]{String.class});
method.setAccessible(true);
Object[] arg1s = {"11111"};
String result = (String) method.invoke(new AbstractCommandExecutor(null, null, null) {
@Override
protected String buildCommandFilePath() {
return null;
}
@Override
protected String commandInterpreter() {
return null;
}
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("unit test");
}
}, arg1s);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
@Test
public void testConvertFile2List() {
Class<AbstractCommandExecutor> shellCommandExecutorClass = AbstractCommandExecutor.class;
try {
Method method = shellCommandExecutorClass.getDeclaredMethod("convertFile2List", String.class);
method.setAccessible(true);
Object[] arg1s = {"/opt/1.txt"};
List<String> result = (List<String>) method.invoke(new AbstractCommandExecutor(null, null, null) {
@Override
protected String buildCommandFilePath() {
return null;
}
@Override
protected String commandInterpreter() {
return null;
}
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("unit test");
}
}, arg1s);
Assert.assertTrue(true);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -27,11 +27,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.sql.DriverManager;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -103,6 +103,8 @@ public class ShellTaskTest {
public void testComplementData() throws Exception {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>());
shellCommandExecutor.isSuccessOfYarnState(null);
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
shellTask.handle();
}

24
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Task instances priority queue implementation
@ -39,6 +41,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* Lock used for all public operations
*/
private final ReentrantLock lock = new ReentrantLock(true);
/**
* put task instance to priority queue
*
@ -61,6 +68,23 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
return queue.poll();
}
/**
* poll task info with timeout
* <p>
* WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit)
* because this method of override interface used without considering accuracy of timeout
*
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
@Override
public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException {
throw new TaskPriorityQueueException("This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!");
}
/**
* peek taskInfo
*

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.TimeUnit;
/**
* task priority queue
* @param <T>
@ -41,6 +43,17 @@ public interface TaskPriorityQueue<T> {
*/
T take() throws TaskPriorityQueueException, InterruptedException;
/**
* poll taskInfo with timeout
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException;
/**
* size
*

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
return queue.take();
}
/**
* poll taskInfo with timeout
*
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
@Override
public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException {
return queue.poll(timeout,unit);
}
/**
* queue size
*

61
dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -15,46 +15,76 @@
* limitations under the License.
*/
package queue;
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
/**
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
public class PeerTaskInstancePriorityQueueTest {
@Test
public void testPut() throws Exception {
public void put() throws TaskPriorityQueueException {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
Assert.assertEquals(2,queue.size());
Assert.assertEquals(2, queue.size());
}
@Test
public void testPeek() throws Exception {
public void take() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.peek();
Assert.assertEquals(peekBeforeLength,queue.size());
queue.take();
Assert.assertTrue(queue.size() < peekBeforeLength);
}
@Test
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
try {
queue.poll(1000, TimeUnit.MILLISECONDS);
} catch (TaskPriorityQueueException e) {
e.printStackTrace();
}
}
@Test
public void testTake() throws Exception {
public void peek() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assert.assertTrue(queue.size() < peekBeforeLength);
queue.peek();
Assert.assertEquals(peekBeforeLength, queue.size());
}
@Test
public void size() throws Exception {
Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
}
@Test
public void contains() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
queue.put(taskInstanceMediumPriority);
Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
}
@Test
public void remove() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
queue.put(taskInstanceMediumPriority);
int peekBeforeLength = queue.size();
queue.remove(taskInstanceMediumPriority);
Assert.assertNotEquals(peekBeforeLength, queue.size());
}
/**
@ -66,7 +96,7 @@ public class PeerTaskInstancePriorityQueueTest {
private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
return queue;
@ -85,5 +115,4 @@ public class PeerTaskInstancePriorityQueueTest {
taskInstance.setTaskInstancePriority(priority);
return taskInstance;
}
}

65
dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java

@ -15,18 +15,19 @@
* limitations under the License.
*/
package queue;
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.common.enums.Priority;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
public class TaskPriorityTest {
public class TaskPriorityQueueImplTest {
@Test
public void testSort() {
@ -80,4 +81,62 @@ public class TaskPriorityTest {
taskPrioritys
);
}
@Test
public void put() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
Assert.assertEquals(2, queue.size());
}
@Test
public void take() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assert.assertTrue(queue.size() < peekBeforeLength);
}
@Test
public void poll() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
int peekBeforeLength = queue.size();
queue.poll(1000, TimeUnit.MILLISECONDS);
queue.poll(1000, TimeUnit.MILLISECONDS);
Assert.assertTrue(queue.size() == 0);
System.out.println(System.currentTimeMillis());
queue.poll(1000, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis());
}
@Test
public void size() throws Exception {
Assert.assertTrue(getPriorityQueue().size() == 2);
}
/**
* get queue
*
* @return queue
* @throws Exception
*/
private TaskPriorityQueue getPriorityQueue() throws Exception {
TaskPriorityQueue queue = new TaskPriorityQueueImpl();
TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1);
TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
return queue;
}
/**
* create task priority
*
* @param priority
* @param processInstanceId
* @return
*/
private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) {
TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default");
return priorityOne;
}
}

57
dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package queue;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.junit.Test;
import static org.junit.Assert.*;
public class TaskUpdateQueueTest {
/**
* test put
*/
@Test
public void testQueue() throws Exception{
/**
* 1_1_2_1_default
* 1_1_2_2_default
* 1_1_0_3_default
* 1_1_0_4_default
*/
TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default");
TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default");
TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default");
TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default");
TaskPriorityQueue queue = new TaskPriorityQueueImpl();
queue.put(taskInfo1);
queue.put(taskInfo2);
queue.put(taskInfo3);
queue.put(taskInfo4);
assertEquals(taskInfo3, queue.take());
assertEquals(taskInfo4, queue.take());
assertEquals(taskInfo1, queue.take());
assertEquals(taskInfo2, queue.take());
}
}

BIN
dolphinscheduler-ui.zip

Binary file not shown.

30
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue

@ -110,21 +110,6 @@
</div>
</div>
</div>
<div class="clearfix list">
<div class="text">
<span>{{$t('Startup parameter')}}</span>
</div>
<div class="cont" style="width: 688px;">
<div style="padding-top: 6px;">
<m-local-params
ref="refLocalParams"
@on-local-params="_onLocalParams"
:udp-list="udpList"
:hide="false">
</m-local-params>
</div>
</div>
</div>
<template v-if="execType">
<div class="clearfix list" style="margin:-6px 0 16px 0">
<div class="text">
@ -156,6 +141,21 @@
</div>
</div>
</template>
<div class="clearfix list">
<div class="text">
<span>{{$t('Startup parameter')}}</span>
</div>
<div class="cont" style="width: 688px;">
<div style="padding-top: 6px;">
<m-local-params
ref="refLocalParams"
@on-local-params="_onLocalParams"
:udp-list="udpList"
:hide="false">
</m-local-params>
</div>
</div>
</div>
<div class="submit">
<el-button type="text" size="small" @click="close()"> {{$t('Cancel')}} </el-button>
<el-button type="primary" size="small" round :loading="spinnerLoading" @click="ok()">{{spinnerLoading ? 'Loading...' : $t('Start')}} </el-button>

Loading…
Cancel
Save