Browse Source

[Improvement][MasterServer] event response handle parallel (#7560)

* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 05aef27 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 8ebe060 and handle conflicts

* cherry-pick 1f18444 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>

* add default constructor (#6780)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* to #7108 (#7109)

* to #7450

* to #7450: fix parallel bug

* add index

* expose config to user

* fix bug

* fix bug

* add delay delete

* fix bug

* add License

* fix ut

* fix ut

* fix name

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>
2.0.7-release
zwZjut 3 years ago committed by GitHub
parent
commit
f8942bf798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docker/build/conf/dolphinscheduler/master.properties.tpl
  2. 1
      docker/kubernetes/dolphinscheduler/values.yaml
  3. 3
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  4. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  5. 24
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
  6. 3
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
  7. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  8. 189
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
  9. 205
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  10. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  11. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  12. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

4
docker/build/conf/dolphinscheduler/master.properties.tpl

@ -47,4 +47,6 @@ master.reserved.memory=${MASTER_RESERVED_MEMORY}
# master failover interval minutes
master.failover.interval=${MASTER_FAILOVER_INTERVAL}
# master kill yarn job when handle failover
master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
# master.persist.event.state.threads
master.persist.event.state.threads=${MASTER_PERSIST_EVENT_STATE_THREADS}

1
docker/kubernetes/dolphinscheduler/values.yaml

@ -166,6 +166,7 @@ master:
ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
SESSION_TIMEOUT_MS: 60000
MASTER_PERSIST_EVENT_STATE_THREADS: 10
## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:

3
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -532,7 +532,8 @@ CREATE TABLE `t_ds_process_task_relation` (
`condition_params` text COMMENT 'condition params(json)',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
PRIMARY KEY (`id`),
KEY `project_code_process_definition_code_index` (`project_code`,`process_definition_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -440,6 +440,8 @@ CREATE TABLE t_ds_process_task_relation (
PRIMARY KEY (id)
) ;
create index project_code_process_definition_code_index on t_ds_process_task_relation (project_code,process_definition_code);
DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log (
id int NOT NULL ,

24
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql

@ -17,6 +17,7 @@
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_process_instance_A_restart_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
delimiter d//
@ -35,4 +36,25 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_restart_time();
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
-- uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_NAME='t_ds_process_task_relation'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND INDEX_NAME ='project_code_process_definition_code_index')
THEN
ALTER TABLE `t_ds_process_task_relation` ADD KEY `project_code_process_definition_code_index`(`project_code`,`process_definition_code`) USING BTREE;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index();
DROP PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;

3
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -29,6 +29,9 @@ BEGIN
v_schema =current_schema();
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
EXECUTE 'CREATE INDEX IF NOT EXISTS project_code_process_definition_code_index ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING Btree("project_code","process_definition_code")';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -66,9 +66,12 @@ public class MasterConfig {
@Value("${master.failover.interval:10}")
private int failoverInterval;
@Value("${master.kill.yarn.job.when.handle.fail.over:true}")
@Value("${master.kill.yarn.job.when.handle.failover:true}")
private boolean masterKillYarnJobWhenHandleFailOver;
@Value("${master.persist.event.state.threads:10}")
private int masterPersistEventStateThreads;
public int getListenPort() {
return listenPort;
}
@ -183,4 +186,12 @@ public class MasterConfig {
public void setMasterKillYarnJobWhenHandleFailOver(boolean masterKillYarnJobWhenHandleFailOver) {
this.masterKillYarnJobWhenHandleFailOver = masterKillYarnJobWhenHandleFailOver;
}
public int getMasterPersistEventStateThreads() {
return masterPersistEventStateThreads;
}
public void setMasterPersistEventStateThreads(int masterPersistEventStateThreads) {
this.masterPersistEventStateThreads = masterPersistEventStateThreads;
}
}

189
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
public class TaskResponsePersistThread implements Runnable {
/**
* logger of TaskResponsePersistThread
*/
private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);
private final ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();
private final Integer processInstanceId;
/**
* process service
*/
private ProcessService processService;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
public TaskResponsePersistThread(ProcessService processService,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
Integer processInstanceId) {
this.processService = processService;
this.processInstanceMapper = processInstanceMapper;
this.processInstanceId = processInstanceId;
}
@Override
public void run() {
while (!this.events.isEmpty()) {
TaskResponseEvent event = this.events.peek();
try {
boolean result = persist(event);
if (!result) {
logger.error("persist meta error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId());
}
} catch (Exception e) {
logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
} finally {
this.events.remove(event);
}
}
}
/**
* persist taskResponseEvent
*
* @param taskResponseEvent taskResponseEvent
*/
private boolean persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
boolean result = true;
switch (event) {
case ACK:
try {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} catch (Exception e) {
result = false;
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}
break;
case RESULT:
try {
if (taskInstance != null) {
result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
if (!result) {
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} else {
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
}
} catch (Exception e) {
result = false;
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
}
return result;
}
public boolean addEvent(TaskResponseEvent event) {
if (event.getProcessInstanceId() != this.processInstanceId) {
logger.info("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
return false;
}
return this.events.add(event);
}
public int eventSize() {
return this.events.size();
}
public boolean isEmpty() {
return this.events.isEmpty();
}
public Integer getProcessInstanceId() {
return processInstanceId;
}
public String getKey() {
return String.valueOf(processInstanceId);
}
}

205
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -17,22 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -42,7 +38,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
* task manager
@ -66,37 +66,56 @@ public class TaskResponseService {
@Autowired
private ProcessService processService;
@Autowired
private MasterConfig masterConfig;
/**
* task response worker
*/
private Thread taskResponseWorker;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
/**
* event handler
*/
private Thread taskResponseEventHandler;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMap;
private final ConcurrentHashMap<String, TaskResponsePersistThread> taskResponseEventHandlerMap = new ConcurrentHashMap<>();
private ListeningExecutorService listeningExecutorService;
private ExecutorService eventExecService;
/**
* task response mapper
*/
private final ConcurrentHashMap<Integer, TaskResponsePersistThread> processTaskResponseMap = new ConcurrentHashMap<>();
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMap) {
if (this.processInstanceMap == null) {
this.processInstanceMap = processInstanceMap;
}
}
@PostConstruct
public void start() {
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("PersistEventState", masterConfig.getMasterPersistEventStateThreads());
this.listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService);
this.taskResponseWorker = new TaskResponseWorker();
this.taskResponseWorker.setName("StateEventResponseWorker");
this.taskResponseWorker.setName("TaskResponseWorker");
this.taskResponseWorker.start();
this.taskResponseEventHandler = new TaskResponseEventHandler();
this.taskResponseEventHandler.setName("TaskResponseEventHandler");
this.taskResponseEventHandler.start();
}
@PreDestroy
public void stop() {
try {
this.taskResponseWorker.interrupt();
if (!eventQueue.isEmpty()) {
List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (TaskResponseEvent event : remainEvents) {
this.persist(event);
}
}
this.taskResponseEventHandler.interrupt();
this.eventExecService.shutdown();
} catch (Exception e) {
logger.error("stop error:", e);
}
@ -124,12 +143,26 @@ public class TaskResponseService {
@Override
public void run() {
while (Stopper.isRunning()) {
try {
// if not task , blocking here
TaskResponseEvent taskResponseEvent = eventQueue.take();
persist(taskResponseEvent);
if (processInstanceMap.containsKey(taskResponseEvent.getProcessInstanceId())
&& !processTaskResponseMap.containsKey(taskResponseEvent.getProcessInstanceId())) {
TaskResponsePersistThread taskResponsePersistThread = new TaskResponsePersistThread(
processService, processInstanceMap, taskResponseEvent.getProcessInstanceId());
processTaskResponseMap.put(taskResponseEvent.getProcessInstanceId(), taskResponsePersistThread);
}
TaskResponsePersistThread taskResponsePersistThread = processTaskResponseMap.get(taskResponseEvent.getProcessInstanceId());
if (null != taskResponsePersistThread) {
if (taskResponsePersistThread.addEvent(taskResponseEvent)) {
logger.debug("submit task response persist queue success, task instance id:{},process instance id:{}, state:{} ",
taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getState());
} else {
logger.error("submit task response persist queue error, task instance id:{},process instance id:{} ",
taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
@ -142,84 +175,72 @@ public class TaskResponseService {
}
/**
* persist taskResponseEvent
*
* @param taskResponseEvent taskResponseEvent
* event handler thread
*/
private void persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();
class TaskResponseEventHandler extends Thread {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
switch (event) {
case ACK:
@Override
public void run() {
logger.info("event handler thread started");
while (Stopper.isRunning()) {
try {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
boolean result = processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.error("event handler thread error", e);
}
break;
case RESULT:
try {
boolean result = true;
if (taskInstance != null) {
result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
if (!result) {
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} else {
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
}
}
private void eventHandler() {
for (TaskResponsePersistThread taskResponsePersistThread: processTaskResponseMap.values()) {
if (taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) {
continue;
}
if (taskResponsePersistThread.eventSize() == 0) {
if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
}
} catch (Exception e) {
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
continue;
}
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
logger.info("already exists handler process size:{}", taskResponseEventHandlerMap.size());
taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(), taskResponsePersistThread);
ListenableFuture future = listeningExecutorService.submit(taskResponsePersistThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
logger.info("persist events {} succeeded.", taskResponsePersistThread.getProcessInstanceId());
if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
}
taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
}
@Override
public void onFailure(Throwable throwable) {
logger.error("persist events failed: {}", throwable);
if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
}
taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
}
};
Futures.addCallback(future, futureCallback, listeningExecutorService);
}
}
}
public BlockingQueue<TaskResponseEvent> getEventQueue() {
return eventQueue;
}
}
}

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -436,13 +436,6 @@ public class MasterRegistryClient {
continue;
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
logger.info("failover process instance id: {}", processInstance.getId());
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
if (Constants.NULL.equals(taskInstance.getHost())) {
@ -457,6 +450,13 @@ public class MasterRegistryClient {
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
logger.info("failover process instance id: {}", processInstance.getId());
//updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance);
}
@ -576,8 +576,8 @@ public class MasterRegistryClient {
/**
* get local address
*/
private String getLocalAddress() {
public String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
}
}

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
@ -66,10 +67,12 @@ public class FailoverExecuteThread extends Thread {
while (Stopper.isRunning()) {
logger.info("failover execute started");
try {
List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
continue;
}
logger.info("need failover hosts:{}", hosts);
for (String host : hosts) {
String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
try {
@ -88,4 +91,20 @@ public class FailoverExecuteThread extends Thread {
}
}
}
private List<String> getNeedFailoverMasterServers() {
// failover myself && failover dead masters
List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
Iterator<String> iterator = hosts.iterator();
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
if (!host.equals(masterRegistryClient.getLocalAddress())) {
iterator.remove();
}
}
}
return hosts;
}
}

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
@ -34,12 +35,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import io.netty.channel.Channel;
@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskResponseServiceTest {
@Mock(name = "processService")
private ProcessService processService;
@Mock
private MasterConfig masterConfig;
@InjectMocks
TaskResponseService taskRspService;
@ -54,6 +58,7 @@ public class TaskResponseServiceTest {
@Before
public void before() {
Mockito.when(masterConfig.getMasterPersistEventStateThreads()).thenReturn(10);
taskRspService.start();
ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,

Loading…
Cancel
Save