Browse Source

Merge pull request #20 from apache/refactor-worker

task prioriry refator (#2094)
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
9964cf9057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  3. 146
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
  4. 169
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
  5. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  6. 152
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  7. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  8. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  9. 42
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 102
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
  11. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
  12. 375
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
  13. 50
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
  14. 115
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
  15. 229
      dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
  16. 59
      dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1004,4 +1004,6 @@ public final class Constants {
* default worker group
*/
public static final String DEFAULT_WORKER_GROUP = "default";
public static final Integer TASK_INFO_LENGTH = 5;
}

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -191,6 +191,11 @@ public class TaskInstance implements Serializable {
*/
private int workerGroupId;
/**
* workerGroup
*/
private String workerGroup;
public ProcessInstance getProcessInstance() {
return processInstance;
}
@ -460,6 +465,14 @@ public class TaskInstance implements Serializable {
this.dependentResult = dependentResult;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
@Override
public String toString() {
return "TaskInstance{" +
@ -492,6 +505,7 @@ public class TaskInstance implements Serializable {
", processInstancePriority=" + processInstancePriority +
", dependentResult='" + dependentResult + '\'' +
", workerGroupId=" + workerGroupId +
", workerGroup='" + workerGroup + '\'' +
'}';
}
}

146
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task priority info
*/
public class TaskPriority {
/**
* processInstancePriority
*/
private int processInstancePriority;
/**
* processInstanceId
*/
private int processInstanceId;
/**
* taskInstancePriority
*/
private int taskInstancePriority;
/**
* taskId
*/
private int taskId;
/**
* groupName
*/
private String groupName;
/**
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
*/
private String taskPriorityInfo;
public TaskPriority(){}
public TaskPriority(int processInstancePriority,
int processInstanceId,
int taskInstancePriority,
int taskId, String groupName) {
this.processInstancePriority = processInstancePriority;
this.processInstanceId = processInstanceId;
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.groupName = groupName;
this.taskPriorityInfo = this.processInstancePriority +
UNDERLINE +
this.processInstanceId +
UNDERLINE +
this.taskInstancePriority +
UNDERLINE +
this.taskId +
UNDERLINE +
this.groupName;
}
public int getProcessInstancePriority() {
return processInstancePriority;
}
public void setProcessInstancePriority(int processInstancePriority) {
this.processInstancePriority = processInstancePriority;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public int getTaskInstancePriority() {
return taskInstancePriority;
}
public void setTaskInstancePriority(int taskInstancePriority) {
this.taskInstancePriority = taskInstancePriority;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskPriorityInfo() {
return taskPriorityInfo;
}
public void setTaskPriorityInfo(String taskPriorityInfo) {
this.taskPriorityInfo = taskPriorityInfo;
}
/**
* taskPriorityInfo convert taskPriority
*
* @param taskPriorityInfo taskPriorityInfo
* @return TaskPriority
*/
public static TaskPriority of(String taskPriorityInfo){
String[] parts = taskPriorityInfo.split(UNDERLINE);
if (parts.length != 4) {
throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
}
TaskPriority taskPriority = new TaskPriority(
Integer.parseInt(parts[0]),
Integer.parseInt(parts[1]),
Integer.parseInt(parts[2]),
Integer.parseInt(parts[3]),
parts[4]);
return taskPriority;
}
}

169
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* TaskUpdateQueue consumer
*/
@Component
public class TaskUpdateQueueConsumer extends Thread{
/**
* logger of TaskUpdateQueueConsumer
*/
private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
/**
* taskUpdateQueue
*/
@Autowired
private TaskUpdateQueue taskUpdateQueue;
/**
* processService
*/
@Autowired
private ProcessService processService;
/**
* executor dispatcher
*/
@Autowired
private ExecutorDispatcher dispatcher;
@Override
public void run() {
while (Stopper.isRunning()){
try {
if (taskUpdateQueue.size() == 0){
continue;
}
String taskPriorityInfo = taskUpdateQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
dispatch(taskPriority.getTaskId());
}catch (Exception e){
logger.error("dispatcher task error",e);
}
}
}
/**
* TODO dispatch task
*
* @param taskInstanceId taskInstanceId
* @return result
*/
private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
return false;
}
}
/**
* get TaskExecutionContext
* @param taskInstanceId taskInstanceId
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId){
TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance));
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.create();
}
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
}

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
@ -66,12 +67,14 @@ public class TaskAckProcessor implements NettyRequestProcessor {
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
/**
* change Task state
*/
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
taskAckCommand.getHost(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());

152
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -17,26 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.concurrent.Callable;
@ -71,11 +63,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
protected TaskInstance taskInstance;
/**
* task queue
*/
protected ITaskQueue taskQueue;
/**
* whether need cancel
*/
@ -86,12 +73,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
private MasterConfig masterConfig;
/**
* executor dispatcher
* taskUpdateQueue
*/
private ExecutorDispatcher dispatcher;
private TaskUpdateQueue taskUpdateQueue;
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
@ -101,11 +86,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
this.taskUpdateQueue = new TaskUpdateQueueImpl();
}
/**
@ -123,87 +107,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = true;
}
/**
* TODO 分发任务
* dispatch task to worker
* @param taskInstance
*/
private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
return false;
}
}
/**
* get TaskExecutionContext
*
* @param taskInstance taskInstance
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance));
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.create();
}
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
/**
* submit master base task exec thread
* @return TaskInstance
@ -268,10 +171,20 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
logger.info("task ready to submit: {}" , taskInstance);
boolean submitTask = dispatch(taskInstance);
logger.info("task ready to submit: {}", taskInstance);
/**
* taskPriorityInfo
*/
String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
taskInstance.getWorkerGroup());
taskUpdateQueue.put(taskPriorityInfo);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return submitTask;
return true;
}catch (Exception e){
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJson(taskInstance));
@ -279,6 +192,33 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
/**
* buildTaskPriorityInfo
*
* @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId
* @param taskInstancePriority taskInstancePriority
* @param taskInstanceId taskInstanceId
* @param workerGroup workerGroup
* @return TaskPriorityInfo
*/
private String buildTaskPriorityInfo(int processInstancePriority,
int processInstanceId,
int taskInstancePriority,
int taskInstanceId,
String workerGroup){
return processInstancePriority +
UNDERLINE +
processInstanceId +
UNDERLINE +
taskInstancePriority +
UNDERLINE +
taskInstanceId +
UNDERLINE +
workerGroup;
}
/**
* submit wait complete
* @return true

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -182,7 +182,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
alreadyKilled = true;
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setProcessId(taskInstance.getPid());
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
Host host = Host.of(taskInstance.getHost());

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,10 +60,7 @@ public class WorkerServer implements IStoppable {
@Autowired
private ZKWorkerClient zkWorkerClient = null;
/**
* task queue impl
*/
protected ITaskQueue taskQueue;
/**
* fetch task executor service
@ -136,7 +132,7 @@ public class WorkerServer implements IStoppable {
this.zkWorkerClient.init();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");

42
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -98,11 +97,6 @@ public class ProcessService {
@Autowired
private ProjectMapper projectMapper;
/**
* task queue impl
*/
@Autowired
private ITaskQueue taskQueue;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
* @param logger logger
@ -960,40 +954,7 @@ public class ProcessService {
return taskInstance;
}
/**
* submit task to queue
* @param taskInstance taskInstance
* @return whether submit task to queue success
*/
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
if(checkTaskExistsInTaskQueue(taskInstance)){
logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
return true;
}
logger.info("task ready to queue: {}" , taskInstance);
boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
return insertQueueResult;
}catch (Exception e){
logger.error("submit task to queue Exception: ", e);
logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}
/**
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}...
@ -1127,7 +1088,8 @@ public class ProcessService {
String taskZkInfo = taskZkInfo(taskInstance);
return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo);
// return taskQueue.checkTaskExists(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo);
return false;
}
/**

102
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java

@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import java.util.List;
import java.util.Set;
public interface ITaskQueue {
/**
* take out all the elements
*
*
* @param key
* @return
*/
List<String> getAllTasks(String key);
/**
* check if has a task
* @param key queue name
* @return true if has; false if not
*/
boolean hasTask(String key);
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
boolean checkTaskExists(String key, String task);
/**
* add an element to the queue
*
* @param key queue name
* @param value
*/
boolean add(String key, String value);
/**
* an element pops out of the queue
*
* @param key queue name
* @param n how many elements to poll
* @return
*/
List<String> poll(String key, int n);
/**
* remove a element from queue
* @param key
* @param value
*/
void removeNode(String key, String value);
/**
* add an element to the set
*
* @param key
* @param value
*/
void sadd(String key, String value);
/**
* delete the value corresponding to the key in the set
*
* @param key
* @param value
*/
void srem(String key, String value);
/**
* gets all the elements of the set based on the key
*
* @param key
* @return
*/
Set<String> smembers(String key);
/**
* clear the task queue for use by junit tests only
*/
void delete();
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java

