Browse Source

master/worker basic communication

pull/2/head
qiaozhanwei 5 years ago
parent
commit
910401fb2d
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  2. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  3. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -86,7 +86,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient;
private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
/**
* constructor of MasterBaseTaskExecThread
@ -101,9 +101,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
@ -122,17 +119,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
//TODO
/**端口默认是123456
* 需要构造ExecuteTaskRequestCommand里面就是TaskInstance的属性
*/
// TODO send task to worker
public void sendToWorker(String taskInstanceJson){
final Address address = new Address("192.168.220.247", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson);
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
logger.info("已发送任务到Worker上,Worker需要执行任务");
//结果可能为空,所以不用管,能发过去,就行。
logger.info("response result : {}",response);
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -64,7 +66,15 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskRequestCommand.class);
String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class);
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
//TODO 需要干掉,然后移到master里面。
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -105,7 +105,7 @@ public class TaskScheduleThread implements Runnable {
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
// taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
logger.info("script path : {}", taskInstance.getExecutePath());
// task node
@ -182,7 +182,7 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
// taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
}
logger.info("task instance id : {},task final status : {}",

Loading…
Cancel
Save