break60 5 years ago
parent
commit
2829121936
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
  2. 29
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java
  3. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  5. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  6. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  7. 98
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  8. 79
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  9. 84
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java

@ -64,7 +64,7 @@ public class SpringConnectionFactory {
* @return druid dataSource * @return druid dataSource
*/ */
@Bean(destroyMethod="") @Bean(destroyMethod="")
public static DruidDataSource dataSource() { public DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource(); DruidDataSource druidDataSource = new DruidDataSource();

29
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java

@ -16,11 +16,11 @@
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -179,6 +179,23 @@ public class UserMapperTest {
return tenant; return tenant;
} }
/**
* insert one Tenant
* @return Tenant
*/
private Tenant insertOneTenant(Queue queue){
Tenant tenant = new Tenant();
tenant.setTenantCode("dolphin");
tenant.setTenantName("dolphin test");
tenant.setDescription("dolphin user use");
tenant.setQueueId(queue.getId());
tenant.setQueue(queue.getQueue());
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
tenantMapper.insert(tenant);
return tenant;
}
/** /**
* insert one Queue * insert one Queue
* @return Queue * @return Queue
@ -291,11 +308,13 @@ public class UserMapperTest {
*/ */
@Test @Test
public void testQueryDetailsById() { public void testQueryDetailsById() {
//insertOne //insertOneQueue and insertOneTenant
User user = insertOne(); Queue queue = insertOneQueue();
Tenant tenant = insertOneTenant(queue);
User user = insertOne(queue,tenant);
//queryDetailsById //queryDetailsById
User queryUser = userMapper.queryDetailsById(user.getId()); User queryUser = userMapper.queryDetailsById(user.getId());
Assert.assertEquals(queryUser.getUserName(), queryUser.getUserName()); Assert.assertEquals(user.getUserName(), queryUser.getUserName());
} }
/** /**

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -53,6 +53,8 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue * taskUpdateQueue
*/ */
@Autowired @Autowired
private TaskPriorityQueue taskUpdateQueue; private TaskPriorityQueue taskPriorityQueue;
/** /**
* processService * processService
@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
// if not task , blocking here // if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take(); String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{
private Boolean dispatch(int taskInstanceId){ private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
Boolean result = false;
while (Stopper.isRunning()){
try { try {
return dispatcher.dispatch(executionContext); result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) { } catch (ExecuteException e) {
logger.error("execute exception", e); logger.error("dispatch error",e);
return false; try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e1) {}
} }
if (result){
break;
}
}
return result;
} }
/** /**

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -31,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* task ack processor * task ack processor
*/ */
@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskAckProcessor(){ public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
/** /**
@ -71,8 +83,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress(); String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
// TaskResponseEvent // TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(), taskAckCommand.getStartTime(),
workerAddress, workerAddress,
taskAckCommand.getExecutePath(), taskAckCommand.getExecutePath(),
@ -81,6 +95,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}
} }
} }

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -30,9 +32,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* task response processor * task response processor
*/ */
@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskResponseProcessor(){ public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
/** /**
@ -71,6 +82,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand);
ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
// TaskResponseEvent // TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getEndTime(),
@ -79,6 +92,18 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getTaskInstanceId()); responseCommand.getTaskInstanceId());
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}
} }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -93,8 +95,17 @@ public class TaskCallbackService {
} }
logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if(CollectionUtils.isEmpty(masterNodes)){ while (Stopper.isRunning()) {
throw new IllegalStateException("no available master node exception"); if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node");
try {
Thread.sleep(1000);
}catch (Exception e){
}
}else {
break;
}
} }
for(String masterNode : masterNodes){ for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));

98
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test; import org.junit.Test;
@ -47,9 +52,10 @@ import java.util.Date;
* test task call back service * test task call back service
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, @ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
public class TaskCallbackServiceTest { public class TaskCallbackServiceTest {
@Autowired @Autowired
@ -58,12 +64,22 @@ public class TaskCallbackServiceTest {
@Autowired @Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
/**
* send ack test
* @throws Exception
*/
@Test @Test
public void testSendAck(){ public void testSendAck() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -75,22 +91,64 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close(); nettyRemotingServer.close();
nettyRemotingClient.close(); nettyRemotingClient.close();
} }
/**
* send result test
* @throws Exception
*/
@Test
public void testSendResult() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
responseCommand.setTaskInstanceId(1);
responseCommand.setEndTime(new Date());
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSendAckWithIllegalArgumentException(){ public void testSendAckWithIllegalArgumentException(){
TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
Stopper.stop();
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testSendAckWithIllegalStateException1(){ public void testSendAckWithIllegalStateException1(){
masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -103,7 +161,21 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
nettyRemotingServer.close(); nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -112,7 +184,7 @@ public class TaskCallbackServiceTest {
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -125,6 +197,20 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
nettyRemotingServer.close(); nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }

79
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -277,4 +277,83 @@ delimiter ;
CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id; CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id; DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id;
-- ac_dolphin_T_t_ds_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group;
-- dc_dolphin_T_t_ds_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id;
-- ac_dolphin_T_t_ds_error_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_error_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group;
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id;

84
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -279,4 +279,88 @@ delimiter ;
select dc_dolphin_T_t_ds_schedules_D_worker_group_id(); select dc_dolphin_T_t_ds_schedules_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id(); DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id();
-- ac_dolphin_T_t_ds_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_command_A_worker_group();
-- dc_dolphin_T_t_ds_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id();
-- ac_dolphin_T_t_ds_error_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_error_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group();
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_error_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();

Loading…
Cancel
Save