@ -40,11 +40,11 @@ public class TaskQueueFactory {
*
* @return instance
*/
public static ITaskQueue getTaskQueueInstance() {
public static TaskUpdateQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
System.exit(-1);

375
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java

@ -1,375 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
public class TaskQueueZkImpl implements ITaskQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
private final ZookeeperOperator zookeeperOperator;
@Autowired
public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
this.zookeeperOperator = zookeeperOperator;
try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for (String key : new String[]{tasksQueuePath,tasksKillPath}){
if (!zookeeperOperator.isExisted(key)){
zookeeperOperator.persist(key, "");
logger.info("create tasks queue parent node success : {}", key);
}
}
} catch (Exception e) {
logger.error("create tasks queue parent node failure", e);
}
}
/**
* get all tasks from tasks queue
* @param key task queue name
* @return
*/
@Override
public List<String> getAllTasks(String key) {
try {
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
return list;
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return Collections.emptyList();
}
/**
* check if has a task
* @param key queue name
* @return true if has; false if not
*/
@Override
public boolean hasTask(String key) {
try {
return zookeeperOperator.hasChildren(getTasksPath(key));
} catch (Exception e) {
logger.error("check has task in tasks queue exception",e);
}
return false;
}
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
@Override
public boolean checkTaskExists(String key, String task) {
String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
return zookeeperOperator.isExisted(taskPath);
}
/**
* add task to tasks queue
*
* @param key task queue name
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public boolean add(String key, String value){
try {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
zookeeperOperator.persist(taskIdPath, value);
return true;
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
return false;
}
}
/**
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* @param key task queue name
* @param tasksNum how many elements to poll
* @return the task ids to be executed
*/
@Override
public List<String> poll(String key, int tasksNum) {
try{
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size();
Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
String s1 = o1;
String s2 = o2;
String[] s1Array = s1.split(Constants.UNDERLINE);
if(s1Array.length>4){
// warning: if this length > 5, need to be changed
s1 = s1.substring(0, s1.lastIndexOf(Constants.UNDERLINE) );
}
String[] s2Array = s2.split(Constants.UNDERLINE);
if(s2Array.length>4){
// warning: if this length > 5, need to be changed
s2 = s2.substring(0, s2.lastIndexOf(Constants.UNDERLINE) );
}
return s1.compareTo(s2);
}
});
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
//forward compatibility
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(taskDetailArrs.length > 4){
String taskHosts = taskDetailArrs[4];
//task can assign to any worker host if equals default ip value of worker server
if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
continue;
}
}
formatTask += Constants.UNDERLINE + taskDetailArrs[4];
}
taskTreeSet.add(formatTask);
}
}
List<String> tasksList = getTasksListFromTreeSet(tasksNum, taskTreeSet);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
return tasksList;
}else{
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return Collections.emptyList();
}
/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> tasksList = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ >= tasksNum){
break;
}
String task = iterator.next();
tasksList.add(getOriginTaskFormat(task));
}
return tasksList;
}
/**
* format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* processInstanceId and task id need to be convert to int.
* @param formatTask
* @return
*/
private String getOriginTaskFormat(String formatTask){
String[] taskArray = formatTask.split(Constants.UNDERLINE);
if(taskArray.length< 4){
return formatTask;
}
int processInstanceId = Integer.parseInt(taskArray[1]);
int taskId = Integer.parseInt(taskArray[3]);
StringBuilder sb = new StringBuilder(50);
String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[2], taskId);
sb.append(destTask);
if(taskArray.length > 4){
for(int index = 4; index < taskArray.length; index++){
sb.append(Constants.UNDERLINE).append(taskArray[index]);
}
}
return sb.toString();
}
@Override
public void removeNode(String key, String nodeValue){
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
String taskIdPath = tasksQueuePath + nodeValue;
logger.info("removeNode task {}", taskIdPath);
try{
zookeeperOperator.remove(taskIdPath);
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
}
}
/**
* In order to be compatible with redis implementation
*
* To be compatible with the redis implementation, add an element to the set
* @param key The key is the kill/cancel queue path name
* @param value host-taskId The name of the zookeeper node
*/
@Override
public void sadd(String key,String value) {
try {
if(value != null && value.trim().length() > 0){
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
if(!zookeeperOperator.isExisted(path + value)){
zookeeperOperator.persist(path + value,value);
logger.info("add task:{} to tasks set ",value);
} else{
logger.info("task {} exists in tasks set ",value);
}
}else{
logger.warn("add host-taskId:{} to tasks set is empty ",value);
}
} catch (Exception e) {
logger.error("add task to tasks set exception",e);
}
}
/**
* delete the value corresponding to the key in the set
* @param key The key is the kill/cancel queue path name
* @param value host-taskId-taskType The name of the zookeeper node
*/
@Override
public void srem(String key, String value) {
try{
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
zookeeperOperator.remove(path + value);
}catch(Exception e){
logger.error(String.format("delete task:" + value + " exception"),e);
}
}
/**
* Gets all the elements of the set based on the key
* @param key The key is the kill/cancel queue path name
* @return
*/
@Override
public Set<String> smembers(String key) {
try {
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
return new HashSet<>(list);
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return Collections.emptySet();
}
/**
* Clear the task queue of zookeeper node
*/
@Override
public void delete(){
try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for (String key : new String[]{tasksQueuePath,tasksKillPath}){
if (zookeeperOperator.isExisted(key)){
List<String> list = zookeeperOperator.getChildrenKeys(key);
for (String task : list) {
zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task);
logger.info("delete task from tasks queue : {}/{} ", key, task);
}
}
}
} catch (Exception e) {
logger.error("delete all tasks in tasks queue failure", e);
}
}
/**
* Get the task queue path
* @param key task queue name
* @return
*/
public String getTasksPath(String key){
return zookeeperOperator.getZookeeperConfig().getDsRoot() + Constants.SINGLE_SLASH + key;
}
}

