|
|
@ -17,22 +17,19 @@ |
|
|
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.server.worker.processor; |
|
|
|
package org.apache.dolphinscheduler.server.worker.processor; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import com.google.common.base.Preconditions; |
|
|
|
|
|
|
|
import io.netty.channel.Channel; |
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.remote.command.Command; |
|
|
|
import org.apache.dolphinscheduler.remote.command.Command; |
|
|
|
import org.apache.dolphinscheduler.remote.command.CommandType; |
|
|
|
import org.apache.dolphinscheduler.remote.command.CommandType; |
|
|
|
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; |
|
|
|
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; |
|
|
|
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; |
|
|
|
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; |
|
|
|
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
|
|
|
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
|
|
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
|
|
|
|
import com.google.common.base.Preconditions; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import io.netty.channel.Channel; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* update process host |
|
|
|
* update process host |
|
|
|
* this used when master failover |
|
|
|
* this used when master failover |
|
|
@ -45,6 +42,7 @@ public class HostUpdateProcessor implements NettyRequestProcessor { |
|
|
|
/** |
|
|
|
/** |
|
|
|
* task callback service |
|
|
|
* task callback service |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private TaskCallbackService taskCallbackService; |
|
|
|
private TaskCallbackService taskCallbackService; |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
@ -52,10 +50,6 @@ public class HostUpdateProcessor implements NettyRequestProcessor { |
|
|
|
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); |
|
|
|
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); |
|
|
|
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); |
|
|
|
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); |
|
|
|
logger.info("received host update command : {}", updateCommand); |
|
|
|
logger.info("received host update command : {}", updateCommand); |
|
|
|
|
|
|
|
|
|
|
|
if (taskCallbackService == null) { |
|
|
|
|
|
|
|
taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); |
|
|
|
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|