Browse Source
* [Feature-4355][Master-Worker-API] improvements of master and scheduler module (#6085) * master refactor: 1. spi for task submit and other actions(pause, kill) 2. remove threads for process instance and task instance. 3. add events for process instance and task instance * ut npe * add try catch * code style * fix critical bugs * fix critical bugs * fix critical bugs * fix critical bugs2.0.7-release
OS
3 years ago
committed by
GitHub
84 changed files with 3886 additions and 1753 deletions
@ -0,0 +1,111 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.enums; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* state event |
||||
*/ |
||||
public class StateEvent { |
||||
|
||||
/** |
||||
* origin_pid-origin_task_id-process_instance_id-task_instance_id |
||||
*/ |
||||
private String key; |
||||
|
||||
private StateEventType type; |
||||
|
||||
private ExecutionStatus executionStatus; |
||||
|
||||
private int taskInstanceId; |
||||
|
||||
private int processInstanceId; |
||||
|
||||
private String context; |
||||
|
||||
private Channel channel; |
||||
|
||||
public ExecutionStatus getExecutionStatus() { |
||||
return executionStatus; |
||||
} |
||||
|
||||
public void setExecutionStatus(ExecutionStatus executionStatus) { |
||||
this.executionStatus = executionStatus; |
||||
} |
||||
|
||||
public int getTaskInstanceId() { |
||||
return taskInstanceId; |
||||
} |
||||
|
||||
public int getProcessInstanceId() { |
||||
return processInstanceId; |
||||
} |
||||
|
||||
public void setProcessInstanceId(int processInstanceId) { |
||||
this.processInstanceId = processInstanceId; |
||||
} |
||||
|
||||
public String getContext() { |
||||
return context; |
||||
} |
||||
|
||||
public void setContext(String context) { |
||||
this.context = context; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public Channel getChannel() { |
||||
return channel; |
||||
} |
||||
|
||||
public void setChannel(Channel channel) { |
||||
this.channel = channel; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "State Event :" |
||||
+ "key: " + key |
||||
+ " type: " + type.toString() |
||||
+ " executeStatus: " + executionStatus |
||||
+ " task instance id: " + taskInstanceId |
||||
+ " process instance id: " + processInstanceId |
||||
+ " context: " + context |
||||
; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
public void setType(StateEventType type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public StateEventType getType() { |
||||
return this.type; |
||||
} |
||||
} |
@ -0,0 +1,45 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.enums; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.EnumValue; |
||||
|
||||
public enum StateEventType { |
||||
|
||||
PROCESS_STATE_CHANGE(0, "process statechange"), |
||||
TASK_STATE_CHANGE(1, "task state change"), |
||||
PROCESS_TIMEOUT(2, "process timeout"), |
||||
TASK_TIMEOUT(3, "task timeout"); |
||||
|
||||
StateEventType(int code, String descp) { |
||||
this.code = code; |
||||
this.descp = descp; |
||||
} |
||||
|
||||
@EnumValue |
||||
private final int code; |
||||
private final String descp; |
||||
|
||||
public int getCode() { |
||||
return code; |
||||
} |
||||
|
||||
public String getDescp() { |
||||
return descp; |
||||
} |
||||
} |
@ -0,0 +1,72 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* process host update |
||||
*/ |
||||
public class HostUpdateCommand implements Serializable { |
||||
|
||||
/** |
||||
* task id |
||||
*/ |
||||
private int taskInstanceId; |
||||
|
||||
private String processHost; |
||||
|
||||
public int getTaskInstanceId() { |
||||
return taskInstanceId; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public String getProcessHost() { |
||||
return processHost; |
||||
} |
||||
|
||||
public void setProcessHost(String processHost) { |
||||
this.processHost = processHost; |
||||
} |
||||
|
||||
/** |
||||
* package request command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "HostUpdateCommand{" |
||||
+ "taskInstanceId=" + taskInstanceId |
||||
+ "host=" + processHost |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,83 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
public class HostUpdateResponseCommand implements Serializable { |
||||
|
||||
private int taskInstanceId; |
||||
|
||||
private String processHost; |
||||
|
||||
private int status; |
||||
|
||||
public HostUpdateResponseCommand(int taskInstanceId, String processHost, int code) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
this.processHost = processHost; |
||||
this.status = code; |
||||
} |
||||
|
||||
public int getTaskInstanceId() { |
||||
return this.taskInstanceId; |
||||
} |
||||
|
||||
public void setTaskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public String getProcessHost() { |
||||
return this.processHost; |
||||
} |
||||
|
||||
public void setProcessHost(String processHost) { |
||||
this.processHost = processHost; |
||||
} |
||||
|
||||
public int getStatus() { |
||||
return status; |
||||
} |
||||
|
||||
public void setStatus(int status) { |
||||
this.status = status; |
||||
} |
||||
|
||||
/** |
||||
* package request command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "HostUpdateResponseCommand{" |
||||
+ "taskInstanceId=" + taskInstanceId |
||||
+ "host=" + processHost |
||||
+ '}'; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,131 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* db task final result response command |
||||
*/ |
||||
public class StateEventChangeCommand implements Serializable { |
||||
|
||||
private String key; |
||||
|
||||
private ExecutionStatus sourceStatus; |
||||
|
||||
private int sourceProcessInstanceId; |
||||
|
||||
private int sourceTaskInstanceId; |
||||
|
||||
private int destProcessInstanceId; |
||||
|
||||
private int destTaskInstanceId; |
||||
|
||||
public StateEventChangeCommand() { |
||||
super(); |
||||
} |
||||
|
||||
public StateEventChangeCommand(int sourceProcessInstanceId, int sourceTaskInstanceId, |
||||
ExecutionStatus sourceStatus, |
||||
int destProcessInstanceId, |
||||
int destTaskInstanceId |
||||
) { |
||||
this.key = String.format("%d-%d-%d-%d", |
||||
sourceProcessInstanceId, |
||||
sourceTaskInstanceId, |
||||
destProcessInstanceId, |
||||
destTaskInstanceId); |
||||
|
||||
this.sourceStatus = sourceStatus; |
||||
this.sourceProcessInstanceId = sourceProcessInstanceId; |
||||
this.sourceTaskInstanceId = sourceTaskInstanceId; |
||||
this.destProcessInstanceId = destProcessInstanceId; |
||||
this.destTaskInstanceId = destTaskInstanceId; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
/** |
||||
* package response command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.STATE_EVENT_REQUEST); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "StateEventResponseCommand{" |
||||
+ "key=" + key |
||||
+ '}'; |
||||
} |
||||
|
||||
public ExecutionStatus getSourceStatus() { |
||||
return sourceStatus; |
||||
} |
||||
|
||||
public void setSourceStatus(ExecutionStatus sourceStatus) { |
||||
this.sourceStatus = sourceStatus; |
||||
} |
||||
|
||||
public int getSourceProcessInstanceId() { |
||||
return sourceProcessInstanceId; |
||||
} |
||||
|
||||
public void setSourceProcessInstanceId(int sourceProcessInstanceId) { |
||||
this.sourceProcessInstanceId = sourceProcessInstanceId; |
||||
} |
||||
|
||||
public int getSourceTaskInstanceId() { |
||||
return sourceTaskInstanceId; |
||||
} |
||||
|
||||
public void setSourceTaskInstanceId(int sourceTaskInstanceId) { |
||||
this.sourceTaskInstanceId = sourceTaskInstanceId; |
||||
} |
||||
|
||||
public int getDestProcessInstanceId() { |
||||
return destProcessInstanceId; |
||||
} |
||||
|
||||
public void setDestProcessInstanceId(int destProcessInstanceId) { |
||||
this.destProcessInstanceId = destProcessInstanceId; |
||||
} |
||||
|
||||
public int getDestTaskInstanceId() { |
||||
return destTaskInstanceId; |
||||
} |
||||
|
||||
public void setDestTaskInstanceId(int destTaskInstanceId) { |
||||
this.destTaskInstanceId = destTaskInstanceId; |
||||
} |
||||
} |
@ -0,0 +1,78 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* db task final result response command |
||||
*/ |
||||
public class StateEventResponseCommand implements Serializable { |
||||
|
||||
private String key; |
||||
private int status; |
||||
|
||||
public StateEventResponseCommand() { |
||||
super(); |
||||
} |
||||
|
||||
public StateEventResponseCommand(int status, String key) { |
||||
this.status = status; |
||||
this.key = key; |
||||
} |
||||
|
||||
public int getStatus() { |
||||
return status; |
||||
} |
||||
|
||||
public void setStatus(int status) { |
||||
this.status = status; |
||||
} |
||||
|
||||
public String getKey() { |
||||
return key; |
||||
} |
||||
|
||||
public void setKey(String key) { |
||||
this.key = key; |
||||
} |
||||
|
||||
/** |
||||
* package response command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.DB_TASK_RESPONSE); |
||||
byte[] body = JSONUtils.toJsonByteArray(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "StateEventResponseCommand{" |
||||
+ "key=" + key |
||||
+ ", status=" + status |
||||
+ '}'; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,125 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.processor; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; |
||||
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* task callback service |
||||
*/ |
||||
@Service |
||||
public class StateEventCallbackService { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(StateEventCallbackService.class); |
||||
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200}; |
||||
|
||||
/** |
||||
* remote channels |
||||
*/ |
||||
private static final ConcurrentHashMap<String, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>(); |
||||
|
||||
/** |
||||
* netty remoting client |
||||
*/ |
||||
private final NettyRemotingClient nettyRemotingClient; |
||||
|
||||
public StateEventCallbackService() { |
||||
final NettyClientConfig clientConfig = new NettyClientConfig(); |
||||
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); |
||||
} |
||||
|
||||
/** |
||||
* add callback channel |
||||
* |
||||
* @param channel channel |
||||
*/ |
||||
public void addRemoteChannel(String host, NettyRemoteChannel channel) { |
||||
REMOTE_CHANNELS.put(host, channel); |
||||
} |
||||
|
||||
/** |
||||
* get callback channel |
||||
* |
||||
* @param host |
||||
* @return callback channel |
||||
*/ |
||||
private NettyRemoteChannel newRemoteChannel(Host host) { |
||||
Channel newChannel; |
||||
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress()); |
||||
if (nettyRemoteChannel != null) { |
||||
if (nettyRemoteChannel.isActive()) { |
||||
return nettyRemoteChannel; |
||||
} |
||||
} |
||||
newChannel = nettyRemotingClient.getChannel(host); |
||||
if (newChannel != null) { |
||||
return newRemoteChannel(newChannel, host.getAddress()); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
public int pause(int ntries) { |
||||
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length]; |
||||
} |
||||
|
||||
private NettyRemoteChannel newRemoteChannel(Channel newChannel, long opaque, String host) { |
||||
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque); |
||||
addRemoteChannel(host, remoteChannel); |
||||
return remoteChannel; |
||||
} |
||||
|
||||
private NettyRemoteChannel newRemoteChannel(Channel newChannel, String host) { |
||||
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); |
||||
addRemoteChannel(host, remoteChannel); |
||||
return remoteChannel; |
||||
} |
||||
|
||||
/** |
||||
* remove callback channels |
||||
*/ |
||||
public void remove(String host) { |
||||
REMOTE_CHANNELS.remove(host); |
||||
} |
||||
|
||||
/** |
||||
* send result |
||||
* |
||||
* @param command command |
||||
*/ |
||||
public void sendResult(String address, int port, Command command) { |
||||
logger.info("send result, host:{}, command:{}", address, command.toString()); |
||||
Host host = new Host(address, port); |
||||
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); |
||||
if (nettyRemoteChannel != null) { |
||||
nettyRemoteChannel.writeAndFlush(command); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,42 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
public class HostUpdateResponseProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(HostUpdateResponseProcessor.class); |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); |
||||
|
||||
HostUpdateResponseProcessor responseCommand = JSONUtils.parseObject(command.getBody(), HostUpdateResponseProcessor.class); |
||||
logger.info("received process host response command : {}", responseCommand); |
||||
} |
||||
} |
@ -0,0 +1,74 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* handle state event received from master/api |
||||
*/ |
||||
public class StateEventProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class); |
||||
|
||||
private StateEventResponseService stateEventResponseService; |
||||
|
||||
public StateEventProcessor() { |
||||
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); |
||||
} |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { |
||||
this.stateEventResponseService.init(processInstanceExecMaps); |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); |
||||
|
||||
StateEventChangeCommand stateEventChangeCommand = JSONUtils.parseObject(command.getBody(), StateEventChangeCommand.class); |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
stateEvent.setKey(stateEventChangeCommand.getKey()); |
||||
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); |
||||
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; |
||||
stateEvent.setType(type); |
||||
|
||||
logger.info("received command : {}", stateEvent.toString()); |
||||
stateEventResponseService.addResponse(stateEvent); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,149 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor.queue; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import javax.annotation.PostConstruct; |
||||
import javax.annotation.PreDestroy; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* task manager |
||||
*/ |
||||
@Component |
||||
public class StateEventResponseService { |
||||
|
||||
/** |
||||
* logger |
||||
*/ |
||||
private final Logger logger = LoggerFactory.getLogger(StateEventResponseService.class); |
||||
|
||||
/** |
||||
* attemptQueue |
||||
*/ |
||||
private final BlockingQueue<StateEvent> eventQueue = new LinkedBlockingQueue<>(5000); |
||||
|
||||
/** |
||||
* task response worker |
||||
*/ |
||||
private Thread responseWorker; |
||||
|
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper; |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) { |
||||
if (this.processInstanceMapper == null) { |
||||
this.processInstanceMapper = processInstanceMapper; |
||||
} |
||||
} |
||||
|
||||
@PostConstruct |
||||
public void start() { |
||||
this.responseWorker = new StateEventResponseWorker(); |
||||
this.responseWorker.setName("StateEventResponseWorker"); |
||||
this.responseWorker.start(); |
||||
} |
||||
|
||||
@PreDestroy |
||||
public void stop() { |
||||
this.responseWorker.interrupt(); |
||||
if (!eventQueue.isEmpty()) { |
||||
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size()); |
||||
eventQueue.drainTo(remainEvents); |
||||
for (StateEvent event : remainEvents) { |
||||
this.persist(event); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* put task to attemptQueue |
||||
*/ |
||||
public void addResponse(StateEvent stateEvent) { |
||||
try { |
||||
eventQueue.put(stateEvent); |
||||
} catch (InterruptedException e) { |
||||
logger.error("put state event : {} error :{}", stateEvent, e); |
||||
Thread.currentThread().interrupt(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* task worker thread |
||||
*/ |
||||
class StateEventResponseWorker extends Thread { |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
// if not task , blocking here
|
||||
StateEvent stateEvent = eventQueue.take(); |
||||
persist(stateEvent); |
||||
} catch (InterruptedException e) { |
||||
logger.warn("persist task error", e); |
||||
Thread.currentThread().interrupt(); |
||||
} |
||||
} |
||||
logger.info("StateEventResponseWorker stopped"); |
||||
} |
||||
} |
||||
|
||||
private void writeResponse(StateEvent stateEvent, ExecutionStatus status) { |
||||
Channel channel = stateEvent.getChannel(); |
||||
if (channel != null) { |
||||
StateEventResponseCommand command = new StateEventResponseCommand(status.getCode(), stateEvent.getKey()); |
||||
channel.writeAndFlush(command.convert2Command()); |
||||
} |
||||
} |
||||
|
||||
private void persist(StateEvent stateEvent) { |
||||
try { |
||||
if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) { |
||||
writeResponse(stateEvent, ExecutionStatus.FAILURE); |
||||
return; |
||||
} |
||||
|
||||
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId()); |
||||
workflowExecuteThread.addStateEvent(stateEvent); |
||||
writeResponse(stateEvent, ExecutionStatus.SUCCESS); |
||||
} catch (Exception e) { |
||||
logger.error("persist event queue error:", stateEvent.toString(), e); |
||||
} |
||||
} |
||||
|
||||
public BlockingQueue<StateEvent> getEventQueue() { |
||||
return eventQueue; |
||||
} |
||||
} |
@ -0,0 +1,195 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
import org.apache.dolphinscheduler.common.utils.NetUtils; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ExecutorService; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import com.google.common.util.concurrent.FutureCallback; |
||||
import com.google.common.util.concurrent.Futures; |
||||
import com.google.common.util.concurrent.ListenableFuture; |
||||
import com.google.common.util.concurrent.ListeningExecutorService; |
||||
import com.google.common.util.concurrent.MoreExecutors; |
||||
|
||||
@Service |
||||
public class EventExecuteService extends Thread { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class); |
||||
|
||||
|
||||
/** |
||||
* dolphinscheduler database interface
|
||||
*/ |
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Autowired |
||||
private MasterConfig masterConfig; |
||||
|
||||
private ExecutorService eventExecService; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
private StateEventCallbackService stateEventCallbackService; |
||||
|
||||
|
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; |
||||
private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap(); |
||||
ListeningExecutorService listeningExecutorService; |
||||
|
||||
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { |
||||
|
||||
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads()); |
||||
|
||||
this.processInstanceExecMaps = processInstanceExecMaps; |
||||
|
||||
listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); |
||||
this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public synchronized void start() { |
||||
super.setName("EventServiceStarted"); |
||||
super.start(); |
||||
} |
||||
|
||||
public void close() { |
||||
eventExecService.shutdown(); |
||||
logger.info("event service stopped..."); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
logger.info("Event service started"); |
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
eventHandler(); |
||||
|
||||
} catch (Exception e) { |
||||
logger.error("Event service thread error", e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void eventHandler() { |
||||
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) { |
||||
if (workflowExecuteThread.eventSize() == 0 |
||||
|| StringUtils.isEmpty(workflowExecuteThread.getKey()) |
||||
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { |
||||
continue; |
||||
} |
||||
int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); |
||||
logger.info("handle process instance : {} events, count:{}", |
||||
processInstanceId, |
||||
workflowExecuteThread.eventSize()); |
||||
logger.info("already exists handler process size:{}", this.eventHandlerMap.size()); |
||||
eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); |
||||
ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread); |
||||
FutureCallback futureCallback = new FutureCallback() { |
||||
@Override |
||||
public void onSuccess(Object o) { |
||||
if (workflowExecuteThread.workFlowFinish()) { |
||||
processInstanceExecMaps.remove(processInstanceId); |
||||
notifyProcessChanged(); |
||||
logger.info("process instance {} finished.", processInstanceId); |
||||
} |
||||
if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { |
||||
processInstanceExecMaps.remove(processInstanceId); |
||||
processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); |
||||
|
||||
} |
||||
eventHandlerMap.remove(workflowExecuteThread.getKey()); |
||||
} |
||||
|
||||
private void notifyProcessChanged() { |
||||
Map<ProcessInstance, TaskInstance> fatherMaps |
||||
= processService.notifyProcessList(processInstanceId, 0); |
||||
|
||||
for (ProcessInstance processInstance : fatherMaps.keySet()) { |
||||
String address = NetUtils.getAddr(masterConfig.getListenPort()); |
||||
if (processInstance.getHost().equalsIgnoreCase(address)) { |
||||
notifyMyself(processInstance, fatherMaps.get(processInstance)); |
||||
} else { |
||||
notifyProcess(processInstance, fatherMaps.get(processInstance)); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { |
||||
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); |
||||
if (!processInstanceExecMaps.containsKey(processInstance.getId())) { |
||||
return; |
||||
} |
||||
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
stateEvent.setProcessInstanceId(processInstance.getId()); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
workflowExecuteThreadNotify.addStateEvent(stateEvent); |
||||
} |
||||
|
||||
private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) { |
||||
String host = processInstance.getHost(); |
||||
if (StringUtils.isEmpty(host)) { |
||||
logger.info("process {} host is empty, cannot notify task {} now.", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
return; |
||||
} |
||||
String address = host.split(":")[0]; |
||||
int port = Integer.parseInt(host.split(":")[1]); |
||||
logger.info("notify process {} task {} state change, host:{}", |
||||
processInstance.getId(), taskInstance.getId(), host); |
||||
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( |
||||
processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() |
||||
); |
||||
|
||||
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable throwable) { |
||||
} |
||||
}; |
||||
Futures.addCallback(future, futureCallback, this.listeningExecutorService); |
||||
} |
||||
} |
||||
} |
@ -1,337 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; |
||||
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
||||
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
||||
import org.apache.dolphinscheduler.dao.AlertDao; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; |
||||
|
||||
import java.util.Date; |
||||
import java.util.concurrent.Callable; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* master task exec base class
|
||||
*/ |
||||
public class MasterBaseTaskExecThread implements Callable<Boolean> { |
||||
|
||||
/** |
||||
* logger of MasterBaseTaskExecThread |
||||
*/ |
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
|
||||
/** |
||||
* process service |
||||
*/ |
||||
protected ProcessService processService; |
||||
|
||||
/** |
||||
* alert database access |
||||
*/ |
||||
protected AlertDao alertDao; |
||||
|
||||
/** |
||||
* process instance |
||||
*/ |
||||
protected ProcessInstance processInstance; |
||||
|
||||
/** |
||||
* task instance |
||||
*/ |
||||
protected TaskInstance taskInstance; |
||||
|
||||
/** |
||||
* whether need cancel |
||||
*/ |
||||
protected boolean cancel; |
||||
|
||||
/** |
||||
* master config |
||||
*/ |
||||
protected MasterConfig masterConfig; |
||||
|
||||
/** |
||||
* taskUpdateQueue |
||||
*/ |
||||
private TaskPriorityQueue taskUpdateQueue; |
||||
|
||||
/** |
||||
* whether need check task time out. |
||||
*/ |
||||
protected boolean checkTimeoutFlag = false; |
||||
|
||||
/** |
||||
* task timeout parameters |
||||
*/ |
||||
protected TaskTimeoutParameter taskTimeoutParameter; |
||||
|
||||
/** |
||||
* constructor of MasterBaseTaskExecThread |
||||
* |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public MasterBaseTaskExecThread(TaskInstance taskInstance) { |
||||
this.processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
this.alertDao = SpringApplicationContext.getBean(AlertDao.class); |
||||
this.cancel = false; |
||||
this.taskInstance = taskInstance; |
||||
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); |
||||
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); |
||||
initTaskParams(); |
||||
} |
||||
|
||||
/** |
||||
* init task ordinary parameters |
||||
*/ |
||||
private void initTaskParams() { |
||||
initTimeoutParams(); |
||||
} |
||||
|
||||
/** |
||||
* init task timeout parameters |
||||
*/ |
||||
private void initTimeoutParams() { |
||||
TaskDefinition taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); |
||||
boolean timeoutEnable = taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN; |
||||
taskTimeoutParameter = new TaskTimeoutParameter(timeoutEnable, |
||||
taskDefinition.getTimeoutNotifyStrategy(), |
||||
taskDefinition.getTimeout()); |
||||
if (taskTimeoutParameter.getEnable()) { |
||||
checkTimeoutFlag = true; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get task instance |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
public TaskInstance getTaskInstance() { |
||||
return this.taskInstance; |
||||
} |
||||
|
||||
/** |
||||
* kill master base task exec thread |
||||
*/ |
||||
public void kill() { |
||||
this.cancel = true; |
||||
} |
||||
|
||||
/** |
||||
* submit master base task exec thread |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
protected TaskInstance submit() { |
||||
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); |
||||
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); |
||||
|
||||
int retryTimes = 1; |
||||
boolean submitDB = false; |
||||
boolean submitTask = false; |
||||
TaskInstance task = null; |
||||
while (retryTimes <= commitRetryTimes) { |
||||
try { |
||||
if (!submitDB) { |
||||
// submit task to db
|
||||
task = processService.submitTask(taskInstance); |
||||
if (task != null && task.getId() != 0) { |
||||
submitDB = true; |
||||
} |
||||
} |
||||
if (submitDB && !submitTask) { |
||||
// dispatch task
|
||||
submitTask = dispatchTask(task); |
||||
} |
||||
if (submitDB && submitTask) { |
||||
return task; |
||||
} |
||||
if (!submitDB) { |
||||
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); |
||||
} else if (!submitTask) { |
||||
logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes); |
||||
} |
||||
Thread.sleep(commitRetryInterval); |
||||
} catch (Exception e) { |
||||
logger.error("task commit to mysql and dispatcht task failed", e); |
||||
} |
||||
retryTimes += 1; |
||||
} |
||||
return task; |
||||
} |
||||
|
||||
/** |
||||
* dispatch task |
||||
* |
||||
* @param taskInstance taskInstance |
||||
* @return whether submit task success |
||||
*/ |
||||
public Boolean dispatchTask(TaskInstance taskInstance) { |
||||
|
||||
try { |
||||
if (taskInstance.isConditionsTask() |
||||
|| taskInstance.isDependTask() |
||||
|| taskInstance.isSubProcess() |
||||
|| taskInstance.isSwitchTask() |
||||
) { |
||||
return true; |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); |
||||
return true; |
||||
} |
||||
// task cannot be submitted because its execution state is RUNNING or DELAY.
|
||||
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION |
||||
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) { |
||||
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); |
||||
return true; |
||||
} |
||||
logger.info("task ready to submit: {}", taskInstance); |
||||
|
||||
/** |
||||
* taskPriority |
||||
*/ |
||||
TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(), |
||||
processInstance.getId(), |
||||
taskInstance.getProcessInstancePriority().getCode(), |
||||
taskInstance.getId(), |
||||
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); |
||||
taskUpdateQueue.put(taskPriority); |
||||
logger.info(String.format("master submit success, task : %s", taskInstance.getName())); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("submit task Exception: ", e); |
||||
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance)); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* buildTaskPriority |
||||
* |
||||
* @param processInstancePriority processInstancePriority |
||||
* @param processInstanceId processInstanceId |
||||
* @param taskInstancePriority taskInstancePriority |
||||
* @param taskInstanceId taskInstanceId |
||||
* @param workerGroup workerGroup |
||||
* @return TaskPriority |
||||
*/ |
||||
private TaskPriority buildTaskPriority(int processInstancePriority, |
||||
int processInstanceId, |
||||
int taskInstancePriority, |
||||
int taskInstanceId, |
||||
String workerGroup) { |
||||
return new TaskPriority(processInstancePriority, processInstanceId, |
||||
taskInstancePriority, taskInstanceId, workerGroup); |
||||
} |
||||
|
||||
/** |
||||
* submit wait complete |
||||
* |
||||
* @return true |
||||
*/ |
||||
protected Boolean submitWaitComplete() { |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* call |
||||
* |
||||
* @return boolean |
||||
*/ |
||||
@Override |
||||
public Boolean call() { |
||||
this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); |
||||
return submitWaitComplete(); |
||||
} |
||||
|
||||
/** |
||||
* alert time out |
||||
*/ |
||||
protected boolean alertTimeout() { |
||||
if (TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()) { |
||||
return true; |
||||
} |
||||
logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", |
||||
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); |
||||
// send warn mail
|
||||
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(), |
||||
taskInstance.getId(), taskInstance.getName()); |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* handle time out for time out strategy warn&&failed |
||||
*/ |
||||
protected void handleTimeoutFailed() { |
||||
if (TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()) { |
||||
return; |
||||
} |
||||
logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.", |
||||
processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); |
||||
this.cancel = true; |
||||
} |
||||
|
||||
/** |
||||
* check task remain time valid |
||||
*/ |
||||
protected boolean checkTaskTimeout() { |
||||
if (!checkTimeoutFlag || taskInstance.getStartTime() == null) { |
||||
return false; |
||||
} |
||||
long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L); |
||||
return remainTime <= 0; |
||||
} |
||||
|
||||
/** |
||||
* get remain time |
||||
* |
||||
* @return remain time |
||||
*/ |
||||
protected long getRemainTime(long timeoutSeconds) { |
||||
Date startTime = taskInstance.getStartTime(); |
||||
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; |
||||
return timeoutSeconds - usedTime; |
||||
} |
||||
|
||||
protected String getThreadName() { |
||||
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, |
||||
processInstance.getProcessDefinitionCode(), |
||||
processInstance.getProcessDefinitionVersion(), |
||||
taskInstance.getProcessInstanceId(), |
||||
taskInstance.getId())); |
||||
return String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); |
||||
} |
||||
} |
@ -1,230 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.registry.RegistryClient; |
||||
|
||||
import java.util.Date; |
||||
import java.util.Set; |
||||
|
||||
/** |
||||
* master task exec thread |
||||
*/ |
||||
public class MasterTaskExecThread extends MasterBaseTaskExecThread { |
||||
|
||||
/** |
||||
* taskInstance state manager |
||||
*/ |
||||
private TaskInstanceCacheManager taskInstanceCacheManager; |
||||
|
||||
/** |
||||
* netty executor manager |
||||
*/ |
||||
private NettyExecutorManager nettyExecutorManager; |
||||
|
||||
|
||||
/** |
||||
* zookeeper register center |
||||
*/ |
||||
private RegistryClient registryClient; |
||||
|
||||
/** |
||||
* constructor of MasterTaskExecThread |
||||
* |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public MasterTaskExecThread(TaskInstance taskInstance) { |
||||
super(taskInstance); |
||||
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); |
||||
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); |
||||
this.registryClient = RegistryClient.getInstance(); |
||||
} |
||||
|
||||
/** |
||||
* get task instance |
||||
* |
||||
* @return TaskInstance |
||||
*/ |
||||
@Override |
||||
public TaskInstance getTaskInstance() { |
||||
return this.taskInstance; |
||||
} |
||||
|
||||
/** |
||||
* whether already Killed,default false |
||||
*/ |
||||
private boolean alreadyKilled = false; |
||||
|
||||
/** |
||||
* submit task instance and wait complete |
||||
* |
||||
* @return true is task quit is true |
||||
*/ |
||||
@Override |
||||
public Boolean submitWaitComplete() { |
||||
Boolean result = false; |
||||
this.taskInstance = submit(); |
||||
if (this.taskInstance == null) { |
||||
logger.error("submit task instance to mysql and queue failed , please check and fix it"); |
||||
return result; |
||||
} |
||||
if (!this.taskInstance.getState().typeIsFinished()) { |
||||
result = waitTaskQuit(); |
||||
} |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("task :{} id:{}, process id:{}, exec thread completed ", |
||||
this.taskInstance.getName(), taskInstance.getId(), processInstance.getId()); |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* polling db |
||||
* <p> |
||||
* wait task quit |
||||
* |
||||
* @return true if task quit success |
||||
*/ |
||||
public Boolean waitTaskQuit() { |
||||
// query new state
|
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
logger.info("wait task: process id: {}, task id:{}, task name:{} complete", |
||||
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); |
||||
|
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
if (this.processInstance == null) { |
||||
logger.error("process instance not exists , master task exec thread exit"); |
||||
return true; |
||||
} |
||||
// task instance add queue , waiting worker to kill
|
||||
if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) { |
||||
cancelTaskInstance(); |
||||
} |
||||
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { |
||||
pauseTask(); |
||||
} |
||||
// task instance finished
|
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
// if task is final result , then remove taskInstance from cache
|
||||
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); |
||||
break; |
||||
} |
||||
if (checkTaskTimeout()) { |
||||
this.checkTimeoutFlag = !alertTimeout(); |
||||
} |
||||
// updateProcessInstance task instance
|
||||
//issue#5539 Check status of taskInstance from cache
|
||||
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); |
||||
processInstance = processService.findProcessInstanceById(processInstance.getId()); |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} catch (Exception e) { |
||||
logger.error("exception", e); |
||||
if (processInstance != null) { |
||||
logger.error("wait task quit failed, instance id:{}, task id:{}", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
} |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* pause task if task have not been dispatched to worker, do not dispatch anymore. |
||||
*/ |
||||
public void pauseTask() { |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (taskInstance == null) { |
||||
return; |
||||
} |
||||
if (StringUtils.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.PAUSE); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* task instance add queue , waiting worker to kill |
||||
*/ |
||||
private void cancelTaskInstance() throws Exception { |
||||
if (alreadyKilled) { |
||||
return; |
||||
} |
||||
alreadyKilled = true; |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (StringUtils.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.KILL); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return; |
||||
} |
||||
|
||||
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); |
||||
killCommand.setTaskInstanceId(taskInstance.getId()); |
||||
|
||||
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); |
||||
|
||||
Host host = Host.of(taskInstance.getHost()); |
||||
executionContext.setHost(host); |
||||
|
||||
nettyExecutorManager.executeDirectly(executionContext); |
||||
|
||||
logger.info("master kill taskInstance name :{} taskInstance id:{}", |
||||
taskInstance.getName(), taskInstance.getId()); |
||||
} |
||||
|
||||
/** |
||||
* whether exists valid worker group |
||||
* |
||||
* @param taskInstanceWorkerGroup taskInstanceWorkerGroup |
||||
* @return whether exists |
||||
*/ |
||||
public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup) { |
||||
Set<String> workerGroups = registryClient.getWorkerGroupDirectly(); |
||||
// not worker group
|
||||
if (CollectionUtils.isEmpty(workerGroups)) { |
||||
return false; |
||||
} |
||||
|
||||
// has worker group , but not taskInstance assigned worker group
|
||||
if (!workerGroups.contains(taskInstanceWorkerGroup)) { |
||||
return false; |
||||
} |
||||
Set<String> workers = registryClient.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); |
||||
if (CollectionUtils.isEmpty(workers)) { |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,154 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.StateEvent; |
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import org.apache.hadoop.util.ThreadUtil; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* 1. timeout check wheel |
||||
* 2. dependent task check wheel |
||||
*/ |
||||
public class StateWheelExecuteThread extends Thread { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); |
||||
|
||||
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList; |
||||
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList; |
||||
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; |
||||
|
||||
private int stateCheckIntervalSecs; |
||||
|
||||
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances, |
||||
ConcurrentHashMap<Integer, TaskInstance> taskInstances, |
||||
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps, |
||||
int stateCheckIntervalSecs) { |
||||
this.processInstanceCheckList = processInstances; |
||||
this.taskInstanceCheckList = taskInstances; |
||||
this.processInstanceExecMaps = processInstanceExecMaps; |
||||
this.stateCheckIntervalSecs = stateCheckIntervalSecs; |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
|
||||
logger.info("state wheel thread start"); |
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
checkProcess(); |
||||
checkTask(); |
||||
} catch (Exception e) { |
||||
logger.error("state wheel thread check error:", e); |
||||
} |
||||
ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs); |
||||
} |
||||
} |
||||
|
||||
public boolean addProcess(ProcessInstance processInstance) { |
||||
this.processInstanceCheckList.put(processInstance.getId(), processInstance); |
||||
return true; |
||||
} |
||||
|
||||
public boolean addTask(TaskInstance taskInstance) { |
||||
this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
private void checkTask() { |
||||
if (taskInstanceCheckList.isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) { |
||||
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { |
||||
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); |
||||
if (0 <= timeRemain && processTimeout(taskInstance)) { |
||||
taskInstanceCheckList.remove(taskInstance.getId()); |
||||
return; |
||||
} |
||||
} |
||||
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { |
||||
processDependCheck(taskInstance); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void checkProcess() { |
||||
if (processInstanceCheckList.isEmpty()) { |
||||
return; |
||||
} |
||||
for (ProcessInstance processInstance : this.processInstanceCheckList.values()) { |
||||
|
||||
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); |
||||
if (0 <= timeRemain && processTimeout(processInstance)) { |
||||
processInstanceCheckList.remove(processInstance.getId()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void putEvent(StateEvent stateEvent) { |
||||
|
||||
if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { |
||||
return; |
||||
} |
||||
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); |
||||
workflowExecuteThread.addStateEvent(stateEvent); |
||||
} |
||||
|
||||
private boolean processDependCheck(TaskInstance taskInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
private boolean processTimeout(TaskInstance taskInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.TASK_TIMEOUT); |
||||
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskInstance.getId()); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
private boolean processTimeout(ProcessInstance processInstance) { |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setType(StateEventType.PROCESS_TIMEOUT); |
||||
stateEvent.setProcessInstanceId(processInstance.getId()); |
||||
putEvent(stateEvent); |
||||
return true; |
||||
} |
||||
|
||||
} |
@ -1,181 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* subflow task exec thread |
||||
*/ |
||||
public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { |
||||
|
||||
/** |
||||
* sub process instance |
||||
*/ |
||||
private ProcessInstance subProcessInstance; |
||||
|
||||
/** |
||||
* sub process task exec thread |
||||
* @param taskInstance task instance |
||||
*/ |
||||
public SubProcessTaskExecThread(TaskInstance taskInstance){ |
||||
super(taskInstance); |
||||
} |
||||
|
||||
@Override |
||||
public Boolean submitWaitComplete() { |
||||
|
||||
Boolean result = false; |
||||
try{ |
||||
// submit task instance
|
||||
this.taskInstance = submit(); |
||||
|
||||
if(taskInstance == null){ |
||||
logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it"); |
||||
return result; |
||||
} |
||||
setTaskInstanceState(); |
||||
waitTaskQuit(); |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
|
||||
// at the end of the subflow , the task state is changed to the subflow state
|
||||
if(subProcessInstance != null){ |
||||
if(subProcessInstance.getState() == ExecutionStatus.STOP){ |
||||
this.taskInstance.setState(ExecutionStatus.KILL); |
||||
}else{ |
||||
this.taskInstance.setState(subProcessInstance.getState()); |
||||
} |
||||
} |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ", |
||||
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() ); |
||||
result = true; |
||||
|
||||
}catch (Exception e){ |
||||
logger.error("exception: ",e); |
||||
if (null != taskInstance) { |
||||
logger.error("wait task quit failed, instance id:{}, task id:{}", |
||||
processInstance.getId(), taskInstance.getId()); |
||||
} |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* set task instance state |
||||
* @return |
||||
*/ |
||||
private boolean setTaskInstanceState(){ |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if(subProcessInstance == null || taskInstance.getState().typeIsFinished()){ |
||||
return false; |
||||
} |
||||
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* updateProcessInstance parent state |
||||
*/ |
||||
private void updateParentProcessState(){ |
||||
ProcessInstance parentProcessInstance = processService.findProcessInstanceById(this.processInstance.getId()); |
||||
|
||||
if(parentProcessInstance == null){ |
||||
logger.error("parent work flow instance is null , please check it! work flow id {}", processInstance.getId()); |
||||
return; |
||||
} |
||||
this.processInstance.setState(parentProcessInstance.getState()); |
||||
} |
||||
|
||||
/** |
||||
* wait task quit |
||||
* @throws InterruptedException |
||||
*/ |
||||
private void waitTaskQuit() throws InterruptedException { |
||||
|
||||
logger.info("wait sub work flow: {} complete", this.taskInstance.getName()); |
||||
|
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}", |
||||
this.taskInstance.getName(), |
||||
this.taskInstance.getState(), |
||||
this.processInstance.getState()); |
||||
return; |
||||
} |
||||
while (Stopper.isRunning()) { |
||||
// waiting for subflow process instance establishment
|
||||
if (subProcessInstance == null) { |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
if(!setTaskInstanceState()){ |
||||
continue; |
||||
} |
||||
} |
||||
subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId()); |
||||
if (checkTaskTimeout()) { |
||||
this.checkTimeoutFlag = !alertTimeout(); |
||||
handleTimeoutFailed(); |
||||
} |
||||
updateParentProcessState(); |
||||
if (subProcessInstance.getState().typeIsFinished()){ |
||||
break; |
||||
} |
||||
if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){ |
||||
// parent process "ready to pause" , child process "pause"
|
||||
pauseSubProcess(); |
||||
}else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ |
||||
// parent Process "Ready to Cancel" , subflow "Cancel"
|
||||
stopSubProcess(); |
||||
} |
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* stop sub process |
||||
*/ |
||||
private void stopSubProcess() { |
||||
if(subProcessInstance.getState() == ExecutionStatus.STOP || |
||||
subProcessInstance.getState() == ExecutionStatus.READY_STOP){ |
||||
return; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_STOP); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
} |
||||
|
||||
/** |
||||
* pause sub process |
||||
*/ |
||||
private void pauseSubProcess() { |
||||
if(subProcessInstance.getState() == ExecutionStatus.PAUSE || |
||||
subProcessInstance.getState() == ExecutionStatus.READY_PAUSE){ |
||||
return; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_PAUSE); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
} |
||||
} |
@ -0,0 +1,112 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public abstract class BaseTaskProcessor implements ITaskProcessor { |
||||
|
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
protected boolean killed = false; |
||||
|
||||
protected boolean paused = false; |
||||
|
||||
protected boolean timeout = false; |
||||
|
||||
protected TaskInstance taskInstance = null; |
||||
|
||||
protected ProcessInstance processInstance; |
||||
|
||||
/** |
||||
* pause task, common tasks donot need this. |
||||
* |
||||
* @return |
||||
*/ |
||||
protected abstract boolean pauseTask(); |
||||
|
||||
/** |
||||
* kill task, all tasks need to realize this function |
||||
* |
||||
* @return |
||||
*/ |
||||
protected abstract boolean killTask(); |
||||
|
||||
/** |
||||
* task timeout process |
||||
* @return |
||||
*/ |
||||
protected abstract boolean taskTimeout(); |
||||
|
||||
@Override |
||||
public void run() { |
||||
} |
||||
|
||||
@Override |
||||
public boolean action(TaskAction taskAction) { |
||||
|
||||
switch (taskAction) { |
||||
case STOP: |
||||
return stop(); |
||||
case PAUSE: |
||||
return pause(); |
||||
case TIMEOUT: |
||||
return timeout(); |
||||
default: |
||||
logger.error("unknown task action: {}", taskAction.toString()); |
||||
|
||||
} |
||||
return false; |
||||
} |
||||
|
||||
protected boolean timeout() { |
||||
if (timeout) { |
||||
return true; |
||||
} |
||||
timeout = taskTimeout(); |
||||
return timeout; |
||||
} |
||||
|
||||
/** |
||||
* @return |
||||
*/ |
||||
protected boolean pause() { |
||||
if (paused) { |
||||
return true; |
||||
} |
||||
paused = pauseTask(); |
||||
return paused; |
||||
} |
||||
|
||||
protected boolean stop() { |
||||
if (killed) { |
||||
return true; |
||||
} |
||||
killed = killTask(); |
||||
return killed; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return null; |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
|
||||
public class CommonTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return Constants.COMMON_TASK_TYPE; |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new CommonTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,179 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; |
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; |
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; |
||||
|
||||
import org.apache.logging.log4j.util.Strings; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
||||
/** |
||||
* common task processor |
||||
*/ |
||||
public class CommonTaskProcessor extends BaseTaskProcessor { |
||||
|
||||
@Autowired |
||||
private TaskPriorityQueue taskUpdateQueue; |
||||
|
||||
@Autowired |
||||
MasterConfig masterConfig; |
||||
|
||||
@Autowired |
||||
NettyExecutorManager nettyExecutorManager; |
||||
|
||||
/** |
||||
* logger of MasterBaseTaskExecThread |
||||
*/ |
||||
protected Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
@Override |
||||
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { |
||||
this.processInstance = processInstance; |
||||
this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval); |
||||
|
||||
if (this.taskInstance == null) { |
||||
return false; |
||||
} |
||||
dispatchTask(taskInstance, processInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public ExecutionStatus taskState() { |
||||
return this.taskInstance.getState(); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
} |
||||
|
||||
@Override |
||||
protected boolean taskTimeout() { |
||||
return true; |
||||
} |
||||
|
||||
/** |
||||
* common task cannot be paused |
||||
* |
||||
* @return |
||||
*/ |
||||
@Override |
||||
protected boolean pauseTask() { |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return Constants.COMMON_TASK_TYPE; |
||||
} |
||||
|
||||
private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) { |
||||
|
||||
try { |
||||
if (taskUpdateQueue == null) { |
||||
this.initQueue(); |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); |
||||
return true; |
||||
} |
||||
// task cannot be submitted because its execution state is RUNNING or DELAY.
|
||||
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION |
||||
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) { |
||||
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); |
||||
return true; |
||||
} |
||||
logger.info("task ready to submit: {}", taskInstance); |
||||
|
||||
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), |
||||
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), |
||||
taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); |
||||
taskUpdateQueue.put(taskPriority); |
||||
logger.info(String.format("master submit success, task : %s", taskInstance.getName())); |
||||
return true; |
||||
} catch (Exception e) { |
||||
logger.error("submit task Exception: ", e); |
||||
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance)); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
public void initQueue() { |
||||
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); |
||||
} |
||||
|
||||
@Override |
||||
public boolean killTask() { |
||||
|
||||
try { |
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); |
||||
if (taskInstance == null) { |
||||
return true; |
||||
} |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
return true; |
||||
} |
||||
if (Strings.isBlank(taskInstance.getHost())) { |
||||
taskInstance.setState(ExecutionStatus.KILL); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
return true; |
||||
} |
||||
|
||||
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); |
||||
killCommand.setTaskInstanceId(taskInstance.getId()); |
||||
|
||||
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); |
||||
|
||||
Host host = Host.of(taskInstance.getHost()); |
||||
executionContext.setHost(host); |
||||
|
||||
nettyExecutorManager.executeDirectly(executionContext); |
||||
} catch (ExecuteException e) { |
||||
logger.error("kill task error:", e); |
||||
return false; |
||||
} |
||||
|
||||
logger.info("master kill taskInstance name :{} taskInstance id:{}", |
||||
taskInstance.getName(), taskInstance.getId()); |
||||
return true; |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class ConditionTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return TaskType.CONDITIONS.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new ConditionTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class DependentTaskProcessFactory implements ITaskProcessFactory { |
||||
|
||||
@Override |
||||
public String type() { |
||||
return TaskType.DEPENDENT.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new DependentTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,25 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
public interface ITaskProcessFactory { |
||||
|
||||
String type(); |
||||
|
||||
ITaskProcessor create(); |
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
/** |
||||
* interface of task processor in master |
||||
*/ |
||||
public interface ITaskProcessor { |
||||
|
||||
void run(); |
||||
|
||||
boolean action(TaskAction taskAction); |
||||
|
||||
String getType(); |
||||
|
||||
boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval); |
||||
|
||||
ExecutionStatus taskState(); |
||||
|
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class SubTaskProcessFactory implements ITaskProcessFactory { |
||||
@Override |
||||
public String type() { |
||||
return TaskType.SUB_PROCESS.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new SubTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,171 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; |
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Date; |
||||
import java.util.concurrent.locks.Lock; |
||||
import java.util.concurrent.locks.ReentrantLock; |
||||
|
||||
/** |
||||
* |
||||
*/ |
||||
public class SubTaskProcessor extends BaseTaskProcessor { |
||||
|
||||
private ProcessInstance processInstance; |
||||
|
||||
private ProcessInstance subProcessInstance = null; |
||||
private TaskDefinition taskDefinition; |
||||
|
||||
/** |
||||
* run lock |
||||
*/ |
||||
private final Lock runLock = new ReentrantLock(); |
||||
|
||||
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); |
||||
|
||||
@Override |
||||
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { |
||||
this.processInstance = processInstance; |
||||
taskDefinition = processService.findTaskDefinition( |
||||
task.getTaskCode(), task.getTaskDefinitionVersion() |
||||
); |
||||
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); |
||||
|
||||
if (this.taskInstance == null) { |
||||
return false; |
||||
} |
||||
|
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public ExecutionStatus taskState() { |
||||
return this.taskInstance.getState(); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
try { |
||||
this.runLock.lock(); |
||||
if (setSubWorkFlow()) { |
||||
updateTaskState(); |
||||
} |
||||
} catch (Exception e) { |
||||
logger.error("work flow {} sub task {} exceptions", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId(), |
||||
e); |
||||
} finally { |
||||
this.runLock.unlock(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected boolean taskTimeout() { |
||||
TaskTimeoutStrategy taskTimeoutStrategy = |
||||
taskDefinition.getTimeoutNotifyStrategy(); |
||||
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy |
||||
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { |
||||
return true; |
||||
} |
||||
logger.info("sub process task {} timeout, strategy {} ", |
||||
taskInstance.getId(), taskTimeoutStrategy.getDescp()); |
||||
killTask(); |
||||
return true; |
||||
} |
||||
|
||||
private void updateTaskState() { |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
logger.info("work flow {} task {}, sub work flow: {} state: {}", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId(), |
||||
subProcessInstance.getId(), |
||||
subProcessInstance.getState().getDescp()); |
||||
if (subProcessInstance != null && subProcessInstance.getState().typeIsFinished()) { |
||||
taskInstance.setState(subProcessInstance.getState()); |
||||
taskInstance.setEndTime(new Date()); |
||||
processService.saveTaskInstance(taskInstance); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected boolean pauseTask() { |
||||
pauseSubWorkFlow(); |
||||
return true; |
||||
} |
||||
|
||||
private boolean pauseSubWorkFlow() { |
||||
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_PAUSE); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
//TODO...
|
||||
// send event to sub process master
|
||||
return true; |
||||
} |
||||
|
||||
private boolean setSubWorkFlow() { |
||||
logger.info("set work flow {} task {} running", |
||||
this.processInstance.getId(), |
||||
this.taskInstance.getId()); |
||||
if (this.subProcessInstance != null) { |
||||
return true; |
||||
} |
||||
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); |
||||
taskInstance.setStartTime(new Date()); |
||||
processService.updateTaskInstance(taskInstance); |
||||
logger.info("set sub work flow {} task {} state: {}", |
||||
processInstance.getId(), |
||||
taskInstance.getId(), |
||||
taskInstance.getState()); |
||||
return true; |
||||
|
||||
} |
||||
|
||||
@Override |
||||
protected boolean killTask() { |
||||
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId()); |
||||
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) { |
||||
return false; |
||||
} |
||||
subProcessInstance.setState(ExecutionStatus.READY_STOP); |
||||
processService.updateProcessInstance(subProcessInstance); |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public String getType() { |
||||
return TaskType.SUB_PROCESS.getDesc(); |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
|
||||
public class SwitchTaskProcessFactory implements ITaskProcessFactory { |
||||
|
||||
@Override |
||||
public String type() { |
||||
return TaskType.SWITCH.getDesc(); |
||||
} |
||||
|
||||
@Override |
||||
public ITaskProcessor create() { |
||||
return new SwitchTaskProcessor(); |
||||
} |
||||
} |
@ -0,0 +1,27 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
/** |
||||
* task action |
||||
*/ |
||||
public enum TaskAction { |
||||
PAUSE, |
||||
STOP, |
||||
TIMEOUT |
||||
} |
@ -0,0 +1,53 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
|
||||
import java.util.Map; |
||||
import java.util.ServiceLoader; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import com.google.common.base.Strings; |
||||
|
||||
/** |
||||
* the factory to create task processor |
||||
*/ |
||||
public class TaskProcessorFactory { |
||||
|
||||
public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>(); |
||||
|
||||
private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE; |
||||
|
||||
static { |
||||
for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) { |
||||
PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor); |
||||
} |
||||
} |
||||
|
||||
public static ITaskProcessor getTaskProcessor(String type) { |
||||
if (Strings.isNullOrEmpty(type)) { |
||||
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); |
||||
} |
||||
if (!PROCESS_FACTORY_MAP.containsKey(type)) { |
||||
return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); |
||||
} |
||||
return PROCESS_FACTORY_MAP.get(type).create(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,59 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.worker.processor; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* update process host |
||||
* this used when master failover |
||||
*/ |
||||
public class HostUpdateProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class); |
||||
|
||||
/** |
||||
* task callback service |
||||
*/ |
||||
private final TaskCallbackService taskCallbackService; |
||||
|
||||
public HostUpdateProcessor() { |
||||
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType())); |
||||
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); |
||||
logger.info("received host update command : {}", updateCommand); |
||||
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); |
||||
|
||||
} |
||||
} |
@ -0,0 +1,22 @@
|
||||
# |
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
|
||||
org.apache.dolphinscheduler.server.master.runner.task.CommonTaskProcessFactory |
||||
org.apache.dolphinscheduler.server.master.runner.task.ConditionTaskProcessFactory |
||||
org.apache.dolphinscheduler.server.master.runner.task.DependentTaskProcessFactory |
||||
org.apache.dolphinscheduler.server.master.runner.task.SubTaskProcessFactory |
||||
org.apache.dolphinscheduler.server.master.runner.task.SwitchTaskProcessFactory |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner.task; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
public class TaskProcessorFactoryTest { |
||||
|
||||
@Test |
||||
public void testFactory() { |
||||
|
||||
TaskInstance taskInstance = new TaskInstance(); |
||||
taskInstance.setTaskType("shell"); |
||||
|
||||
ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); |
||||
|
||||
Assert.assertNotNull(iTaskProcessor); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,109 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.queue; |
||||
|
||||
import org.apache.dolphinscheduler.common.model.Server; |
||||
|
||||
import java.util.Comparator; |
||||
import java.util.HashMap; |
||||
import java.util.Iterator; |
||||
import java.util.List; |
||||
import java.util.concurrent.PriorityBlockingQueue; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
public class MasterPriorityQueue implements TaskPriorityQueue<Server> { |
||||
|
||||
/** |
||||
* queue size |
||||
*/ |
||||
private static final Integer QUEUE_MAX_SIZE = 20; |
||||
|
||||
/** |
||||
* queue |
||||
*/ |
||||
private PriorityBlockingQueue<Server> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new ServerComparator()); |
||||
|
||||
private HashMap<String, Integer> hostIndexMap = new HashMap<>(); |
||||
|
||||
@Override |
||||
public void put(Server serverInfo) { |
||||
this.queue.put(serverInfo); |
||||
refreshMasterList(); |
||||
} |
||||
|
||||
@Override |
||||
public Server take() throws InterruptedException { |
||||
return queue.take(); |
||||
} |
||||
|
||||
@Override |
||||
public Server poll(long timeout, TimeUnit unit) { |
||||
return queue.poll(); |
||||
} |
||||
|
||||
@Override |
||||
public int size() { |
||||
return queue.size(); |
||||
} |
||||
|
||||
public void putList(List<Server> serverList) { |
||||
for (Server server : serverList) { |
||||
this.queue.put(server); |
||||
} |
||||
refreshMasterList(); |
||||
} |
||||
|
||||
public void remove(Server server) { |
||||
this.queue.remove(server); |
||||
} |
||||
|
||||
public void clear() { |
||||
queue.clear(); |
||||
refreshMasterList(); |
||||
} |
||||
|
||||
private void refreshMasterList() { |
||||
hostIndexMap.clear(); |
||||
Iterator<Server> iterator = queue.iterator(); |
||||
int index = 0; |
||||
while (iterator.hasNext()) { |
||||
Server server = iterator.next(); |
||||
hostIndexMap.put(server.getHost(), index); |
||||
index += 1; |
||||
} |
||||
|
||||
} |
||||
|
||||
public int getIndex(String host) { |
||||
if (!hostIndexMap.containsKey(host)) { |
||||
return -1; |
||||
} |
||||
return hostIndexMap.get(host); |
||||
} |
||||
|
||||
/** |
||||
* server comparator |
||||
*/ |
||||
private class ServerComparator implements Comparator<Server> { |
||||
@Override |
||||
public int compare(Server o1, Server o2) { |
||||
return o1.getCreateTime().before(o2.getCreateTime()) ? 1 : 0; |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue