Browse Source

Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
d6ea202ed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
  2. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  3. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
  4. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  5. 250
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
  6. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  7. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  8. 102
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  9. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  10. 118
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

@ -65,4 +65,13 @@ public enum CommandType {
public String getDescp() {
return descp;
}
public static CommandType of(Integer status){
for(CommandType cmdType : values()){
if(cmdType.getCode() == status){
return cmdType;
}
}
throw new IllegalArgumentException("invalid status : " + status);
}
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -27,13 +27,14 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.util.Date;
/**
* task instance
*/
@TableName("t_ds_task_instance")
public class TaskInstance {
public class TaskInstance implements Serializable {
/**
* id
@ -198,7 +199,7 @@ public class TaskInstance {
public void init(String host,Date startTime,String executePath){
public void init(String host,Date startTime,String executePath){
this.host = host;
this.startTime = startTime;
this.executePath = executePath;

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java

@ -79,8 +79,10 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
/**
* list authorized UDF function
*
* @param userId userId
* @param dataSourceIds data source id array
* @param <T> T
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { stanceJson; Json; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; stanceJson; stanceJson; stanceJson; public String getTaskInstanceJson() { stanceJson; return taskInstanceJson; stanceJson; } stanceJson; stanceJson; public void setTaskInstanceJson(String taskInstanceJson) { stanceJson; this.taskInstanceJson = taskInstanceJson; stanceJson; } stanceJson; stanceJson; public ExecuteTaskRequestCommand() { stanceJson; } stanceJson; stanceJson; public ExecuteTaskRequestCommand(String taskInstanceJson) { Json; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInfoJson='" + taskInfoJson + '\'' + '}'; } }

250
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
import java.util.Date;
/**
* master/worker task transport
*/
public class TaskInfo implements Serializable{
/**
* task instance id
*/
private Integer taskId;
/**
* taks name
*/
private String taskName;
/**
* task start time
*/
private Date startTime;
/**
* task type
*/
private String taskType;
/**
* task execute path
*/
private String executePath;
/**
* task json
*/
private String taskJson;
/**
* process instance id
*/
private Integer processInstanceId;
/**
* process instance schedule time
*/
private Date scheduleTime;
/**
* process instance global parameters
*/
private String globalParams;
/**
* execute user id
*/
private Integer executorId;
/**
* command type if complement
*/
private Integer cmdTypeIfComplement;
/**
* tenant code
*/
private String tenantCode;
/**
* task queue
*/
private String queue;
/**
* process define id
*/
private Integer processDefineId;
/**
* project id
*/
private Integer projectId;
public Integer getTaskId() {
return taskId;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public Integer getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public Integer getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(Integer processDefineId) {
this.processDefineId = processDefineId;
}
public Integer getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
this.projectId = projectId;
}
public Integer getExecutorId() {
return executorId;
}
public void setExecutorId(Integer executorId) {
this.executorId = executorId;
}
public Integer getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
@Override
public String toString() {
return "TaskInfo{" +
"taskId=" + taskId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
", executePath='" + executePath + '\'' +
", taskJson='" + taskJson + '\'' +
", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime +
", globalParams='" + globalParams + '\'' +
", executorId=" + executorId +
", cmdTypeIfComplement=" + cmdTypeIfComplement +
", tenantCode='" + tenantCode + '\'' +
", queue='" + queue + '\'' +
", processDefineId=" + processDefineId +
", projectId=" + projectId +
'}';
}
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -118,6 +118,7 @@ public class MasterServer implements IStoppable {
//
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
this.nettyRemotingServer.start();

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -46,6 +46,13 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
this.processService = processService;
}
/**
* task final result response
* need master process , state persistence
*
* @param channel channel
* @param command command
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));

102
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -17,14 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
@ -124,10 +128,20 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
// TODO send task to worker
public void sendToWorker(TaskInstance taskInstance){
final Address address = new Address("127.0.0.1", 12346);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
/**
* set taskInstance relation
*/
TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
try {
Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class);
Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand);
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
@ -141,6 +155,88 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
/**
* set task instance relation
*
* @param taskInstance taskInstance
*/
private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
return taskInstance;
}
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
/**
* taskInstance convert to taskInfo
*
* @param taskInstance taskInstance
* @return taskInfo
*/
private TaskInfo convertToTaskInfo(TaskInstance taskInstance){
TaskInfo taskInfo = new TaskInfo();
taskInfo.setTaskId(taskInstance.getId());
taskInfo.setTaskName(taskInstance.getName());
taskInfo.setStartTime(taskInstance.getStartTime());
taskInfo.setTaskType(taskInstance.getTaskType());
taskInfo.setExecutePath(getExecLocalPath(taskInstance));
taskInfo.setTaskJson(taskInstance.getTaskJson());
taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId());
taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
taskInfo.setQueue(taskInstance.getProcessInstance().getQueue());
taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId());
taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId());
return taskInfo;
}
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/**
* submit master base task exec thread
* @return TaskInstance

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -85,71 +85,39 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class);
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
//TODO this logic need add to master
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
return;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
//TODO end
TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class);
// local execute path
String execLocalPath = getExecLocalPath(taskInstance);
String execLocalPath = getExecLocalPath(taskInfo);
logger.info("task instance local execute path : {} ", execLocalPath);
// init task
taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode());
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskInstance.getId(),
taskCallbackService.addCallbackChannel(taskInfo.getTaskId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance,
workerExecService.submit(new TaskScheduleThread(taskInfo,
processService, taskCallbackService));
}
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
/**
* get execute local path
* @param taskInstance taskInstance
* get execute local path
*
* @param taskInfo taskInfo
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
private String getExecLocalPath(TaskInfo taskInfo){
return FileUtils.getProcessExecDir(taskInfo.getProjectId(),
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId());
}
}

118
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
@ -29,14 +30,12 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
@ -64,7 +63,7 @@ public class TaskScheduleThread implements Runnable {
/**
* task instance
*/
private TaskInstance taskInstance;
private TaskInfo taskInfo;
/**
* process service
@ -82,14 +81,15 @@ public class TaskScheduleThread implements Runnable {
private TaskCallbackService taskInstanceCallbackService;
/**
* constructor
* constructor
*
* @param taskInstance task instance
* @param processService process dao
* @param taskInfo taskInfo
* @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService
*/
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService;
this.taskInstance = taskInstance;
this.taskInfo = taskInfo;
this.taskInstanceCallbackService = taskInstanceCallbackService;
}
@ -100,54 +100,50 @@ public class TaskScheduleThread implements Runnable {
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType());
taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand);
logger.info("script path : {}", taskInstance.getExecutePath());
logger.info("script path : {}", taskInfo.getExecutePath());
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
downloadResource(
taskInstance.getExecutePath(),
taskInfo.getExecutePath(),
resourceFiles,
logger);
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInstance.getExecutePath(),
processInstance.getScheduleTime(),
taskInstance.getName(),
taskInstance.getTaskType(),
taskInstance.getId(),
taskInfo.getExecutePath(),
taskInfo.getScheduleTime(),
taskInfo.getTaskName(),
taskInfo.getTaskType(),
taskInfo.getTaskId(),
CommonUtils.getSystemEnvPath(),
processInstance.getTenantCode(),
processInstance.getQueue(),
taskInstance.getStartTime(),
taskInfo.getTenantCode(),
taskInfo.getQueue(),
taskInfo.getStartTime(),
getGlobalParamsMap(),
taskInstance.getDependency(),
processInstance.getCmdTypeIfComplement());
null,
CommandType.of(taskInfo.getCmdTypeIfComplement()));
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
taskInfo.getProcessDefineId(),
taskInfo.getProcessInstanceId(),
taskInfo.getTaskId()));
task = TaskManager.newTask(taskInstance.getTaskType(),
task = TaskManager.newTask(taskInfo.getTaskType(),
taskProps,
taskLogger);
@ -163,14 +159,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand);
}
}
@ -182,7 +178,7 @@ public class TaskScheduleThread implements Runnable {
Map<String,String> globalParamsMap = new HashMap<>(16);
// global params string
String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
String globalParamsStr = taskInfo.getGlobalParams();
if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
@ -192,18 +188,18 @@ public class TaskScheduleThread implements Runnable {
}
/**
* build ack command
* @param taskType
* @param taskType taskType
*/
private ExecuteTaskAckCommand buildAckCommand(String taskType) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost("localhost");
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskInstance.getExecutePath());
ackCommand.setExecutePath(taskInfo.getExecutePath());
}
return ackCommand;
}
@ -219,15 +215,15 @@ public class TaskScheduleThread implements Runnable {
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log";
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log";
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log";
}
/**
@ -333,33 +329,9 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
int userId = taskInfo.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
}
/**
* update task state according to task type
* @param taskType
*/
private void updateTaskState(String taskType) {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
}else{
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
getTaskLogPath(),
taskInstance.getId());
}
}
}
Loading…
Cancel
Save