Browse Source

Refactor worker (#1979)

* updates

* move FetchTaskThread logic to WorkerNettyRequestProcessor

* add NettyRemotingClient to scheduler thread

* refactor TaskScheduleThread, add TaskInstanceCallbackService
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
ac93d78379
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  2. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
  4. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
  5. 49
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
  6. 76
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java
  7. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
  8. 73
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java

@ -0,0 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { /** * task id */ concurrent.atomic.AtomicLong; /** concurrent.atomic.AtomicLong; concurrent.atomic.AtomicLong; */ concurrent.atomic.AtomicLong; /** concurrent.atomic.AtomicLong; /** */ concurrent.atomic.AtomicLong; * execute task response command /** concurrent.atomic.AtomicLong; */ */ concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { /** * execute count */ concurrent.atomic.AtomicLong; * task id /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } * execute task response command */ /** public class ExecuteTaskResponseCommand implements Serializable { /** } public void setTaskId(String taskId) { this.taskId = taskId; } /** return result; /** public void setResult(Object result) { this.result = result; } /** * execute task response command /** */ } /** public class ExecuteTaskResponseCommand implements Serializable { /** /** } /** * task id /** */ } * execute task response command * execute task response command concurrent.atomic.AtomicLong; } public long getExecuteTime() { * execute task response command /** /** * execute task response command * execute task response command * execute task response command */ } * execute task response command public class ExecuteTaskResponseCommand implements Serializable { Command command = new Command(); * execute task response command * task id byte[] body = FastJsonSerializer.serialize(this); */ return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util. */ /** */ * execute task response command */ */ */ public class ExecuteTaskResponseCommand implements Serializable { */ /** */ * task id */ */ public class ExecuteTaskResponseCommand implements Serializable { public class ExecuteTaskResponseCommand implements Serializable { concurrent.atomic.AtomicLong; public class ExecuteTaskResponseCommand implements Serializable { } public class ExecuteTaskResponseCommand implements Serializable { /** public class ExecuteTaskResponseCommand implements Serializable { * execute task response command */ private int taskInstanceId; */ * execute task response command /** public class ExecuteTaskResponseCommand implements Serializable { /** */ public class ExecuteTaskResponseCommand implements Serializable { * task id */ * execute task response command */ * execute task response command public class ExecuteTaskResponseCommand implements Serializable { /** * end time public class ExecuteTaskResponseCommand implements Serializable { */ /** public int getTaskInstanceId() { /** public class ExecuteTaskResponseCommand implements Serializable { /** /** public class ExecuteTaskResponseCommand implements Serializable { } /** * execute task response command /** */ } /** public class ExecuteTaskResponseCommand implements Serializable { /** /** } /** * task id /** */ public class ExecuteTaskResponseCommand implements Serializable { */ * execute task response command public void setEndTime(Date endTime) { this.endTime = endTime; } */ * execute task response command * task id * task id /** * task id * execute task response command * task id */ * task id public class ExecuteTaskResponseCommand implements Serializable { * task id /** public class ExecuteTaskResponseCommand implements Serializable { }

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java

@ -50,4 +50,8 @@ public class Pair<L, R> {
public void setRight(R right) {
this.right = right;
}
public static <L, R> Pair of(L left, R right){
return new Pair(left, right);
}
}

49
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
public class CallbackChannel {
private Channel channel;
private long opaque;
public CallbackChannel(Channel channel, long opaque) {
this.channel = channel;
this.opaque = opaque;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public long getOpaque() {
return opaque;
}
public void setOpaque(long opaque) {
this.opaque = opaque;
}
}

76
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import java.util.concurrent.ConcurrentHashMap;
public class TaskInstanceCallbackService {
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
CALL_BACK_CHANNELS.put(taskInstanceId, channel);
}
public CallbackChannel getCallbackChannel(int taskInstanceId){
CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
if(callbackChannel.getChannel().isActive()){
return callbackChannel;
}
Channel newChannel = createChannel();
callbackChannel.setChannel(newChannel);
CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
return callbackChannel;
}
public void remove(int taskInstanceId){
CALL_BACK_CHANNELS.remove(taskInstanceId);
}
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
}
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
remove(taskInstanceId);
return;
}
}
});
}
//TODO
private Channel createChannel(){
return null;
}
}

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java

@ -51,8 +51,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
private final WorkerConfig workerConfig;
private final TaskInstanceCallbackService taskInstanceCallbackService;
public WorkerNettyRequestProcessor(ProcessService processService){
this.processService = processService;
this.taskInstanceCallbackService = new TaskInstanceCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@ -62,6 +65,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
//TODO 需要干掉,然后移到master里面。
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
@ -73,6 +77,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
//TODO 到这里。
// local execute path
String execLocalPath = getExecLocalPath(taskInstance);
logger.info("task instance local execute path : {} ", execLocalPath);
@ -84,7 +89,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque()));
workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService));
}
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {

73
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -24,17 +24,21 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -73,23 +77,32 @@ public class TaskScheduleThread implements Runnable {
*/
private AbstractTask task;
/**
* task instance callback service
*/
private TaskInstanceCallbackService taskInstanceCallbackService;
/**
* constructor
*
* @param taskInstance task instance
* @param processService process dao
*/
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService){
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){
this.processService = processService;
this.taskInstance = taskInstance;
this.taskInstanceCallbackService = taskInstanceCallbackService;
}
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
try {
// update task state is running according to task type
updateTaskState(taskInstance.getTaskType());
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
logger.info("script path : {}", taskInstance.getExecutePath());
// task node
@ -148,22 +161,21 @@ public class TaskScheduleThread implements Runnable {
// task result process
task.after();
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
// update task instance state
processService.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
}
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(),
task.getExitStatus());
// update task instance state
processService.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
}
/**
@ -182,29 +194,22 @@ public class TaskScheduleThread implements Runnable {
}
return globalParamsMap;
}
/**
* update task state according to task type
* build ack command
* @param taskType
*/
private void updateTaskState(String taskType) {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
private ExecuteTaskAckCommand buildAckCommand(String taskType) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost("localhost");
ackCommand.setStartTime(new Date());
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
getTaskLogPath(),
taskInstance.getId());
ackCommand.setExecutePath(taskInstance.getExecutePath());
}
return ackCommand;
}
/**

Loading…
Cancel
Save