Browse Source

Refactor worker (#4)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
1292b03e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -97,7 +97,7 @@ public class TaskScheduleThread implements Runnable {
try { try {
// tell master that task is in executing // tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType()); ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath()); logger.info("script path : {}", taskExecutionContext.getExecutePath());
@ -182,17 +182,20 @@ public class TaskScheduleThread implements Runnable {
} }
return globalParamsMap; return globalParamsMap;
} }
/** /**
* build ack command * build ack command
* @param taskType taskType * @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/ */
private ExecuteTaskAckCommand buildAckCommand(String taskType) { private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath()); ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost(OSUtils.getHost()); ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null); ackCommand.setExecutePath(null);
}else{ }else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); ackCommand.setExecutePath(taskExecutionContext.getExecutePath());

Loading…
Cancel
Save