Browse Source

[1.3.6-prepare][cherry-pick]support distributed tracing #4270 (#4771)

Kirs 4 years ago committed by GitHub
parent
commit
c2d1145366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  2. 33
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
  3. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
  4. 14
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
  5. 56
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
  6. 26
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
  7. 19
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  8. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  9. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  10. 27
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  11. 44
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
  12. 97
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
  13. 53
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  14. 81
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityTest.java
  15. 23
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueTest.java
  16. 1
      pom.xml

37
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -16,6 +16,8 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -91,6 +93,21 @@ public class JSONUtils {
return null; return null;
} }
/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
public static <T> T parseObject(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}
String json = new String(src, UTF_8);
return parseObject(json, clazz);
}
/** /**
* json to list * json to list
@ -114,6 +131,26 @@ public class JSONUtils {
} }
/**
* serialize to json byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] toJsonByteArray(T obj) {
if (obj == null) {
return null;
}
String json = "";
try {
json = toJsonString(obj);
} catch (Exception e) {
logger.error("json serialize exception.", e);
}
return json.getBytes(UTF_8);
}
/** /**
* check json object valid * check json object valid

33
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandContext;
import org.apache.dolphinscheduler.remote.command.CommandHeader; import org.apache.dolphinscheduler.remote.command.CommandHeader;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -51,16 +52,34 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
switch (state()){ switch (state()){
case MAGIC: case MAGIC:
checkMagic(in.readByte()); checkMagic(in.readByte());
checkpoint(State.VERSION);
// fallthru
case VERSION:
checkVersion(in.readByte());
checkpoint(State.COMMAND); checkpoint(State.COMMAND);
// fallthru
case COMMAND: case COMMAND:
commandHeader.setType(in.readByte()); commandHeader.setType(in.readByte());
checkpoint(State.OPAQUE); checkpoint(State.OPAQUE);
// fallthru
case OPAQUE: case OPAQUE:
commandHeader.setOpaque(in.readLong()); commandHeader.setOpaque(in.readLong());
checkpoint(State.CONTEXT_LENGTH);
// fallthru
case CONTEXT_LENGTH:
commandHeader.setContextLength(in.readInt());
checkpoint(State.CONTEXT);
// fallthru
case CONTEXT:
byte[] context = new byte[commandHeader.getContextLength()];
in.readBytes(context);
commandHeader.setContext(context);
checkpoint(State.BODY_LENGTH); checkpoint(State.BODY_LENGTH);
// fallthru
case BODY_LENGTH: case BODY_LENGTH:
commandHeader.setBodyLength(in.readInt()); commandHeader.setBodyLength(in.readInt());
checkpoint(State.BODY); checkpoint(State.BODY);
// fallthru
case BODY: case BODY:
byte[] body = new byte[commandHeader.getBodyLength()]; byte[] body = new byte[commandHeader.getBodyLength()];
in.readBytes(body); in.readBytes(body);
@ -68,6 +87,7 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
Command packet = new Command(); Command packet = new Command();
packet.setType(commandType(commandHeader.getType())); packet.setType(commandType(commandHeader.getType()));
packet.setOpaque(commandHeader.getOpaque()); packet.setOpaque(commandHeader.getOpaque());
packet.setContext(CommandContext.valueOf(commandHeader.getContext()));
packet.setBody(body); packet.setBody(body);
out.add(packet); out.add(packet);
// //
@ -99,10 +119,23 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
} }
} }
/**
* check version
* @param version
*/
private void checkVersion(byte version) {
if (version != Command.VERSION) {
throw new IllegalArgumentException("illegal protocol [version]" + version);
}
}
enum State{ enum State{
MAGIC, MAGIC,
VERSION,
COMMAND, COMMAND,
OPAQUE, OPAQUE,
CONTEXT_LENGTH,
CONTEXT,
BODY_LENGTH, BODY_LENGTH,
BODY; BODY;
} }

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java

