|
|
@ -35,10 +35,13 @@ import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Optional; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
|
|
|
|
@Component |
|
|
|
@Component |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public class TaskResultEventHandler implements TaskEventHandler { |
|
|
|
public class TaskResultEventHandler implements TaskEventHandler { |
|
|
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Autowired |
|
|
@ -99,11 +102,13 @@ public class TaskResultEventHandler implements TaskEventHandler { |
|
|
|
taskInstance.setVarPool(taskEvent.getVarPool()); |
|
|
|
taskInstance.setVarPool(taskEvent.getVarPool()); |
|
|
|
processService.changeOutParam(taskInstance); |
|
|
|
processService.changeOutParam(taskInstance); |
|
|
|
taskInstanceDao.updateById(taskInstance); |
|
|
|
taskInstanceDao.updateById(taskInstance); |
|
|
|
sendAckToWorker(taskEvent); |
|
|
|
|
|
|
|
} catch (Exception ex) { |
|
|
|
} catch (Exception ex) { |
|
|
|
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
|
|
|
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
|
|
|
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex); |
|
|
|
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sendAckToWorker(taskEvent); |
|
|
|
|
|
|
|
|
|
|
|
TaskStateEvent stateEvent = TaskStateEvent.builder() |
|
|
|
TaskStateEvent stateEvent = TaskStateEvent.builder() |
|
|
|
.processInstanceId(taskEvent.getProcessInstanceId()) |
|
|
|
.processInstanceId(taskEvent.getProcessInstanceId()) |
|
|
|
.taskInstanceId(taskEvent.getTaskInstanceId()) |
|
|
|
.taskInstanceId(taskEvent.getTaskInstanceId()) |
|
|
@ -115,11 +120,16 @@ public class TaskResultEventHandler implements TaskEventHandler { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void sendAckToWorker(TaskEvent taskEvent) { |
|
|
|
public void sendAckToWorker(TaskEvent taskEvent) { |
|
|
|
|
|
|
|
try { |
|
|
|
ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = |
|
|
|
ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = |
|
|
|
SingletonJdkDynamicRpcClientProxyFactory |
|
|
|
SingletonJdkDynamicRpcClientProxyFactory |
|
|
|
.getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); |
|
|
|
.getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); |
|
|
|
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck( |
|
|
|
instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck( |
|
|
|
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId())); |
|
|
|
TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId())); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
// master ignore the exception, worker will retry to send this TaskEventType.RESULT event again.
|
|
|
|
|
|
|
|
log.warn("send ack to worker error, taskInstanceId: {}", taskEvent.getTaskInstanceId(), e); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|