Browse Source

Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
658922056a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 91
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  2. 58
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

91
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
/**
* TaskExecutionContext builder
*/
public class TaskExecutionContextBuilder {
public static TaskExecutionContextBuilder get(){
return new TaskExecutionContextBuilder();
}
private TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
/**
* build taskInstance related info
*
* @param taskInstance taskInstance
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
taskExecutionContext.setTaskId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
return this;
}
/**
* build processInstance related info
*
* @param processInstance
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
taskExecutionContext.setProcessInstanceId(processInstance.getId());
taskExecutionContext.setScheduleTime(processInstance.getScheduleTime());
taskExecutionContext.setGlobalParams(processInstance.getGlobalParams());
taskExecutionContext.setExecutorId(processInstance.getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
taskExecutionContext.setTenantCode(processInstance.getTenantCode());
taskExecutionContext.setQueue(processInstance.getQueue());
return this;
}
/**
* build processDefinition related info
*
* @param processDefinition processDefinition
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildProcessDefinitionRelatedInfo(ProcessDefinition processDefinition){
taskExecutionContext.setProcessDefineId(processDefinition.getId());
taskExecutionContext.setProjectId(processDefinition.getProjectId());
return this;
}
/**
* create
*
* @return taskExecutionContext
*/
public TaskExecutionContext create(){
return taskExecutionContext;
}
}

58
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -128,13 +130,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
// TODO send task to worker
public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
/**
* set taskInstance relation
*/
TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
try {
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), 2000);
@ -156,18 +154,25 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
/**
* set task instance relation
* get TaskExecutionContext
*
* @param taskInstance taskInstance
* @return TaskExecutionContext
*/
private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
processService.changeTaskState(ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
@ -175,7 +180,11 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
return taskInstance;
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.create();
}
@ -195,35 +204,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return false;
}
/**
* taskInstance convert to taskInfo
*
* @param taskInstance taskInstance
* @return taskInfo
*/
private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance));
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId());
taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue());
taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId());
taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId());
return taskExecutionContext;
}
/**
* get execute local path
*

Loading…
Cancel
Save