@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
throw new Exception("encode msg is null"); throw new Exception("encode msg is null");
} }
out.writeByte(Command.MAGIC); out.writeByte(Command.MAGIC);
out.writeByte(Command.VERSION);
out.writeByte(msg.getType().ordinal()); out.writeByte(msg.getType().ordinal());
out.writeLong(msg.getOpaque()); out.writeLong(msg.getOpaque());
writeContext(msg, out);
out.writeInt(msg.getBody().length); out.writeInt(msg.getBody().length);
out.writeBytes(msg.getBody()); out.writeBytes(msg.getBody());
} }
private void writeContext(Command msg, ByteBuf out) {
byte[] headerBytes = msg.getContext().toBytes();
out.writeInt(headerBytes.length);
out.writeBytes(headerBytes);
}
} }

14
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java

@ -28,6 +28,7 @@ public class Command implements Serializable {
private static final AtomicLong REQUEST_ID = new AtomicLong(1); private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe; public static final byte MAGIC = (byte) 0xbabe;
public static final byte VERSION = 0;
public Command(){ public Command(){
this.opaque = REQUEST_ID.getAndIncrement(); this.opaque = REQUEST_ID.getAndIncrement();
@ -47,6 +48,11 @@ public class Command implements Serializable {
*/ */
private long opaque; private long opaque;
/**
* request context
*/
private CommandContext context = new CommandContext();
/** /**
* data body * data body
*/ */
@ -76,6 +82,14 @@ public class Command implements Serializable {
this.body = body; this.body = body;
} }
public CommandContext getContext() {
return context;
}
public void setContext(CommandContext context) {
this.context = context;
}
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;

56
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* command context
*/
public class CommandContext implements Serializable {
private Map<String, String> items = new LinkedHashMap<>();
public Map<String, String> getItems() {
return items;
}
public void setItems(Map<String, String> items) {
this.items = items;
}
public void put(String key, String value) {
items.put(key, value);
}
public String get(String key) {
return items.get(key);
}
public byte[] toBytes() {
return JSONUtils.toJsonByteArray(this);
}
public static CommandContext valueOf(byte[] src) {
return JSONUtils.parseObject(src, CommandContext.class);
}
}

26
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java

@ -33,6 +33,16 @@ public class CommandHeader implements Serializable {
*/ */
private long opaque; private long opaque;
/**
* context length
*/
private int contextLength;
/**
* context
*/
private byte[] context;
/** /**
* body length * body length
*/ */
@ -61,4 +71,20 @@ public class CommandHeader implements Serializable {
public void setOpaque(long opaque) { public void setOpaque(long opaque) {
this.opaque = opaque; this.opaque = opaque;
} }
public int getContextLength() {
return contextLength;
}
public void setContextLength(int contextLength) {
this.contextLength = contextLength;
}
public byte[] getContext() {
return context;
}
public void setContext(byte[] context) {
this.context = context;
}
} }

19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,7 +68,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue * taskUpdateQueue
*/ */
@Autowired @Autowired
private TaskPriorityQueue<String> taskPriorityQueue; private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
/** /**
* processService * processService
@ -96,7 +97,7 @@ public class TaskPriorityQueueConsumer extends Thread{
@Override @Override
public void run() { public void run() {
List<String> failedDispatchTasks = new ArrayList<>(); List<TaskPriority> failedDispatchTasks = new ArrayList<>();
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
@ -107,15 +108,14 @@ public class TaskPriorityQueueConsumer extends Thread{
continue; continue;
} }
// if not task , blocking here // if not task , blocking here
String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); boolean dispatchResult = dispatch(taskPriority);
boolean dispatchResult = dispatch(taskPriority.getTaskId());
if(!dispatchResult){ if(!dispatchResult){
failedDispatchTasks.add(taskPriorityInfo); failedDispatchTasks.add(taskPriority);
} }
} }
if (!failedDispatchTasks.isEmpty()) { if (!failedDispatchTasks.isEmpty()) {
for (String dispatchFailedTask : failedDispatchTasks) { for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask); taskPriorityQueue.put(dispatchFailedTask);
} }
// If there are tasks in a cycle that cannot find the worker group, // If there are tasks in a cycle that cannot find the worker group,
@ -134,12 +134,13 @@ public class TaskPriorityQueueConsumer extends Thread{
/** /**
* dispatch task * dispatch task
* *
* @param taskInstanceId taskInstanceId * @param taskPriority taskPriority
* @return result * @return result
*/ */
protected boolean dispatch(int taskInstanceId) { protected boolean dispatch(TaskPriority taskPriority) {
boolean result = false; boolean result = false;
try { try {
int taskInstanceId = taskPriority.getTaskId();
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -32,11 +32,11 @@ import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.Date; import java.util.Date;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -216,14 +216,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.info("task ready to submit: {}", taskInstance); logger.info("task ready to submit: {}", taskInstance);
/** /**
* taskPriorityInfo * taskPriority
*/ */
String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(), TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(), taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(), taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskUpdateQueue.put(taskPriorityInfo); taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) ); logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return true; return true;
}catch (Exception e){ }catch (Exception e){
@ -234,33 +234,27 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
/** /**
* buildTaskPriorityInfo * buildTaskPriority
* *
* @param processInstancePriority processInstancePriority * @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @param taskInstancePriority taskInstancePriority * @param taskInstancePriority taskInstancePriority
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @param workerGroup workerGroup * @param workerGroup workerGroup
* @return TaskPriorityInfo * @return TaskPriority
*/ */
private String buildTaskPriorityInfo(int processInstancePriority, private TaskPriority buildTaskPriority(int processInstancePriority,
int processInstanceId, int processInstanceId,
int taskInstancePriority, int taskInstancePriority,
int taskInstanceId, int taskInstanceId,
String workerGroup){ String workerGroup){
return processInstancePriority + return new TaskPriority(processInstancePriority, processInstanceId,
UNDERLINE + taskInstancePriority, taskInstanceId, workerGroup);
processInstanceId +
UNDERLINE +
taskInstancePriority +
UNDERLINE +
taskInstanceId +
UNDERLINE +
workerGroup;
} }
/** /**
* submit wait complete * submit wait complete
*
* @return true * @return true
*/ */
protected Boolean submitWaitComplete(){ protected Boolean submitWaitComplete(){

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -114,7 +114,6 @@ public class MasterSchedulerService extends Thread {
public void run() { public void run() {
logger.info("master scheduler started"); logger.info("master scheduler started");
while (Stopper.isRunning()){ while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if(!runCheckFlag) { if(!runCheckFlag) {
@ -122,7 +121,17 @@ public class MasterSchedulerService extends Thread {
continue; continue;
} }
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
scheduleProcess();
}
} catch (Exception e) {
logger.error("master scheduler thread error", e);
}
}
}
private void scheduleProcess() throws Exception {
InterProcessMutex mutex = null;
try {
mutex = zkMasterClient.blockAcquireMutex(); mutex = zkMasterClient.blockAcquireMutex();
int activeCount = masterExecService.getActiveCount(); int activeCount = masterExecService.getActiveCount();
@ -138,7 +147,12 @@ public class MasterSchedulerService extends Thread {
this.masterConfig.getMasterExecThreads() - activeCount, command); this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) { if (processInstance != null) {
logger.info("start master exec thread , split DAG ..."); logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); masterExecService.execute(
new MasterExecThread(
processInstance
, processService
, nettyRemotingClient
));
} }
}catch (Exception e){ }catch (Exception e){
logger.error("scan command error ", e); logger.error("scan command error ", e);
@ -148,14 +162,10 @@ public class MasterSchedulerService extends Thread {
//indicate that no command ,sleep for 1s //indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
}
} catch (Exception e){
logger.error("master scheduler thread error",e);
} finally{ } finally{
zkMasterClient.releaseMutex(mutex); zkMasterClient.releaseMutex(mutex);
} }
} }
}
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getAddr(masterConfig.getListenPort()); return OSUtils.getAddr(masterConfig.getListenPort());

27
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
@ -61,7 +62,7 @@ public class TaskPriorityQueueConsumerTest {
@Autowired @Autowired
private TaskPriorityQueue taskPriorityQueue; private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
@Autowired @Autowired
private TaskPriorityQueueConsumer taskPriorityQueueConsumer; private TaskPriorityQueueConsumer taskPriorityQueueConsumer;
@ -131,9 +132,8 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
taskPriorityQueue.put("2_1_2_1_default");
TimeUnit.SECONDS.sleep(10); TimeUnit.SECONDS.sleep(10);
@ -169,7 +169,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default"); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setId(1); dataSource.setId(1);
@ -232,7 +233,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default"); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setId(80); dataSource.setId(80);
@ -299,7 +301,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default"); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setId(1); dataSource.setId(1);
@ -391,7 +394,8 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
TimeUnit.SECONDS.sleep(10); TimeUnit.SECONDS.sleep(10);
@ -444,7 +448,9 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
boolean res = taskPriorityQueueConsumer.dispatch(1); TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
Assert.assertFalse(res); Assert.assertFalse(res);
} }
@ -611,7 +617,8 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
taskPriorityQueueConsumer.run(); taskPriorityQueueConsumer.run();

44
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* task priority queue exception
*/
public class TaskPriorityQueueException extends Exception {
/**
* Construct a new runtime exception with the detail message
*
* @param message message
*/
public TaskPriorityQueueException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message message
* @param cause cause
*/
public TaskPriorityQueueException(String message, Throwable cause) {
super(message, cause);
}
}

97
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -15,14 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.entity; package org.apache.dolphinscheduler.service.queue;
import static org.apache.dolphinscheduler.common.Constants.*; import java.util.Map;
import java.util.Objects;
/** /**
* task priority info * task priority info
*/ */
public class TaskPriority { public class TaskPriority implements Comparable<TaskPriority> {
/** /**
* processInstancePriority * processInstancePriority
@ -50,9 +51,9 @@ public class TaskPriority {
private String groupName; private String groupName;
/** /**
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName} * context
*/ */
private String taskPriorityInfo; private Map<String, String> context;
public TaskPriority(){} public TaskPriority(){}
@ -65,15 +66,6 @@ public class TaskPriority {
this.taskInstancePriority = taskInstancePriority; this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId; this.taskId = taskId;
this.groupName = groupName; this.groupName = groupName;
this.taskPriorityInfo = this.processInstancePriority +
UNDERLINE +
this.processInstanceId +
UNDERLINE +
this.taskInstancePriority +
UNDERLINE +
this.taskId +
UNDERLINE +
this.groupName;
} }
public int getProcessInstancePriority() { public int getProcessInstancePriority() {
@ -104,6 +96,10 @@ public class TaskPriority {
return taskId; return taskId;
} }
public Map<String, String> getContext() {
return context;
}
public void setTaskId(int taskId) { public void setTaskId(int taskId) {
this.taskId = taskId; this.taskId = taskId;
} }
@ -116,32 +112,61 @@ public class TaskPriority {
this.groupName = groupName; this.groupName = groupName;
} }
public String getTaskPriorityInfo() { public void setContext(Map<String, String> context) {
return taskPriorityInfo; this.context = context;
} }
public void setTaskPriorityInfo(String taskPriorityInfo) { @Override
this.taskPriorityInfo = taskPriorityInfo; public int compareTo(TaskPriority other) {
} if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
return 1;
}
if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) {
return -1;
}
/** if (this.getProcessInstanceId() > other.getProcessInstanceId()) {
* taskPriorityInfo convert taskPriority return 1;
* }
* @param taskPriorityInfo taskPriorityInfo if (this.getProcessInstanceId() < other.getProcessInstanceId()) {
* @return TaskPriority return -1;
*/ }
public static TaskPriority of(String taskPriorityInfo){
String[] parts = taskPriorityInfo.split(UNDERLINE); if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
return 1;
}
if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
return -1;
}
if (this.getTaskId() > other.getTaskId()) {
return 1;
}
if (this.getTaskId() < other.getTaskId()) {
return -1;
}
if (parts.length != 5) { return this.getGroupName().compareTo(other.getGroupName());
throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo)); }
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
} }
TaskPriority taskPriority = new TaskPriority( TaskPriority that = (TaskPriority) o;
Integer.parseInt(parts[0]), return processInstancePriority == that.processInstancePriority
Integer.parseInt(parts[1]), && processInstanceId == that.processInstanceId
Integer.parseInt(parts[2]), && taskInstancePriority == that.taskInstancePriority
Integer.parseInt(parts[3]), && taskId == that.taskId
parts[4]); && Objects.equals(groupName, that.groupName);
return taskPriority; }
@Override
public int hashCode() {
return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
} }
} }