50
dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java

@ -14,35 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package queue;
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.junit.*;
/**
* base task queue test for only start zk server once
*/
@Ignore
public class BaseTaskQueueTest {
public interface TaskUpdateQueue {
protected static ITaskQueue tasksQueue = null;
/**
* put task info
*
* @param taskInfo taskInfo
* @throws Exception
*/
void put(String taskInfo) throws Exception;
@BeforeClass
public static void setup() {
ZKServer.start();
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
}
/**
* take taskInfo
* @return taskInfo
* @throws Exception
*/
String take()throws Exception;
@AfterClass
public static void tearDown() {
tasksQueue.delete();
ZKServer.stop();
}
@Test
public void tasksQueueNotNull(){
Assert.assertNotNull(tasksQueue);
}
}
/**
* size
*
* @return size
* @throws Exception
*/
int size() throws Exception;
}

115
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
public class TaskUpdateQueueImpl implements TaskUpdateQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueImpl.class);
/**
* queue size
*/
private static final Integer QUEUE_MAX_SIZE = 100;
/**
* queue
*/
private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* put task takePriorityInfo
*
* @param taskPriorityInfo takePriorityInfo
* @throws Exception
*/
@Override
public void put(String taskPriorityInfo) throws Exception {
if (QUEUE_MAX_SIZE.equals(queue.size())){
//TODO need persist db , then load from db to queue when queue size is zero
logger.error("queue is full...");
return;
}
queue.put(taskPriorityInfo);
}
/**
* take taskInfo
* @return taskInfo
* @throws Exception
*/
@Override
public String take() throws Exception {
return queue.take();
}
/**
* queue size
* @return size
* @throws Exception
*/
@Override
public int size() throws Exception {
return queue.size();
}
/**
* TaskInfoComparator
*/
private class TaskInfoComparator implements Comparator<String>{
/**
* compare o1 o2
* @param o1 o1
* @param o2 o2
* @return compare result
*/
@Override
public int compare(String o1, String o2) {
String s1 = o1;
String s2 = o2;
String[] s1Array = s1.split(UNDERLINE);
if(s1Array.length > TASK_INFO_LENGTH){
// warning: if this length > 5, need to be changed
s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
}
String[] s2Array = s2.split(UNDERLINE);
if(s2Array.length > TASK_INFO_LENGTH){
// warning: if this length > 5, need to be changed
s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
}
return s1.compareTo(s2);
}
}
}

