Browse Source

buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify
pull/2/head
qiaozhanwei 5 years ago committed by GitHub
parent
commit
00cf259840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -97,7 +97,7 @@ public class TaskScheduleThread implements Runnable {
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath());
@ -182,17 +182,20 @@ public class TaskScheduleThread implements Runnable {
}
return globalParamsMap;
}
/**
* build ack command
* @param taskType taskType
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(String taskType) {
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());

Loading…
Cancel
Save