53
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -16,21 +16,16 @@
*/ */
package org.apache.dolphinscheduler.service.queue; package org.apache.dolphinscheduler.service.queue;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH; import org.springframework.stereotype.Service;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
/** /**
* A singleton of a task queue implemented with zookeeper * A singleton of a task queue implemented with zookeeper
* tasks queue implementation * tasks queue implementation
*/ */
@Service @Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> { public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
/** /**
* queue size * queue size
*/ */
@ -39,67 +34,35 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
/** /**
* queue * queue
*/ */
private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
/** /**
* put task takePriorityInfo * put task takePriorityInfo
* *
* @param taskPriorityInfo takePriorityInfo * @param taskPriorityInfo takePriorityInfo
* @throws Exception
*/ */
@Override @Override
public void put(String taskPriorityInfo) throws Exception { public void put(TaskPriority taskPriorityInfo) {
queue.put(taskPriorityInfo); queue.put(taskPriorityInfo);
} }
/** /**
* take taskInfo * take taskInfo
*
* @return taskInfo * @return taskInfo
* @throws Exception
*/ */
@Override @Override
public String take() throws Exception { public TaskPriority take() throws InterruptedException {
return queue.take(); return queue.take();
} }
/** /**
* queue size * queue size
*
* @return size * @return size
* @throws Exception
*/ */
@Override @Override
public int size() throws Exception { public int size() {
return queue.size(); return queue.size();
} }
/**
* TaskInfoComparator
*/
private class TaskInfoComparator implements Comparator<String>{
/**
* compare o1 o2
* @param o1 o1
* @param o2 o2
* @return compare result
*/
@Override
public int compare(String o1, String o2) {
String s1 = o1;
String s2 = o2;
String[] s1Array = s1.split(UNDERLINE);
if(s1Array.length > TASK_INFO_LENGTH){
// warning: if this length > 5, need to be changed
s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
}
String[] s2Array = s2.split(UNDERLINE);
if(s2Array.length > TASK_INFO_LENGTH){
// warning: if this length > 5, need to be changed
s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
}
return s1.compareTo(s2);
}
}
} }