229
dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java

@ -1,229 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.*;
/**
* task queue test
*/
@Ignore
public class TaskQueueZKImplTest extends BaseTaskQueueTest {
@Before
public void before(){
//clear all data
tasksQueue.delete();
}
@After
public void after(){
//clear all data
tasksQueue.delete();
}
/**
* test take out all the elements
*/
@Test
public void getAllTasks(){
//add
init();
// get all
List<String> allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertEquals(allTasks.size(),2);
//delete all
tasksQueue.delete();
allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertEquals(allTasks.size(),0);
}
@Test
public void hasTask(){
init();
boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertTrue(hasTask);
//delete all
tasksQueue.delete();
hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertFalse(hasTask);
}
/**
* test check task exists in the task queue or not
*/
@Test
public void checkTaskExists(){
String task= "1_0_1_1_-1";
//add
init();
// check Exist true
boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
assertTrue(taskExists);
//remove task
tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
// check Exist false
taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
assertFalse(taskExists);
}
/**
* test add element to the queue
*/
@Test
public void add(){
//add
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + IpUtils.ipToLong(OSUtils.getHost()) + 10);
List<String> tasks = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() <= 0){
return;
}
//pop
String node1 = tasks.get(0);
assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
}
/**
* test element pops out of the queue
*/
@Test
public void poll(){
//add
init();
List<String> taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2);
assertEquals(taskList.size(),2);
assertEquals(taskList.get(0),"0_1_1_1_-1");
assertEquals(taskList.get(1),"1_0_1_1_-1");
}
/**
* test remove element from queue
*/
@Test
public void removeNode(){
String task = "1_0_1_1_-1";
//add
init();
tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task));
}
/**
* test add an element to the set
*/
@Test
public void sadd(){
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
}
/**
* test delete the value corresponding to the key in the set
*/
@Test
public void srem(){
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
//remove and get size
tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
}
/**
* test gets all the elements of the set based on the key
*/
@Test
public void smembers(){
//first init
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0);
//add
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1);
//add
task = "0_1_1_1_";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2);
}
/**
* init data
*/
private void init(){
//add
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
}
/**
* test one million data from zookeeper queue
*/
@Ignore
@Test
public void extremeTest(){
int total = 30 * 10000;
for(int i = 0; i < total; i++) {
for(int j = 0; j < total; j++) {
//${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, j == 0 ? 0 : j + new Random().nextInt(100));
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, formatTask);
}
}
String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0");
}
}

59
dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package queue;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
import org.junit.Test;
import static org.junit.Assert.*;
public class TaskUpdateQueueTest {
/**
* test put
*/
@Test
public void testQueue() throws Exception{
// ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
/**
* 1_1_2_1_default
* 1_1_2_2_default
* 1_1_0_3_default
* 1_1_0_4_default
*/
String taskInfo1 = "1_1_2_1_default";
String taskInfo2 = "1_1_2_2_default";
String taskInfo3 = "1_1_0_3_default";
String taskInfo4 = "1_1_0_4_default";
TaskUpdateQueue queue = new TaskUpdateQueueImpl();
queue.put(taskInfo1);
queue.put(taskInfo2);
queue.put(taskInfo3);
queue.put(taskInfo4);
assertEquals("1_1_0_3_default", queue.take());
assertEquals("1_1_0_4_default", queue.take());
assertEquals("1_1_2_1_default",queue.take());
assertEquals("1_1_2_2_default",queue.take());
}
}
Loading…
Cancel
Save