From d6ea202ed70e408c992b17a3e797e05e27df2baf Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 21 Feb 2020 20:33:10 +0800 Subject: [PATCH 1/2] Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei --- .../common/enums/CommandType.java | 9 + .../dao/entity/TaskInstance.java | 5 +- .../dao/mapper/DataSourceMapper.java | 2 + .../command/ExecuteTaskRequestCommand.java | 2 +- .../remote/command/TaskInfo.java | 250 ++++++++++++++++++ .../server/master/MasterServer.java | 1 + .../processor/TaskResponseProcessor.java | 7 + .../runner/MasterBaseTaskExecThread.java | 102 ++++++- .../processor/WorkerRequestProcessor.java | 64 ++--- .../worker/runner/TaskScheduleThread.java | 118 ++++----- 10 files changed, 433 insertions(+), 127 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 1ee79156dc..56fdd078d7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -65,4 +65,13 @@ public enum CommandType { public String getDescp() { return descp; } + + public static CommandType of(Integer status){ + for(CommandType cmdType : values()){ + if(cmdType.getCode() == status){ + return cmdType; + } + } + throw new IllegalArgumentException("invalid status : " + status); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 2db1eda8f4..c9481baf94 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -27,13 +27,14 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import java.io.Serializable; import java.util.Date; /** * task instance */ @TableName("t_ds_task_instance") -public class TaskInstance { +public class TaskInstance implements Serializable { /** * id @@ -198,7 +199,7 @@ public class TaskInstance { - public void init(String host,Date startTime,String executePath){ + public void init(String host,Date startTime,String executePath){ this.host = host; this.startTime = startTime; this.executePath = executePath; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java index f95fbc7a4d..0c3238a5c5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java @@ -79,8 +79,10 @@ public interface DataSourceMapper extends BaseMapper { /** * list authorized UDF function + * * @param userId userId * @param dataSourceIds data source id array + * @param T * @return UDF function list */ List listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index 4d01142c97..e1556f32f1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInfoJson; public String getTaskInfoJson() { return taskInfoJson; } public void setTaskInfoJson(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInfoJson) { this.taskInfoJson = taskInfoJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInfoJson='" + taskInfoJson + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java new file mode 100644 index 0000000000..3fb58fe3da --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import java.io.Serializable; +import java.util.Date; + +/** + * master/worker task transport + */ +public class TaskInfo implements Serializable{ + + /** + * task instance id + */ + private Integer taskId; + + + /** + * taks name + */ + private String taskName; + + /** + * task start time + */ + private Date startTime; + + /** + * task type + */ + private String taskType; + + /** + * task execute path + */ + private String executePath; + + /** + * task json + */ + private String taskJson; + + + /** + * process instance id + */ + private Integer processInstanceId; + + + /** + * process instance schedule time + */ + private Date scheduleTime; + + /** + * process instance global parameters + */ + private String globalParams; + + + /** + * execute user id + */ + private Integer executorId; + + + /** + * command type if complement + */ + private Integer cmdTypeIfComplement; + + + /** + * tenant code + */ + private String tenantCode; + + /** + * task queue + */ + private String queue; + + + /** + * process define id + */ + private Integer processDefineId; + + /** + * project id + */ + private Integer projectId; + + public Integer getTaskId() { + return taskId; + } + + public void setTaskId(Integer taskId) { + this.taskId = taskId; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public String getTaskType() { + return taskType; + } + + public void setTaskType(String taskType) { + this.taskType = taskType; + } + + public String getExecutePath() { + return executePath; + } + + public void setExecutePath(String executePath) { + this.executePath = executePath; + } + + public String getTaskJson() { + return taskJson; + } + + public void setTaskJson(String taskJson) { + this.taskJson = taskJson; + } + + public Integer getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(Integer processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public Date getScheduleTime() { + return scheduleTime; + } + + public void setScheduleTime(Date scheduleTime) { + this.scheduleTime = scheduleTime; + } + + public String getGlobalParams() { + return globalParams; + } + + public void setGlobalParams(String globalParams) { + this.globalParams = globalParams; + } + + public String getTenantCode() { + return tenantCode; + } + + public void setTenantCode(String tenantCode) { + this.tenantCode = tenantCode; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public Integer getProcessDefineId() { + return processDefineId; + } + + public void setProcessDefineId(Integer processDefineId) { + this.processDefineId = processDefineId; + } + + public Integer getProjectId() { + return projectId; + } + + public void setProjectId(Integer projectId) { + this.projectId = projectId; + } + + public Integer getExecutorId() { + return executorId; + } + + public void setExecutorId(Integer executorId) { + this.executorId = executorId; + } + + public Integer getCmdTypeIfComplement() { + return cmdTypeIfComplement; + } + + public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) { + this.cmdTypeIfComplement = cmdTypeIfComplement; + } + + @Override + public String toString() { + return "TaskInfo{" + + "taskId=" + taskId + + ", taskName='" + taskName + '\'' + + ", startTime=" + startTime + + ", taskType='" + taskType + '\'' + + ", executePath='" + executePath + '\'' + + ", taskJson='" + taskJson + '\'' + + ", processInstanceId=" + processInstanceId + + ", scheduleTime=" + scheduleTime + + ", globalParams='" + globalParams + '\'' + + ", executorId=" + executorId + + ", cmdTypeIfComplement=" + cmdTypeIfComplement + + ", tenantCode='" + tenantCode + '\'' + + ", queue='" + queue + '\'' + + ", processDefineId=" + processDefineId + + ", projectId=" + projectId + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 0a153e7041..d0c7bc2350 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -118,6 +118,7 @@ public class MasterServer implements IStoppable { // //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(45678); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService)); this.nettyRemotingServer.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 0dd45f091e..c3b6a05676 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -46,6 +46,13 @@ public class TaskResponseProcessor implements NettyRequestProcessor { this.processService = processService; } + /** + * task final result response + * need master process , state persistence + * + * @param channel channel + * @param command command + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index a261b3430a..09005a1f27 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -17,14 +17,18 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.utils.BeanContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; @@ -124,10 +128,20 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(TaskInstance taskInstance){ final Address address = new Address("127.0.0.1", 12346); - ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance)); + /** + * set taskInstance relation + */ + TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance); + + ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand( + FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); try { - Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE); - ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class); + Command responseCommand = nettyRemotingClient.sendSync(address, + taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + + ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskAckCommand.class); + logger.info("taskAckCommand : {}",taskAckCommand); processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), @@ -141,6 +155,88 @@ public class MasterBaseTaskExecThread implements Callable { } } + + /** + * set task instance relation + * + * @param taskInstance taskInstance + */ + private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){ + taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); + + int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); + Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); + // verify tenant is null + if (verifyTenantIsNull(tenant, taskInstance)) { + processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId()); + return null; + } + // set queue for process instance, user-specified queue takes precedence over tenant queue + String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); + taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); + taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); + + return taskInstance; + } + + + /** + * whehter tenant is null + * @param tenant tenant + * @param taskInstance taskInstance + * @return result + */ + private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { + if(tenant == null){ + logger.error("tenant not exists,process instance id : {},task instance id : {}", + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + return true; + } + return false; + } + + + /** + * taskInstance convert to taskInfo + * + * @param taskInstance taskInstance + * @return taskInfo + */ + private TaskInfo convertToTaskInfo(TaskInstance taskInstance){ + TaskInfo taskInfo = new TaskInfo(); + taskInfo.setTaskId(taskInstance.getId()); + taskInfo.setTaskName(taskInstance.getName()); + taskInfo.setStartTime(taskInstance.getStartTime()); + taskInfo.setTaskType(taskInstance.getTaskType()); + taskInfo.setExecutePath(getExecLocalPath(taskInstance)); + taskInfo.setTaskJson(taskInstance.getTaskJson()); + taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId()); + taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); + taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); + taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); + taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); + taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + taskInfo.setQueue(taskInstance.getProcessInstance().getQueue()); + taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId()); + taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId()); + + return taskInfo; + } + + + /** + * get execute local path + * + * @return execute local path + */ + private String getExecLocalPath(TaskInstance taskInstance){ + return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + } + /** * submit master base task exec thread * @return TaskInstance diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 8ea9ccbe5b..ba2149492b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -85,71 +85,39 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( command.getBody(), ExecuteTaskRequestCommand.class); - String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); + String taskInstanceJson = taskRequestCommand.getTaskInfoJson(); - TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class); - - taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); - - - //TODO this logic need add to master - int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); - Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); - // verify tenant is null - if (verifyTenantIsNull(tenant, taskInstance)) { - processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId()); - return; - } - // set queue for process instance, user-specified queue takes precedence over tenant queue - String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); - taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); - taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - //TODO end + TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class); // local execute path - String execLocalPath = getExecLocalPath(taskInstance); + String execLocalPath = getExecLocalPath(taskInfo); logger.info("task instance local execute path : {} ", execLocalPath); - // init task - taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath); + try { - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode()); + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode()); } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } - taskCallbackService.addCallbackChannel(taskInstance.getId(), + taskCallbackService.addCallbackChannel(taskInfo.getTaskId(), new CallbackChannel(channel, command.getOpaque())); // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, + workerExecService.submit(new TaskScheduleThread(taskInfo, processService, taskCallbackService)); } - /** - * whehter tenant is null - * @param tenant tenant - * @param taskInstance taskInstance - * @return result - */ - private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { - if(tenant == null){ - logger.error("tenant not exists,process instance id : {},task instance id : {}", - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); - return true; - } - return false; - } /** - * get execute local path - * @param taskInstance taskInstance + * get execute local path + * + * @param taskInfo taskInfo * @return execute local path */ - private String getExecLocalPath(TaskInstance taskInstance){ - return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); + private String getExecLocalPath(TaskInfo taskInfo){ + return FileUtils.getProcessExecDir(taskInfo.getProjectId(), + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 349e762616..04ee565871 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; @@ -29,14 +30,12 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; @@ -64,7 +63,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance */ - private TaskInstance taskInstance; + private TaskInfo taskInfo; /** * process service @@ -82,14 +81,15 @@ public class TaskScheduleThread implements Runnable { private TaskCallbackService taskInstanceCallbackService; /** - * constructor + * constructor * - * @param taskInstance task instance - * @param processService process dao + * @param taskInfo taskInfo + * @param processService processService + * @param taskInstanceCallbackService taskInstanceCallbackService */ - public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; - this.taskInstance = taskInstance; + this.taskInfo = taskInfo; this.taskInstanceCallbackService = taskInstanceCallbackService; } @@ -100,54 +100,50 @@ public class TaskScheduleThread implements Runnable { try { // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); - taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType()); + taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand); - logger.info("script path : {}", taskInstance.getExecutePath()); + logger.info("script path : {}", taskInfo.getExecutePath()); // task node - TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class); // get resource files List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource( - taskInstance.getExecutePath(), + taskInfo.getExecutePath(), resourceFiles, logger); - - // get process instance according to tak instance - ProcessInstance processInstance = taskInstance.getProcessInstance(); - // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), - taskInstance.getExecutePath(), - processInstance.getScheduleTime(), - taskInstance.getName(), - taskInstance.getTaskType(), - taskInstance.getId(), + taskInfo.getExecutePath(), + taskInfo.getScheduleTime(), + taskInfo.getTaskName(), + taskInfo.getTaskType(), + taskInfo.getTaskId(), CommonUtils.getSystemEnvPath(), - processInstance.getTenantCode(), - processInstance.getQueue(), - taskInstance.getStartTime(), + taskInfo.getTenantCode(), + taskInfo.getQueue(), + taskInfo.getStartTime(), getGlobalParamsMap(), - taskInstance.getDependency(), - processInstance.getCmdTypeIfComplement()); + null, + CommandType.of(taskInfo.getCmdTypeIfComplement())); // set task timeout setTaskTimeout(taskProps, taskNode); taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId())); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); + taskInfo.getProcessDefineId(), + taskInfo.getProcessInstanceId(), + taskInfo.getTaskId())); - task = TaskManager.newTask(taskInstance.getTaskType(), + task = TaskManager.newTask(taskInfo.getTaskType(), taskProps, taskLogger); @@ -163,14 +159,14 @@ public class TaskScheduleThread implements Runnable { // responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); - logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus()); + logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus()); }catch (Exception e){ logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); + taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand); } } @@ -182,7 +178,7 @@ public class TaskScheduleThread implements Runnable { Map globalParamsMap = new HashMap<>(16); // global params string - String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams(); + String globalParamsStr = taskInfo.getGlobalParams(); if (globalParamsStr != null) { List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); @@ -192,18 +188,18 @@ public class TaskScheduleThread implements Runnable { } /** * build ack command - * @param taskType + * @param taskType taskType */ private ExecuteTaskAckCommand buildAckCommand(String taskType) { ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setLogPath(getTaskLogPath()); - ackCommand.setHost("localhost"); + ackCommand.setHost(OSUtils.getHost()); ackCommand.setStartTime(new Date()); if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ - ackCommand.setExecutePath(taskInstance.getExecutePath()); + ackCommand.setExecutePath(taskInfo.getExecutePath()); } return ackCommand; } @@ -219,15 +215,15 @@ public class TaskScheduleThread implements Runnable { .getDiscriminator()).getLogBase(); if (baseLog.startsWith(Constants.SINGLE_SLASH)){ return baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + + taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInfo.getTaskId() + ".log"; } return System.getProperty("user.dir") + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + + taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInfo.getTaskId() + ".log"; } /** @@ -333,33 +329,9 @@ public class TaskScheduleThread implements Runnable { * @throws Exception exception */ private void checkDownloadPermission(List projectRes) throws Exception { - int userId = taskInstance.getProcessInstance().getExecutorId(); + int userId = taskInfo.getExecutorId(); String[] resNames = projectRes.toArray(new String[projectRes.size()]); PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); } - - /** - * update task state according to task type - * @param taskType - */ - private void updateTaskState(String taskType) { - // update task status is running - if(taskType.equals(TaskType.SQL.name()) || - taskType.equals(TaskType.PROCEDURE.name())){ - processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - getTaskLogPath(), - taskInstance.getId()); - }else{ - processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, - taskInstance.getStartTime(), - taskInstance.getHost(), - taskInstance.getExecutePath(), - getTaskLogPath(), - taskInstance.getId()); - } - } } \ No newline at end of file From 64d538e158e6690960d6333dd62232bc7d86e9f8 Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Fri, 21 Feb 2020 21:54:46 +0800 Subject: [PATCH 2/2] updates --- .../TaskExecutionContext.java} | 6 +- .../server/master/host/Host.java | 95 +++++++++++ .../server/master/host/HostManager.java | 27 +++ .../master/host/RoundRobinHostManager.java | 57 +++++++ .../master/host/assign/RandomSelector.java | 45 +++++ .../host/assign/RoundRobinSelector.java | 40 +++++ .../server/master/host/assign/Selector.java | 26 +++ .../master/registry/MasterRegistry.java | 104 ++++++++++++ .../runner/MasterBaseTaskExecThread.java | 43 ++--- .../server/registry/ZookeeperNodeManager.java | 159 ++++++++++++++++++ .../registry/ZookeeperRegistryCenter.java | 25 +++ .../processor/WorkerRequestProcessor.java | 30 ++-- .../worker/registry/WorkerRegistry.java | 2 +- .../worker/runner/TaskScheduleThread.java | 78 +++++---- .../service/zk/AbstractListener.java | 36 ++++ .../service/zk/ZookeeperCachedOperator.java | 7 +- 16 files changed, 696 insertions(+), 84 deletions(-) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/{command/TaskInfo.java => entity/TaskExecutionContext.java} (97%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java similarity index 97% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java index 3fb58fe3da..783d166e96 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.command; +package org.apache.dolphinscheduler.remote.entity; import java.io.Serializable; import java.util.Date; @@ -23,7 +23,7 @@ import java.util.Date; /** * master/worker task transport */ -public class TaskInfo implements Serializable{ +public class TaskExecutionContext implements Serializable{ /** * task instance id @@ -229,7 +229,7 @@ public class TaskInfo implements Serializable{ @Override public String toString() { - return "TaskInfo{" + + return "TaskExecutionContext{" + "taskId=" + taskId + ", taskName='" + taskName + '\'' + ", startTime=" + startTime + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java new file mode 100644 index 0000000000..57e64c1446 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + + +import java.util.Objects; + + +public class Host { + + private String address; + + private String ip; + + private int port; + + public Host() { + } + + public Host(String ip, int port) { + this.ip = ip; + this.port = port; + this.address = ip + ":" + port; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + this.address = ip + ":" + port; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + this.address = ip + ":" + port; + } + + public static Host of(String address){ + String[] parts = address.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException(String.format("Address : %s illegal.", address)); + } + Host host = new Host(parts[0], Integer.parseInt(parts[1])); + return host; + } + + @Override + public String toString() { + return "Host{" + + "address='" + address + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Host host = (Host) o; + return Objects.equals(getAddress(), host.getAddress()); + } + + @Override + public int hashCode() { + return Objects.hash(getAddress()); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java new file mode 100644 index 0000000000..316ce36d5d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + + +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; + +public interface HostManager { + + Host select(TaskExecutionContext context); + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java new file mode 100644 index 0000000000..18a4659c13 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +@Service +public class RoundRobinHostManager implements HostManager { + + private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class); + + @Autowired + private RoundRobinSelector selector; + + @Autowired + private ZookeeperNodeManager zookeeperNodeManager; + + @Override + public Host select(TaskExecutionContext context){ + Host host = new Host(); + Collection nodes = zookeeperNodeManager.getWorkerNodes(); + if(CollectionUtils.isEmpty(nodes)){ + return host; + } + List candidateHosts = new ArrayList<>(nodes.size()); + nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); + return selector.select(candidateHosts); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java new file mode 100644 index 0000000000..3a3f1237bf --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; +import java.util.Random; + + +public class RandomSelector implements Selector { + + private final Random random = new Random(); + + @Override + public T select(final Collection source) { + + if (source == null || source.size() == 0) { + throw new IllegalArgumentException("Empty source."); + } + + if (source.size() == 1) { + return (T) source.toArray()[0]; + } + + int size = source.size(); + int randomIndex = random.nextInt(size); + + return (T) source.toArray()[randomIndex]; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java new file mode 100644 index 0000000000..d3422963b0 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + + +public class RoundRobinSelector implements Selector { + + private final AtomicInteger index = new AtomicInteger(0); + + @Override + public T select(Collection source) { + if (source == null || source.size() == 0) { + throw new IllegalArgumentException("Empty source."); + } + + if (source.size() == 1) { + return (T)source.toArray()[0]; + } + + int size = source.size(); + return (T) source.toArray()[index.getAndIncrement() % size]; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java new file mode 100644 index 0000000000..c6772f3e03 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; + + +public interface Selector { + + T select(Collection source); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java new file mode 100644 index 0000000000..a9c111d0c9 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.registry; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * master registry + */ +public class MasterRegistry { + + private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); + + /** + * zookeeper registry center + */ + private final ZookeeperRegistryCenter zookeeperRegistryCenter; + + /** + * port + */ + private final int port; + + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ + public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ + this.zookeeperRegistryCenter = zookeeperRegistryCenter; + this.port = port; + } + + /** + * registry + */ + public void registry() { + String address = Constants.LOCAL_ADDRESS; + String localNodePath = getWorkerPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if(newState == ConnectionState.LOST){ + logger.error("master : {} connection lost from zookeeper", address); + } else if(newState == ConnectionState.RECONNECTED){ + logger.info("master : {} reconnected to zookeeper", address); + zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + } else if(newState == ConnectionState.SUSPENDED){ + logger.warn("master : {} connection SUSPENDED ", address); + } + } + }); + logger.info("master node : {} registry to ZK successfully.", address); + } + + /** + * remove registry info + */ + public void unRegistry() { + String address = getLocalAddress(); + String localNodePath = getWorkerPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + logger.info("worker node : {} unRegistry to ZK.", address); + } + + /** + * get worker path + * @return + */ + private String getWorkerPath() { + String address = getLocalAddress(); + String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; + return localNodePath; + } + + /** + * get local address + * @return + */ + private String getLocalAddress(){ + return Constants.LOCAL_ADDRESS + ":" + port; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 09005a1f27..e7d998e3bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; @@ -137,7 +137,7 @@ public class MasterBaseTaskExecThread implements Callable { FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); try { Command responseCommand = nettyRemotingClient.sendSync(address, - taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + taskRequestCommand.convert2Command(), 2000); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( responseCommand.getBody(), ExecuteTaskAckCommand.class); @@ -156,6 +156,7 @@ public class MasterBaseTaskExecThread implements Callable { } + /** * set task instance relation * @@ -203,25 +204,25 @@ public class MasterBaseTaskExecThread implements Callable { * @param taskInstance taskInstance * @return taskInfo */ - private TaskInfo convertToTaskInfo(TaskInstance taskInstance){ - TaskInfo taskInfo = new TaskInfo(); - taskInfo.setTaskId(taskInstance.getId()); - taskInfo.setTaskName(taskInstance.getName()); - taskInfo.setStartTime(taskInstance.getStartTime()); - taskInfo.setTaskType(taskInstance.getTaskType()); - taskInfo.setExecutePath(getExecLocalPath(taskInstance)); - taskInfo.setTaskJson(taskInstance.getTaskJson()); - taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId()); - taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); - taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); - taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); - taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); - taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); - taskInfo.setQueue(taskInstance.getProcessInstance().getQueue()); - taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId()); - taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId()); - - return taskInfo; + private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){ + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskId(taskInstance.getId()); + taskExecutionContext.setTaskName(taskInstance.getName()); + taskExecutionContext.setStartTime(taskInstance.getStartTime()); + taskExecutionContext.setTaskType(taskInstance.getTaskType()); + taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance)); + taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); + taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId()); + taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); + taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); + taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); + taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); + taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue()); + taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId()); + taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId()); + + return taskExecutionContext; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java new file mode 100644 index 0000000000..e3eacafa84 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.registry; + +import org.apache.curator.framework.CuratorFramework; + +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.dolphinscheduler.service.zk.AbstractListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +@Service +public abstract class ZookeeperNodeManager implements InitializingBean { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); + + private final Lock masterLock = new ReentrantLock(); + + private final Lock workerLock = new ReentrantLock(); + + private final Set workerNodes = new HashSet<>(); + + private final Set masterNodes = new HashSet<>(); + + @Autowired + private ZookeeperRegistryCenter registryCenter; + + @Override + public void afterPropertiesSet() throws Exception { + load(); + registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); + registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); + } + + private void load(){ + Set schedulerNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(schedulerNodes); + Set workersNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(workersNodes); + } + + class WorkerNodeListener extends AbstractListener { + + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if(registryCenter.isWorkerPath(path)){ + try { + if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { + logger.info("worker node : {} added.", path); + Set previousNodes = new HashSet<>(workerNodes); + Set currentNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(currentNodes); + } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { + logger.info("worker node : {} down.", path); + Set previousNodes = new HashSet<>(workerNodes); + Set currentNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(currentNodes); + } + } catch (IllegalArgumentException ignore) { + logger.warn(ignore.getMessage()); + } catch (Exception ex) { + logger.error("WorkerListener capture data change and get data failed", ex); + } + } + } + } + + + class MasterNodeListener extends AbstractListener { + + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if (registryCenter.isMasterPath(path)) { + try { + if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { + logger.info("master node : {} added.", path); + Set previousNodes = new HashSet<>(masterNodes); + Set currentNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(currentNodes); + } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { + logger.info("master node : {} down.", path); + Set previousNodes = new HashSet<>(masterNodes); + Set currentNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(currentNodes); + } + } catch (Exception ex) { + logger.error("MasterNodeListener capture data change and get data failed.", ex); + } + } + } + } + + public Set getMasterNodes() { + masterLock.lock(); + try { + return Collections.unmodifiableSet(masterNodes); + } finally { + masterLock.unlock(); + } + } + + private void syncMasterNodes(Set nodes){ + masterLock.lock(); + try { + masterNodes.clear(); + masterNodes.addAll(nodes); + } finally { + masterLock.unlock(); + } + } + + private void syncWorkerNodes(Set nodes){ + workerLock.lock(); + try { + workerNodes.clear(); + workerNodes.addAll(nodes); + } finally { + workerLock.unlock(); + } + } + + public Set getWorkerNodes(){ + workerLock.lock(); + try { + return Collections.unmodifiableSet(workerNodes); + } finally { + workerLock.unlock(); + } + } + + public void close(){ + registryCenter.close(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 68c19ea4eb..96b8424b55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -22,6 +22,9 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @Service @@ -76,6 +79,28 @@ public class ZookeeperRegistryCenter implements InitializingBean { return WORKER_PATH; } + public Set getMasterNodesDirectly() { + List masters = getChildrenKeys(MASTER_PATH); + return new HashSet<>(masters); + } + + public Set getWorkerNodesDirectly() { + List workers = getChildrenKeys(WORKER_PATH); + return new HashSet<>(workers); + } + + public boolean isWorkerPath(String path) { + return path != null && path.contains(WORKER_PATH); + } + + public boolean isMasterPath(String path) { + return path != null && path.contains(MASTER_PATH); + } + + public List getChildrenKeys(final String key) { + return zookeeperCachedOperator.getChildrenKeys(key); + } + public ZookeeperCachedOperator getZookeeperCachedOperator() { return zookeeperCachedOperator; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index ba2149492b..cbbb5cee38 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -19,18 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -40,7 +35,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; import java.util.concurrent.ExecutorService; /** @@ -87,23 +81,23 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { String taskInstanceJson = taskRequestCommand.getTaskInfoJson(); - TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class); + TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class); // local execute path - String execLocalPath = getExecLocalPath(taskInfo); + String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {} ", execLocalPath); try { - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode()); + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } - taskCallbackService.addCallbackChannel(taskInfo.getTaskId(), + taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(), new CallbackChannel(channel, command.getOpaque())); // submit task - workerExecService.submit(new TaskScheduleThread(taskInfo, + workerExecService.submit(new TaskScheduleThread(taskExecutionContext, processService, taskCallbackService)); } @@ -111,13 +105,13 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { /** * get execute local path * - * @param taskInfo taskInfo + * @param taskExecutionContext taskExecutionContext * @return execute local path */ - private String getExecLocalPath(TaskInfo taskInfo){ - return FileUtils.getProcessExecDir(taskInfo.getProjectId(), - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId()); + private String getExecLocalPath(TaskExecutionContext taskExecutionContext){ + return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index fc81638705..a0f4e664b5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -71,7 +71,7 @@ public class WorkerRegistry { } } }); - logger.info("scheduler node : {} registry to ZK successfully.", address); + logger.info("worker node : {} registry to ZK successfully.", address); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 04ee565871..53a25b1016 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -31,11 +31,9 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; @@ -63,7 +61,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance */ - private TaskInfo taskInfo; + private TaskExecutionContext taskExecutionContext; /** * process service @@ -83,67 +81,67 @@ public class TaskScheduleThread implements Runnable { /** * constructor * - * @param taskInfo taskInfo + * @param taskExecutionContext taskExecutionContext * @param processService processService * @param taskInstanceCallbackService taskInstanceCallbackService */ - public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; - this.taskInfo = taskInfo; + this.taskExecutionContext = taskExecutionContext; this.taskInstanceCallbackService = taskInstanceCallbackService; } @Override public void run() { - ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId()); try { // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType()); - taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand); + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType()); + taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand); - logger.info("script path : {}", taskInfo.getExecutePath()); + logger.info("script path : {}", taskExecutionContext.getExecutePath()); // task node - TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class); + TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); // get resource files List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource( - taskInfo.getExecutePath(), + taskExecutionContext.getExecutePath(), resourceFiles, logger); // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), - taskInfo.getExecutePath(), - taskInfo.getScheduleTime(), - taskInfo.getTaskName(), - taskInfo.getTaskType(), - taskInfo.getTaskId(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getScheduleTime(), + taskExecutionContext.getTaskName(), + taskExecutionContext.getTaskType(), + taskExecutionContext.getTaskId(), CommonUtils.getSystemEnvPath(), - taskInfo.getTenantCode(), - taskInfo.getQueue(), - taskInfo.getStartTime(), + taskExecutionContext.getTenantCode(), + taskExecutionContext.getQueue(), + taskExecutionContext.getStartTime(), getGlobalParamsMap(), null, - CommandType.of(taskInfo.getCmdTypeIfComplement())); + CommandType.of(taskExecutionContext.getCmdTypeIfComplement())); // set task timeout setTaskTimeout(taskProps, taskNode); taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId())); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId())); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId())); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId())); - task = TaskManager.newTask(taskInfo.getTaskType(), + task = TaskManager.newTask(taskExecutionContext.getTaskType(), taskProps, taskLogger); @@ -159,14 +157,14 @@ public class TaskScheduleThread implements Runnable { // responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); - logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus()); + logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus()); }catch (Exception e){ logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand); + taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand); } } @@ -178,7 +176,7 @@ public class TaskScheduleThread implements Runnable { Map globalParamsMap = new HashMap<>(16); // global params string - String globalParamsStr = taskInfo.getGlobalParams(); + String globalParamsStr = taskExecutionContext.getGlobalParams(); if (globalParamsStr != null) { List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); @@ -199,7 +197,7 @@ public class TaskScheduleThread implements Runnable { if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ - ackCommand.setExecutePath(taskInfo.getExecutePath()); + ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } return ackCommand; } @@ -215,15 +213,15 @@ public class TaskScheduleThread implements Runnable { .getDiscriminator()).getLogBase(); if (baseLog.startsWith(Constants.SINGLE_SLASH)){ return baseLog + Constants.SINGLE_SLASH + - taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + - taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInfo.getTaskId() + ".log"; + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskId() + ".log"; } return System.getProperty("user.dir") + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH + - taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + - taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInfo.getTaskId() + ".log"; + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskId() + ".log"; } /** @@ -329,7 +327,7 @@ public class TaskScheduleThread implements Runnable { * @throws Exception exception */ private void checkDownloadPermission(List projectRes) throws Exception { - int userId = taskInfo.getExecutorId(); + int userId = taskExecutionContext.getExecutorId(); String[] resNames = projectRes.toArray(new String[projectRes.size()]); PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java new file mode 100644 index 0000000000..3e3e6c8c20 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +public abstract class AbstractListener implements TreeCacheListener { + + @Override + public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { + String path = null == event.getData() ? "" : event.getData().getPath(); + if (path.isEmpty()) { + return; + } + dataChanged(client, event, path); + } + + protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index dccb768f8b..6c38a68f3e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -20,6 +20,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -32,7 +33,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); - TreeCache treeCache; + private TreeCache treeCache; /** * register a unified listener of /${dsRoot}, */ @@ -72,6 +73,10 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { return treeCache; } + public void addListener(TreeCacheListener listener){ + this.treeCache.getListenable().addListener(listener); + } + @Override public void close() { treeCache.close();