81
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityTest.java

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class TaskPriorityTest {
@Test
public void testSort() {
TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
priorityOne = new TaskPriority(0, 1, 0, 0, "default");
priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
priorityThree = new TaskPriority(0, 3, 0, 0, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 1, 0, "default");
priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
priorityThree = new TaskPriority(0, 0, 3, 0, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 1, "default");
priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
priorityThree = new TaskPriority(0, 0, 0, 3, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
}
}

23
dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueTest.java

@ -15,10 +15,8 @@
* limitations under the License. * limitations under the License.
*/ */
package queue; package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -31,19 +29,16 @@ public class TaskUpdateQueueTest {
@Test @Test
public void testQueue() throws Exception{ public void testQueue() throws Exception{
// ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
/** /**
* 1_1_2_1_default * 1_1_2_1_default
* 1_1_2_2_default * 1_1_2_2_default
* 1_1_0_3_default * 1_1_0_3_default
* 1_1_0_4_default * 1_1_0_4_default
*/ */
TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default");
String taskInfo1 = "1_1_2_1_default"; TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default");
String taskInfo2 = "1_1_2_2_default"; TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default");
String taskInfo3 = "1_1_0_3_default"; TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default");
String taskInfo4 = "1_1_0_4_default";
TaskPriorityQueue queue = new TaskPriorityQueueImpl(); TaskPriorityQueue queue = new TaskPriorityQueueImpl();
queue.put(taskInfo1); queue.put(taskInfo1);
@ -51,9 +46,9 @@ public class TaskUpdateQueueTest {
queue.put(taskInfo3); queue.put(taskInfo3);
queue.put(taskInfo4); queue.put(taskInfo4);
assertEquals("1_1_0_3_default", queue.take()); assertEquals(taskInfo3, queue.take());
assertEquals("1_1_0_4_default", queue.take()); assertEquals(taskInfo4, queue.take());
assertEquals("1_1_2_1_default",queue.take()); assertEquals(taskInfo1, queue.take());
assertEquals("1_1_2_2_default",queue.take()); assertEquals(taskInfo2, queue.take());
} }
} }

1
pom.xml

@ -830,6 +830,7 @@
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include> <include>**/service/zk/ZKServerTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include> <include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/TaskPriorityTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include> <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<include>**/dao/mapper/ErrorCommandMapperTest.java</include> <include>**/dao/mapper/ErrorCommandMapperTest.java</include>
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include> <include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>

Loading…
Cancel
Save