Browse Source

master/worker basic communication

pull/2/head
qiaozhanwei 5 years ago
parent
commit
2bb2d0ce7c
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
  2. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  3. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  4. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  5. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } }

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -122,10 +123,29 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
// TODO send task to worker
public void sendToWorker(String taskInstanceJson){
final Address address = new Address("127.0.0.1", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson);
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE);
logger.info("response result : {}",response);
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
logger.info("receive command : {}", responseCommand);
final CommandType commandType = responseCommand.getType();
switch (commandType){
case EXECUTE_TASK_ACK:
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
break;
case EXECUTE_TASK_RESPONSE:
ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskResponseCommand.class);
logger.info("taskResponseCommand : {}",taskResponseCommand);
break;
default:
throw new IllegalArgumentException("unknown commandType");
}
logger.info("response result : {}",responseCommand);
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -56,7 +56,8 @@ public class TaskCallbackService {
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(0)).addListener(new ChannelFutureListener(){
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -68,7 +69,7 @@ public class TaskCallbackService {
});
}
//TODO
// TODO
private Channel createChannel(){
return null;
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -123,9 +123,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance,
processService, taskCallbackService));
ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque()));
}
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -96,7 +96,7 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
// TODO 需要去掉,暂时保留
// TODO Need to be removed and kept temporarily update task instance state
updateTaskState(taskInstance.getTaskType());
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
@ -172,7 +172,7 @@ public class TaskScheduleThread implements Runnable {
logger.error("task scheduler failure", e);
kill();
//TODO 需要去掉,暂时保留 update task instance state
//TODO Need to be removed and kept temporarily update task instance state
processService.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());

Loading…
Cancel
Save