Browse Source

master/worker basic communication

pull/2/head
qiaozhanwei 4 years ago
parent
commit
f6a2130ba1
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  2. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -124,7 +124,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
final Address address = new Address("192.168.220.247", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson);
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE);
logger.info("response result : {}",response);
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java

@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -64,9 +66,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskRequestCommand.class);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
@ -101,6 +105,9 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
// submit task
taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque()));
workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService));
ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
channel.writeAndFlush(executeTaskResponseCommand.convert2Command());
}
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {

Loading…
Cancel
Save