Browse Source

master/worker basic communication

pull/2/head
qiaozhanwei 5 years ago
parent
commit
e7c32affc6
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  2. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  3. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  4. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  6. 41
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  7. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -16,10 +16,17 @@
*/ */
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext; import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -75,6 +82,12 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/ */
private MasterConfig masterConfig; private MasterConfig masterConfig;
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient;
/** /**
* constructor of MasterBaseTaskExecThread * constructor of MasterBaseTaskExecThread
* @param taskInstance task instance * @param taskInstance task instance
@ -88,6 +101,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = false; this.cancel = false;
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
} }
/** /**
@ -105,6 +121,23 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = true; this.cancel = true;
} }
//TODO
/**端口默认是123456
* 需要构造ExecuteTaskRequestCommand里面就是TaskInstance的属性
*/
public void sendToWorker(String taskInstanceJson){
final Address address = new Address("192.168.220.247", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
logger.info("已发送任务到Worker上,Worker需要执行任务");
//结果可能为空,所以不用管,能发过去,就行。
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}
}
/** /**
* submit master base task exec thread * submit master base task exec thread
* @return TaskInstance * @return TaskInstance
@ -128,7 +161,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
if(submitDB && !submitQueue){ if(submitDB && !submitQueue){
// submit task to queue // submit task to queue
submitQueue = processService.submitTaskToQueue(task); sendToWorker(JSONObject.toJSONString(task));
} }
if(submitDB && submitQueue){ if(submitDB && submitQueue){
return task; return task;

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -160,20 +160,7 @@ public class MasterExecThread implements Runnable {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
} }
//TODO
/**端口默认是123456
* 需要构造ExecuteTaskRequestCommand里面就是TaskInstance的属性
*/
private void sendToWorker(){
final Address address = new Address("localhost", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
//结果可能为空,所以不用管,能发过去,就行。
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}
}
@Override @Override

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -172,9 +172,9 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService));
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
//worker registry // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); // this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
this.workerRegistry.registry(); // this.workerRegistry.registry();
this.zkWorkerClient.init(); this.zkWorkerClient.init();

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler thread",taskInstId); logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task // submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); // workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
// remove node from zk // remove node from zk
removeNodeFromTaskQueue(taskQueueStr); removeNodeFromTaskQueue(taskQueueStr);

41
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -97,6 +97,9 @@ public class TaskScheduleThread implements Runnable {
@Override @Override
public void run() { public void run() {
// TODO 需要去掉,暂时保留
updateTaskState(taskInstance.getTaskType());
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
try { try {
@ -169,6 +172,12 @@ public class TaskScheduleThread implements Runnable {
}catch (Exception e){ }catch (Exception e){
logger.error("task scheduler failure", e); logger.error("task scheduler failure", e);
kill(); kill();
//TODO 需要去掉,暂时保留 update task instance state
processService.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
@ -176,6 +185,14 @@ public class TaskScheduleThread implements Runnable {
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
} }
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(),
task.getExitStatus());
// update task instance state
processService.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
} }
/** /**
@ -342,4 +359,28 @@ public class TaskScheduleThread implements Runnable {
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission(); permissionCheck.checkPermission();
} }
/**
* update task state according to task type
* @param taskType
*/
private void updateTaskState(String taskType) {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
}else{
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
getTaskLogPath(),
taskInstance.getId());
}
}
} }

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -91,7 +91,7 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamList(Collections.EMPTY_LIST); processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService)); masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null));
// prepareProcess init dag // prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag"); Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true); dag.setAccessible(true);

Loading…
Cancel
Save