break60 5 years ago
parent
commit
4bfe816e9d
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
  2. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
  4. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  5. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  6. 13
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  7. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  8. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  9. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  11. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  12. 6
      sql/dolphinscheduler-postgre.sql
  13. 4
      sql/dolphinscheduler_mysql.sql

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@ -106,12 +106,10 @@ public class DataAnalysisService extends BaseService{
List<ExecuteStatusCount> taskInstanceStateCounts = List<ExecuteStatusCount> taskInstanceStateCounts =
taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds); taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds);
if (taskInstanceStateCounts != null && !taskInstanceStateCounts.isEmpty()) { if (taskInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts); TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult); result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.TASK_INSTANCE_STATE_COUNT_ERROR);
} }
return result; return result;
} }
@ -153,12 +151,10 @@ public class DataAnalysisService extends BaseService{
processInstanceMapper.countInstanceStateByUser(start, end, processInstanceMapper.countInstanceStateByUser(start, end,
projectIdArray); projectIdArray);
if (processInstanceStateCounts != null && !processInstanceStateCounts.isEmpty()) { if (processInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts); TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult); result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.COUNT_PROCESS_INSTANCE_STATE_ERROR);
} }
return result; return result;
} }

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -225,20 +225,14 @@ public class ExecutorService extends BaseService{
if (processInstance.getState() == ExecutionStatus.READY_STOP) { if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else { } else {
processInstance.setCommandType(CommandType.STOP); result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
processInstance.addHistoryCmd(CommandType.STOP);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP);
} }
break; break;
case PAUSE: case PAUSE:
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else { } else {
processInstance.setCommandType(CommandType.PAUSE); result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
processInstance.addHistoryCmd(CommandType.PAUSE);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE);
} }
break; break;
default: default:
@ -308,22 +302,27 @@ public class ExecutorService extends BaseService{
} }
/** /**
* update process instance state * prepare to update process instance command type and status
* *
* @param processInstanceId process instance id * @param processInstance process instance
* @param commandType command type
* @param executionStatus execute status * @param executionStatus execute status
* @return update result * @return update result
*/ */
private Map<String, Object> updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
int update = processService.updateProcessInstanceState(processInstanceId, executionStatus); processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) { if (update > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
} }
return result; return result;
} }

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@ -114,9 +114,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate); Map<String, Object> result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate);
Assert.assertTrue(result.isEmpty()); Assert.assertTrue(result.isEmpty());
// task instance state count error
result = dataAnalysisService.countTaskStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_COUNT_ERROR,result.get(Constants.STATUS));
//SUCCESS //SUCCESS
Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate), Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate),
@ -137,10 +134,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate); Map<String, Object> result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate);
Assert.assertTrue(result.isEmpty()); Assert.assertTrue(result.isEmpty());
//COUNT_PROCESS_INSTANCE_STATE_ERROR
result = dataAnalysisService.countProcessInstanceStateByProject(user,1,startDate,endDate);
Assert.assertEquals(Status.COUNT_PROCESS_INSTANCE_STATE_ERROR,result.get(Constants.STATUS));
//SUCCESS //SUCCESS
Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate), Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate),
DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts()); DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts());

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -223,4 +223,14 @@ public class ThreadUtils {
} }
return id + " (" + name + ")"; return id + " (" + name + ")";
} }
/**
* sleep
* @param millis millis
*/
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {}
}
} }

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -111,7 +111,7 @@ public class Command {
/** /**
* worker group * worker group
*/ */
@TableField(exist = false) @TableField("worker_group")
private String workerGroup; private String workerGroup;
public Command() { public Command() {

13
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -76,7 +76,8 @@ public class CommandMapperTest {
//query //query
Command actualCommand = commandMapper.selectById(expectedCommand.getId()); Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand, actualCommand); assertNotNull(actualCommand);
assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
} }
/** /**
@ -94,7 +95,8 @@ public class CommandMapperTest {
Command actualCommand = commandMapper.selectById(expectedCommand.getId()); Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand,actualCommand); assertNotNull(actualCommand);
assertEquals(expectedCommand.getUpdateTime(),actualCommand.getUpdateTime());
} }
@ -127,13 +129,6 @@ public class CommandMapperTest {
List<Command> actualCommands = commandMapper.selectList(null); List<Command> actualCommands = commandMapper.selectList(null);
assertThat(actualCommands.size(), greaterThanOrEqualTo(count)); assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
for (Command actualCommand : actualCommands){
Command expectedCommand = commandMap.get(actualCommand.getId());
if (expectedCommand != null){
assertEquals(expectedCommand,actualCommand);
}
}
} }
/** /**

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -122,9 +123,7 @@ public class TaskPriorityQueueConsumer extends Thread{
result = dispatcher.dispatch(executionContext); result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) { } catch (ExecuteException e) {
logger.error("dispatch error",e); logger.error("dispatch error",e);
try { ThreadUtils.sleep(SLEEP_TIME_MILLIS);
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e1) {}
} }
if (result){ if (result){

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -101,10 +102,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
if (taskInstance != null && ackStatus.typeIsRunning()){ if (taskInstance != null && ackStatus.typeIsRunning()){
break; break;
} }
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
} }
} }

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -99,10 +100,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
if (taskInstance != null && responseStatus.typeIsFinished()){ if (taskInstance != null && responseStatus.typeIsFinished()){
break; break;
} }
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -914,7 +914,7 @@ public class MasterExecThread implements Runnable {
processInstance.getId(), processInstance.getName(), processInstance.getId(), processInstance.getName(),
processInstance.getState(), state, processInstance.getState(), state,
processInstance.getCommandType()); processInstance.getCommandType());
processInstance.setState(state);
ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
instance.setState(state); instance.setState(state);
instance.setProcessDefinition(processInstance.getProcessDefinition()); instance.setProcessDefinition(processInstance.getProcessDefinition());

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -18,11 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -37,6 +37,8 @@ import org.springframework.stereotype.Service;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
* taks callback service * taks callback service
*/ */
@ -98,11 +100,7 @@ public class TaskCallbackService {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
if (CollectionUtils.isEmpty(masterNodes)) { if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node"); logger.error("no available master node");
try { ThreadUtils.sleep(SLEEP_TIME_MILLIS);
Thread.sleep(1000);
}catch (Exception e){
}
}else { }else {
break; break;
} }

6
sql/dolphinscheduler-postgre.sql

@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
dependence varchar(255) DEFAULT NULL , dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL , process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' , worker_group varchar(64),
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
dependence text , dependence text ,
process_instance_priority int DEFAULT NULL , process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' , worker_group varchar(64),
message text , message text ,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_useruser : admin , password : dolphinscheduler123 -- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group -- Records of t_ds_alertgroup,dolphinscheduler warning group

4
sql/dolphinscheduler_mysql.sql

@ -333,7 +333,7 @@ CREATE TABLE `t_ds_command` (
`dependence` varchar(255) DEFAULT NULL COMMENT 'dependence', `dependence` varchar(255) DEFAULT NULL COMMENT 'dependence',
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest', `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id', `worker_group` varchar(64) COMMENT 'worker group',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -380,7 +380,7 @@ CREATE TABLE `t_ds_error_command` (
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
`dependence` text COMMENT 'dependence', `dependence` text COMMENT 'dependence',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest', `process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id', `worker_group` varchar(64) COMMENT 'worker group',
`message` text COMMENT 'message', `message` text COMMENT 'message',
PRIMARY KEY (`id`) USING BTREE PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

Loading…
Cancel
Save