Browse Source

Refactor worker (#2319)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest

* refactor TaskKillProcessor

* add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest

* add RoundRobinHostManagerTest, ExecutorDispatcherTest

* refactor task response service
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
5730bfe2be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java
  2. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  3. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  4. 99
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  5. 59
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  6. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

58
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEventEnum.java

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.manager;
import com.baomidou.mybatisplus.annotation.EnumValue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
/**
* task event enum
*/
public enum TaskEventEnum {
ACK(0, "task ack"),
RESPONSE(1, "task response result");
TaskEventEnum(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public String getDescp() {
return descp;
}
public int getCode() {
return code;
}
public static TaskEventEnum of(int status){
for(TaskEventEnum es : values()){
if(es.getCode() == status){
return es;
}
}
throw new IllegalArgumentException("invalid status : " + status);
}
}

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -28,10 +28,9 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.manager.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -45,7 +44,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
/** /**
* process service * process service
*/ */
private final TaskManager taskManager; private final TaskResponseService taskResponseService;
/** /**
* taskInstance cache manager * taskInstance cache manager
@ -53,7 +52,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){ public TaskAckProcessor(){
this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
@ -72,15 +71,15 @@ public class TaskAckProcessor implements NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress(); String workerAddress = ChannelUtils.toAddress(channel).getAddress();
// TaskEvent // TaskResponseEvent
TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(taskAckCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(), taskAckCommand.getStartTime(),
workerAddress, workerAddress,
taskAckCommand.getExecutePath(), taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(), taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId()); taskAckCommand.getTaskInstanceId());
taskManager.putTask(taskEvent); taskResponseService.addResponse(taskResponseEvent);
} }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -27,10 +27,9 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.manager.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -44,7 +43,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
/** /**
* process service * process service
*/ */
private final TaskManager taskManager; private final TaskResponseService taskResponseService;
/** /**
* taskInstance cache manager * taskInstance cache manager
@ -52,7 +51,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){ public TaskResponseProcessor(){
this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
@ -72,14 +71,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand);
// TaskEvent // TaskResponseEvent
TaskEvent taskEvent = new TaskEvent(ExecutionStatus.of(responseCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getEndTime(),
responseCommand.getProcessId(), responseCommand.getProcessId(),
responseCommand.getAppIds(), responseCommand.getAppIds(),
responseCommand.getTaskInstanceId()); responseCommand.getTaskInstanceId());
taskManager.putTask(taskEvent); taskResponseService.addResponse(taskResponseEvent);
} }

99
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.manager; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -24,7 +24,7 @@ import java.util.Date;
/** /**
* task event * task event
*/ */
public class TaskEvent { public class TaskResponseEvent {
/** /**
* taskInstanceId * taskInstanceId
@ -74,53 +74,29 @@ public class TaskEvent {
/** /**
* ack / response * ack / response
*/ */
private TaskEventEnum type; private Event event;
public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
/** TaskResponseEvent event = new TaskResponseEvent();
* receive ack info event.setState(state);
* @param state state event.setStartTime(startTime);
* @param startTime startTime event.setWorkerAddress(workerAddress);
* @param workerAddress workerAddress event.setExecutePath(executePath);
* @param executePath executePath event.setLogPath(logPath);
* @param logPath logPath event.setTaskInstanceId(taskInstanceId);
* @param taskInstanceId taskInstanceId event.setEvent(Event.ACK);
*/ return event;
public TaskEvent(ExecutionStatus state, }
Date startTime,
String workerAddress, public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
String executePath, TaskResponseEvent event = new TaskResponseEvent();
String logPath, event.setState(state);
int taskInstanceId){ event.setEndTime(endTime);
this.state = state; event.setProcessId(processId);
this.startTime = startTime; event.setAppIds(appIds);
this.workerAddress = workerAddress; event.setTaskInstanceId(taskInstanceId);
this.executePath = executePath; event.setEvent(Event.RESULT);
this.logPath = logPath; return event;
this.taskInstanceId = taskInstanceId;
this.type = TaskEventEnum.ACK;
}
/**
* receive response info
* @param state state
* @param endTime endTime
* @param processId processId
* @param appIds appIds
* @param taskInstanceId taskInstanceId
*/
public TaskEvent(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstanceId){
this.state = state;
this.endTime = endTime;
this.processId = processId;
this.appIds = appIds;
this.taskInstanceId = taskInstanceId;
this.type = TaskEventEnum.RESPONSE;
} }
public int getTaskInstanceId() { public int getTaskInstanceId() {
@ -195,27 +171,16 @@ public class TaskEvent {
this.appIds = appIds; this.appIds = appIds;
} }
public TaskEventEnum getType() { public Event getEvent() {
return type; return event;
} }
public void setType(TaskEventEnum type) { public void setEvent(Event event) {
this.type = type; this.event = event;
} }
@Override public enum Event{
public String toString() { ACK,
return "TaskEvent{" + RESULT;
"taskInstanceId=" + taskInstanceId +
", workerAddress='" + workerAddress + '\'' +
", state=" + state +
", startTime=" + startTime +
", endTime=" + endTime +
", executePath='" + executePath + '\'' +
", logPath='" + logPath + '\'' +
", processId=" + processId +
", appIds='" + appIds + '\'' +
", type=" + type +
'}';
} }
} }

59
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.manager; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -27,23 +27,22 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.dolphinscheduler.server.master.manager.TaskEventEnum.*;
/** /**
* task manager * task manager
*/ */
@Component @Component
public class TaskManager { public class TaskResponseService {
/** /**
* logger * logger
*/ */
private static final Logger logger = LoggerFactory.getLogger(TaskManager.class); private static final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
/** /**
* attemptQueue * attemptQueue
*/ */
private final BlockingQueue<TaskEvent> attemptQueue = new LinkedBlockingQueue<>(5000); private final BlockingQueue<TaskResponseEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
/** /**
@ -63,13 +62,13 @@ public class TaskManager {
/** /**
* put task to attemptQueue * put task to attemptQueue
* *
* @param taskEvent taskEvent * @param taskResponseEvent taskResponseEvent
*/ */
public void putTask(TaskEvent taskEvent){ public void addResponse(TaskResponseEvent taskResponseEvent){
try { try {
attemptQueue.put(taskEvent); attemptQueue.put(taskResponseEvent);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("put task : {} error :{}",taskEvent,e); logger.error("put task : {} error :{}", taskResponseEvent,e);
} }
} }
@ -85,8 +84,8 @@ public class TaskManager {
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
// if not task , blocking here // if not task , blocking here
TaskEvent taskEvent = attemptQueue.take(); TaskResponseEvent taskResponseEvent = attemptQueue.take();
persist(taskEvent); persist(taskResponseEvent);
}catch (Exception e){ }catch (Exception e){
logger.error("persist task error",e); logger.error("persist task error",e);
@ -95,32 +94,30 @@ public class TaskManager {
} }
/** /**
* persist taskEvent * persist taskResponseEvent
* @param taskEvent taskEvent * @param taskResponseEvent taskResponseEvent
*/ */
private void persist(TaskEvent taskEvent){ private void persist(TaskResponseEvent taskResponseEvent){
// task event type TaskResponseEvent.Event event = taskResponseEvent.getEvent();
TaskEventEnum type = taskEvent.getType();
switch (type){ switch (event){
case ACK: case ACK:
processService.changeTaskState(taskEvent.getState(), processService.changeTaskState(taskResponseEvent.getState(),
taskEvent.getStartTime(), taskResponseEvent.getStartTime(),
taskEvent.getWorkerAddress(), taskResponseEvent.getWorkerAddress(),
taskEvent.getExecutePath(), taskResponseEvent.getExecutePath(),
taskEvent.getLogPath(), taskResponseEvent.getLogPath(),
taskEvent.getTaskInstanceId()); taskResponseEvent.getTaskInstanceId());
break; break;
case RESPONSE: case RESULT:
processService.changeTaskState(taskEvent.getState(), processService.changeTaskState(taskResponseEvent.getState(),
taskEvent.getEndTime(), taskResponseEvent.getEndTime(),
taskEvent.getProcessId(), taskResponseEvent.getProcessId(),
taskEvent.getAppIds(), taskResponseEvent.getAppIds(),
taskEvent.getTaskInstanceId()); taskResponseEvent.getTaskInstanceId());
break; break;
default: default:
throw new IllegalArgumentException("invalid task event type : " + type); throw new IllegalArgumentException("invalid event type : " + event);
} }
} }
} }

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -141,7 +141,7 @@ public class DependencyConfig {
} }
@Bean @Bean
public TaskManager taskManager(){ public TaskResponseService taskResponseService(){
return Mockito.mock(TaskManager.class); return Mockito.mock(TaskResponseService.class);
} }
} }

Loading…
Cancel
Save