diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java index 5fb245afef..cc3a29565b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java @@ -57,4 +57,14 @@ public enum DbType { public String getDescp() { return descp; } + + + public static DbType of(int type){ + for(DbType ty : values()){ + if(ty.getCode() == type){ + return ty; + } + } + throw new IllegalArgumentException("invalid type : " + type); + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java index 2e8754031b..b8cfd89fc2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * processId */ private int processId; /** * host */ private String host; /** * tenantCode */ private String tenantCode; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public KillTaskRequestCommand() { } public KillTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 8cdd13ef7f..1388e79c5a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -17,10 +17,11 @@ package org.apache.dolphinscheduler.server.builder; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * TaskExecutionContext builder @@ -47,6 +48,8 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); + taskExecutionContext.setHost(taskInstance.getHost()); + taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); return this; } @@ -54,7 +57,7 @@ public class TaskExecutionContextBuilder { /** * build processInstance related info * - * @param processInstance + * @param processInstance processInstance * @return TaskExecutionContextBuilder */ public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java new file mode 100644 index 0000000000..dd8d64698f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.entity; + +import java.io.Serializable; + +/** + * master/worker task transport + */ +public class DataxTaskExecutionContext implements Serializable{ + + /** + * dataSourceId + */ + private int dataSourceId; + + /** + * sourcetype + */ + private int sourcetype; + + /** + * sourceConnectionParams + */ + private String sourceConnectionParams; + + /** + * dataTargetId + */ + private int dataTargetId; + + /** + * targetType + */ + private int targetType; + + /** + * targetConnectionParams + */ + private String targetConnectionParams; + + public int getDataSourceId() { + return dataSourceId; + } + + public void setDataSourceId(int dataSourceId) { + this.dataSourceId = dataSourceId; + } + + public int getSourcetype() { + return sourcetype; + } + + public void setSourcetype(int sourcetype) { + this.sourcetype = sourcetype; + } + + public String getSourceConnectionParams() { + return sourceConnectionParams; + } + + public void setSourceConnectionParams(String sourceConnectionParams) { + this.sourceConnectionParams = sourceConnectionParams; + } + + public int getDataTargetId() { + return dataTargetId; + } + + public void setDataTargetId(int dataTargetId) { + this.dataTargetId = dataTargetId; + } + + public int getTargetType() { + return targetType; + } + + public void setTargetType(int targetType) { + this.targetType = targetType; + } + + public String getTargetConnectionParams() { + return targetConnectionParams; + } + + public void setTargetConnectionParams(String targetConnectionParams) { + this.targetConnectionParams = targetConnectionParams; + } + + @Override + public String toString() { + return "DataxTaskExecutionContext{" + + "dataSourceId=" + dataSourceId + + ", sourcetype=" + sourcetype + + ", sourceConnectionParams='" + sourceConnectionParams + '\'' + + ", dataTargetId=" + dataTargetId + + ", targetType=" + targetType + + ", targetConnectionParams='" + targetConnectionParams + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java new file mode 100644 index 0000000000..b1ec20dd52 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.entity; + +import org.apache.dolphinscheduler.dao.entity.UdfFunc; + +import java.io.Serializable; +import java.util.List; + +/** + * SQL Task ExecutionContext + */ +public class SQLTaskExecutionContext implements Serializable { + + + /** + * warningGroupId + */ + private int warningGroupId; + /** + * udf function list + */ + private List udfFuncList; + + + public int getWarningGroupId() { + return warningGroupId; + } + + public void setWarningGroupId(int warningGroupId) { + this.warningGroupId = warningGroupId; + } + + public List getUdfFuncList() { + return udfFuncList; + } + + public void setUdfFuncList(List udfFuncList) { + this.udfFuncList = udfFuncList; + } + + @Override + public String toString() { + return "SQLTaskExecutionContext{" + + "warningGroupId=" + warningGroupId + + ", udfFuncList=" + udfFuncList + + '}'; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java similarity index 89% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 3ed71e5e93..fb3aab9761 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.entity; +package org.apache.dolphinscheduler.server.entity; import java.io.Serializable; import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -144,7 +142,6 @@ public class TaskExecutionContext implements Serializable{ */ private Map definedParams; - /** * task AppId */ @@ -165,6 +162,20 @@ public class TaskExecutionContext implements Serializable{ */ private String workerGroup; + + /** + * sql TaskExecutionContext + */ + private SQLTaskExecutionContext sqlTaskExecutionContext; + + /** + * datax TaskExecutionContext + */ + private DataxTaskExecutionContext dataxTaskExecutionContext; + + + + public String getWorkerGroup() { return workerGroup; } @@ -373,6 +384,21 @@ public class TaskExecutionContext implements Serializable{ this.appIds = appIds; } + public SQLTaskExecutionContext getSqlTaskExecutionContext() { + return sqlTaskExecutionContext; + } + + public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) { + this.sqlTaskExecutionContext = sqlTaskExecutionContext; + } + + public DataxTaskExecutionContext getDataxTaskExecutionContext() { + return dataxTaskExecutionContext; + } + + public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) { + this.dataxTaskExecutionContext = dataxTaskExecutionContext; + } @Override public String toString() { @@ -402,6 +428,9 @@ public class TaskExecutionContext implements Serializable{ ", taskAppId='" + taskAppId + '\'' + ", taskTimeoutStrategy=" + taskTimeoutStrategy + ", taskTimeout=" + taskTimeout + + ", workerGroup='" + workerGroup + '\'' + + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + + ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7c33b9026f..4c0c3e8ce2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; @@ -127,6 +128,7 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start(); // diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java index 98d2a24726..a62ee49f8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * task instance state manager diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java index 6624eebc64..dc775d8d67 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.springframework.beans.factory.annotation.Autowired; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index c597dc196a..df563a6209 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -67,8 +67,10 @@ public class ExecutorDispatcher implements InitializingBean { } /** - * task dispatch + * task dispatch + * * @param context context + * @return result * @throws ExecuteException */ public Boolean dispatch(final ExecutionContext context) throws ExecuteException { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index 5157dd288f..19124d3a0c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch.context; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java index c0be5a875f..9e4c222211 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java @@ -27,6 +27,7 @@ public abstract class AbstractExecutorManager implements ExecutorManager{ /** * before execute , add time monitor , timeout + * * @param context context * @throws ExecuteException */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index 9b0b9af0e4..f1707df66b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -27,6 +27,7 @@ public interface ExecutorManager { /** * before execute + * * @param executeContext executeContext * @throws ExecuteException */ @@ -35,12 +36,13 @@ public interface ExecutorManager { /** * execute task * @param context context + * @return T * @throws ExecuteException */ T execute(ExecutionContext context) throws ExecuteException; /** - * after execute + * after execute * @param context context * @throws ExecuteException */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index f4b1dab6b1..544a958cc2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; @@ -72,9 +72,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{ this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); } + /** - * execute logic + * execute logic * @param context context + * @return result * @throws ExecuteException */ @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java new file mode 100644 index 0000000000..54d0022cfe --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.executor; + +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * netty executor manager + */ +@Service +public class NettyKillManager extends AbstractExecutorManager{ + + private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class); + /** + * netty remote client + */ + private final NettyRemotingClient nettyRemotingClient; + + public NettyKillManager(){ + final NettyClientConfig clientConfig = new NettyClientConfig(); + this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + /** + * register KILL_TASK_RESPONSE command type TaskKillResponseProcessor + */ + this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor()); + } + + /** + * execute logic + * + * @param context context + * @return result + * @throws ExecuteException + */ + @Override + public Boolean execute(ExecutionContext context) throws ExecuteException { + Host host = context.getHost(); + Command command = buildCommand(context); + try { + doExecute(host, command); + return true; + }catch (ExecuteException ex) { + logger.error(String.format("execute context : %s error", context.getContext()), ex); + return false; + } + } + + + private Command buildCommand(ExecutionContext context) { + KillTaskRequestCommand requestCommand = new KillTaskRequestCommand(); + TaskExecutionContext taskExecutionContext = context.getContext(); + + requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); + return requestCommand.convert2Command(); + } + + /** + * execute logic + * @param host host + * @param command command + * @throws ExecuteException + */ + private void doExecute(final Host host, final Command command) throws ExecuteException { + /** + * retry count,default retry 3 + */ + int retryCount = 3; + boolean success = false; + do { + try { + nettyRemotingClient.send(host, command); + success = true; + } catch (Exception ex) { + logger.error(String.format("send command : %s to %s error", command, host), ex); + retryCount--; + try { + Thread.sleep(100); + } catch (InterruptedException ignore) {} + } + } while (retryCount >= 0 && !success); + + if (!success) { + throw new ExecuteException(String.format("send command : %s to %s error", command, host)); + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java index cadf418f51..bdf0f412f4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -24,6 +24,12 @@ import java.util.Collection; */ public class LowerWeightRoundRobin implements Selector{ + /** + * select + * @param sources sources + * @return HostWeight + */ + @Override public HostWeight select(Collection sources){ int totalWeight = 0; int lowWeight = 0; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java index 0c6d7402be..d22c6f20e7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java @@ -68,7 +68,7 @@ public class TaskFuture { } /** - * wait for response + * wait for response * @return command * @throws InterruptedException */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java new file mode 100644 index 0000000000..d6c3f69c1f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * task response processor + */ +public class TaskKillResponseProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class); + + /** + * task final result response + * need master process , state persistence + * + * @param channel channel + * @param command command + */ + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); + + KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class); + logger.info("received command : {}", responseCommand); + logger.info("已经接受到了worker杀任务的回应"); + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index d6279c625e..ed761539f9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -72,6 +72,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), + responseCommand.getProcessId(), + responseCommand.getAppIds(), responseCommand.getTaskInstanceId()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index ebfb2f4dc0..1eb06b6d65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -67,9 +67,10 @@ public class MasterRegistry { private final String startTime; /** - * construct + * construct * @param zookeeperRegistryCenter zookeeperRegistryCenter * @param port port + * @param heartBeatInterval heartBeatInterval */ public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 7812dbf5e7..9d40de98d8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -147,7 +147,7 @@ public class MasterBaseTaskExecThread implements Callable { * @param taskInstance taskInstance * @return TaskExecutionContext */ - private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){ + protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){ taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 576dc76ba7..a05f8dc9ee 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -142,8 +142,9 @@ public class MasterExecThread implements Runnable { /** * constructor of MasterExecThread - * @param processInstance process instance - * @param processService process dao + * @param processInstance processInstance + * @param processService processService + * @param nettyRemotingClient nettyRemotingClient */ public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){ this.processService = processService; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index feba5a209e..07f9168594 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; -import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_KILL; /** * master task exec thread @@ -52,6 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { */ private TaskInstanceCacheManager taskInstanceCacheManager; + + private NettyKillManager nettyKillManager; + /** * constructor of MasterTaskExecThread * @param taskInstance task instance @@ -60,6 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ super(taskInstance, processInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class); } /** @@ -78,6 +86,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** * TODO submit task instance and wait complete + * * @return true is task quit is true */ @Override @@ -99,14 +108,14 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } /** - * TODO 在这里轮询数据库 + * TODO polling db * * wait task quit * @return true if task quit success */ public Boolean waitTaskQuit(){ // query new state - taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); logger.info("wait task: process id: {}, task id:{}, task name:{} complete", this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); // task time out @@ -147,7 +156,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } } // updateProcessInstance task instance - taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId()); Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { @@ -163,23 +172,26 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** - * TODO Kill 任务 + * TODO Kill TASK * * task instance add queue , waiting worker to kill */ - private void cancelTaskInstance(){ + private void cancelTaskInstance() throws Exception{ if(alreadyKilled){ return ; } alreadyKilled = true; - String host = taskInstance.getHost(); - if(host == null){ - host = Constants.NULL; - } - String queueValue = String.format("%s-%d", - host, taskInstance.getId()); - // TODO 这里写 - taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue); + + TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); + + ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); + + Host host = new Host(); + host.setIp(taskInstance.getHost()); + host.setPort(12346); + executionContext.setHost(host); + + nettyKillManager.execute(executionContext); logger.info("master add kill task :{} id:{} to kill queue", taskInstance.getName(), taskInstance.getId() ); @@ -197,7 +209,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** - * get remain time(s) + * get remain time?s? * * @return remain time */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java index 3ee9488a3e..8d7bf0bb89 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java @@ -23,6 +23,11 @@ public interface Monitor { /** * monitor server and restart + * + * @param masterPath masterPath + * @param workerPath workerPath + * @param port port + * @param installPath installPath */ void monitor(String masterPath, String workerPath, Integer port, String installPath); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 25355e2925..9a4a7caaf1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -74,7 +74,7 @@ public class ZookeeperNodeManager implements InitializingBean { private ZookeeperRegistryCenter registryCenter; /** - * init listener + * init listener * @throws Exception */ @Override @@ -234,8 +234,8 @@ public class ZookeeperNodeManager implements InitializingBean { /** * get worker group nodes - * @param workerGroup - * @return + * @param workerGroup workerGroup + * @return worker nodes */ public Set getWorkerGroupNodes(String workerGroup){ workerGroupLock.lock(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index a6a3ea0822..b186a4290a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -129,7 +129,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group directly - * @return + * @return worker group nodes */ public Set getWorkerGroupDirectly() { List workers = getChildrenKeys(getWorkerPath()); @@ -166,8 +166,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group path - * @param workerGroup - * @return + * @param workerGroup workerGroup + * @return worker group path */ public String getWorkerGroupPath(String workerGroup) { return WORKER_PATH + "/" + workerGroup; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index 3aba54622a..063a7d7f82 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -111,7 +111,8 @@ public class ParamUtils { /** * get parameters map - * @return user defined params map + * @param definedParams definedParams + * @return parameters map */ public static Map getUserDefParamsMap(Map definedParams) { if (definedParams != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index c6efbaebf3..ee1f09173b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.commons.io.FileUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.log.LogClientService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java index db78127dc1..7df8e01b3d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java @@ -17,7 +17,8 @@ package org.apache.dolphinscheduler.server.worker.cache; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * TaskExecutionContextCacheManager diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java index 584c42bbba..009332f05c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.springframework.stereotype.Service; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 3c79e8cb43..98e4e92235 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -33,13 +33,12 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 1ea73940ff..c910aed279 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -28,9 +29,9 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; @@ -75,32 +76,35 @@ public class TaskKillProcessor implements NettyRequestProcessor { this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } + /** * kill task logic * - * @param killCommand killCommand + * @param context context + * @return execute result */ - private Boolean doKill(KillTaskRequestCommand killCommand){ + private Boolean doKill(TaskExecutionContext context){ try { - TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId()); + context.setProcessId(taskExecutionContext.getProcessId()); Integer processId = taskExecutionContext.getProcessId(); if (processId == null || processId.equals(0)){ - logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); + logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId()); return false; } - killCommand.setProcessId(processId); - String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId())); + String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId())); - logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd); + logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd); OSUtils.exeCmd(cmd); + // find log and kill yarn job - killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode()); + killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode()); return true; } catch (Exception e) { @@ -115,29 +119,37 @@ public class TaskKillProcessor implements NettyRequestProcessor { KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class); logger.info("received command : {}", killTaskRequestCommand); - Boolean killStatus = doKill(killTaskRequestCommand); - KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(killTaskRequestCommand,killStatus); + String contextJson = killTaskRequestCommand.getTaskExecutionContext(); + + TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); + + Boolean killStatus = doKill(taskExecutionContext); + + killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + + KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus); killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand); } /** * build KillTaskResponseCommand * - * @param killTaskRequestCommand killTaskRequestCommand + * @param taskExecutionContext taskExecutionContext * @param killStatus killStatus - * @return KillTaskResponseCommand + * @return build KillTaskResponseCommand */ - private KillTaskResponseCommand buildKillTaskResponseCommand(KillTaskRequestCommand killTaskRequestCommand, + private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext, Boolean killStatus) { KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand(); - killTaskResponseCommand.setTaskInstanceId(killTaskRequestCommand.getTaskInstanceId()); - killTaskResponseCommand.setHost(killTaskRequestCommand.getHost()); + killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + killTaskResponseCommand.setHost(taskExecutionContext.getHost()); killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); - killTaskResponseCommand.setProcessId(killTaskRequestCommand.getProcessId()); + killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId()); killTaskResponseCommand.setAppIds(appIds); - return null; + return killTaskResponseCommand; } /** @@ -156,6 +168,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { String log = null; try { logClient = new LogClientService(); + logger.info("view log host : {},logPath : {}", host,logPath); log = logClient.viewLog(host, Constants.RPC_PORT, logPath); } finally { if(logClient != null){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 977643c25a..a1d55240b2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -76,9 +76,11 @@ public class WorkerRegistry { private String workerGroup; /** - * construct + * construct + * * @param zookeeperRegistryCenter zookeeperRegistryCenter * @param port port + * @param heartBeatInterval heartBeatInterval */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index d26feaf016..d3161eca59 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index e573d3a1a4..772711585d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -21,17 +21,13 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -66,11 +62,6 @@ public abstract class AbstractCommandExecutor { */ protected Consumer> logHandler; - /** - * timeout - */ - protected int timeout; - /** * logger */ @@ -132,6 +123,7 @@ public abstract class AbstractCommandExecutor { /** * task specific execution logic + * * @param execCommand execCommand * @return CommandExecuteResult * @throws Exception @@ -174,8 +166,6 @@ public abstract class AbstractCommandExecutor { // waiting for the run to finish boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); - // SHELL task state - result.setExitStatusCode(process.exitValue()); logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", taskExecutionContext.getExecutePath(), @@ -195,6 +185,9 @@ public abstract class AbstractCommandExecutor { ProcessUtils.kill(taskExecutionContext); result.setExitStatusCode(EXIT_CODE_FAILURE); } + + // SHELL task state + result.setExitStatusCode(process.exitValue()); return result; } @@ -378,7 +371,7 @@ public abstract class AbstractCommandExecutor { List appIds = new ArrayList<>(); /** - * analysis log,get submited yarn application id + * analysis log?get submited yarn application id */ for (String log : logs) { String appId = findAppId(log); @@ -440,13 +433,13 @@ public abstract class AbstractCommandExecutor { /** - * get remain time(s) + * get remain time?s? * * @return remain time */ private long getRemaintime() { long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000; - long remainTime = timeout - usedTime; + long remainTime = taskExecutionContext.getTaskTimeout() - usedTime; if (remainTime < 0) { throw new RuntimeException("task execution time out"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index e6dd973edd..86aed54ac1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.TaskRecordDao; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 2ce397ab77..62e35fd20c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index ad0671f8c5..344d00fa88 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 877c6076d9..6b25cd3577 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.commons.io.FileUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.slf4j.Logger; import java.io.File; @@ -25,7 +25,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Date; import java.util.List; import java.util.function.Consumer; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 468375c264..1fef7e656e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java index a7b66bb398..483dd18cd5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java @@ -129,19 +129,21 @@ public class TaskProps { /** * constructor - * @param taskParams task params - * @param taskDir task dir - * @param scheduleTime schedule time - * @param nodeName node name - * @param taskType task type - * @param taskInstanceId task instance id - * @param envFile env file - * @param tenantCode tenant code - * @param queue queue - * @param taskStartTime task start time - * @param definedParams defined params - * @param dependence dependence - * @param cmdTypeIfComplement cmd type if complement + * @param taskParams taskParams + * @param scheduleTime scheduleTime + * @param nodeName nodeName + * @param taskType taskType + * @param taskInstanceId taskInstanceId + * @param envFile envFile + * @param tenantCode tenantCode + * @param queue queue + * @param taskStartTime taskStartTime + * @param definedParams definedParams + * @param dependence dependence + * @param cmdTypeIfComplement cmdTypeIfComplement + * @param host host + * @param logPath logPath + * @param executePath executePath */ public TaskProps(String taskParams, Date scheduleTime, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 24abe57eb1..3e5aa51d00 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -39,6 +39,7 @@ import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -50,13 +51,13 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.DataxUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -98,11 +99,6 @@ public class DataxTask extends AbstractTask { */ private DataxParameters dataXParameters; - /** - * task dir - */ - private String taskDir; - /** * shell command executor */ @@ -113,11 +109,6 @@ public class DataxTask extends AbstractTask { */ private TaskExecutionContext taskExecutionContext; - /** - * processService - */ - private ProcessService processService; - /** * constructor * @param taskExecutionContext taskExecutionContext @@ -127,13 +118,9 @@ public class DataxTask extends AbstractTask { super(taskExecutionContext, logger); this.taskExecutionContext = taskExecutionContext; - logger.info("task dir : {}", taskDir); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext,logger); - - processService = SpringApplicationContext.getBean(ProcessService.class); - } /** @@ -151,12 +138,11 @@ public class DataxTask extends AbstractTask { /** * run DataX process - * + * * @throws Exception */ @Override - public void handle() - throws Exception { + public void handle() throws Exception { try { // set the name of the current thread String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); @@ -180,8 +166,8 @@ public class DataxTask extends AbstractTask { /** * cancel DataX process - * - * @param cancelApplication + * + * @param cancelApplication cancelApplication * @throws Exception */ @Override @@ -200,7 +186,9 @@ public class DataxTask extends AbstractTask { private String buildDataxJsonFile() throws Exception { // generate json - String fileName = String.format("%s/%s_job.json", taskDir, taskExecutionContext.getTaskAppId()); + String fileName = String.format("%s/%s_job.json", + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTaskAppId()); Path path = new File(fileName).toPath(); if (Files.exists(path)) { @@ -230,13 +218,14 @@ public class DataxTask extends AbstractTask { */ private List buildDataxJobContentJson() throws SQLException { - DataSource dataSource = processService.findDataSourceById(dataXParameters.getDataSource()); - BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(), - dataSource.getConnectionParams()); + DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext(); - DataSource dataTarget = processService.findDataSourceById(dataXParameters.getDataTarget()); - BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(), - dataTarget.getConnectionParams()); + + BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()), + dataxTaskExecutionContext.getSourceConnectionParams()); + + BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()), + dataxTaskExecutionContext.getTargetConnectionParams()); List readerConnArr = new ArrayList<>(); JSONObject readerConn = new JSONObject(); @@ -250,7 +239,7 @@ public class DataxTask extends AbstractTask { readerParam.put("connection", readerConnArr); JSONObject reader = new JSONObject(); - reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType())); + reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype()))); reader.put("parameter", readerParam); List writerConnArr = new ArrayList<>(); @@ -263,7 +252,9 @@ public class DataxTask extends AbstractTask { writerParam.put("username", dataTargetCfg.getUser()); writerParam.put("password", dataTargetCfg.getPassword()); writerParam.put("column", - parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql())); + parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()), + DbType.of(dataxTaskExecutionContext.getTargetType()), + dataSourceCfg, dataXParameters.getSql())); writerParam.put("connection", writerConnArr); if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { @@ -275,7 +266,7 @@ public class DataxTask extends AbstractTask { } JSONObject writer = new JSONObject(); - writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType())); + writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType()))); writer.put("parameter", writerParam); List contentList = new ArrayList<>(); @@ -348,7 +339,9 @@ public class DataxTask extends AbstractTask { private String buildShellCommandFile(String jobConfigFilePath) throws Exception { // generate scripts - String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId()); + String fileName = String.format("%s/%s_node.sh", + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTaskAppId()); Path path = new File(fileName).toPath(); if (Files.exists(path)) { @@ -364,9 +357,6 @@ public class DataxTask extends AbstractTask { sbr.append(jobConfigFilePath); String dataxCommand = sbr.toString(); - // find process instance by task id - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId()); - // combining local and global parameters // replace placeholder Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index f9ef958ade..f264749ed5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -23,12 +23,10 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.slf4j.Logger; import java.util.ArrayList; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index d5d2bb2277..74a17284d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -30,13 +30,9 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.http.HttpEntity; import org.apache.http.ParseException; import org.apache.http.client.config.RequestConfig; @@ -67,10 +63,7 @@ public class HttpTask extends AbstractTask { */ private HttpParameters httpParameters; - /** - * process service - */ - private ProcessService processService; + /** * Convert mill seconds to second unit @@ -146,7 +139,6 @@ public class HttpTask extends AbstractTask { */ protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { RequestBuilder builder = createRequestBuilder(); - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId()); // replace placeholder Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index 3923e7c996..fbc7e21ad2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index 5c351d410b..82cb6a216e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -31,10 +31,9 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index ada6b70891..7a66227b8d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import java.util.Map; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 68b9b04ad3..9fa2abec80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import java.io.File; @@ -54,11 +51,6 @@ public class ShellTask extends AbstractTask { */ private ShellParameters shellParameters; - /** - * task dir - */ - private String taskDir; - /** * shell command executor */ @@ -122,7 +114,10 @@ public class ShellTask extends AbstractTask { */ private String buildCommand() throws Exception { // generate scripts - String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId()); + String fileName = String.format("%s/%s_node.sh", + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTaskAppId()); + Path path = new File(fileName).toPath(); if (Files.exists(path)) { @@ -148,7 +143,7 @@ public class ShellTask extends AbstractTask { shellParameters.setRawScript(script); logger.info("raw script : {}", shellParameters.getRawScript()); - logger.info("task dir : {}", taskDir); + logger.info("task execute path : {}", taskExecutionContext.getExecutePath()); Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 4bb91dd1aa..e25cffb9be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -24,11 +24,10 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.slf4j.Logger; import java.util.ArrayList; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 87758307ea..9a45c7d090 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; @@ -33,18 +32,15 @@ import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.permission.PermissionCheck; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import java.sql.*; @@ -64,22 +60,10 @@ public class SqlTask extends AbstractTask { * sql parameters */ private SqlParameters sqlParameters; - - /** - * process service - */ - private ProcessService processService; - /** * alert dao */ private AlertDao alertDao; - - /** - * datasource - */ - private DataSource dataSource; - /** * base datasource */ @@ -102,7 +86,7 @@ public class SqlTask extends AbstractTask { if (!sqlParameters.checkParameters()) { throw new RuntimeException("sql task params is not valid"); } - this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.alertDao = SpringApplicationContext.getBean(AlertDao.class); } @@ -111,6 +95,7 @@ public class SqlTask extends AbstractTask { // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); + logger.info("Full sql parameters: {}", sqlParameters); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", sqlParameters.getType(), @@ -121,37 +106,15 @@ public class SqlTask extends AbstractTask { sqlParameters.getShowType(), sqlParameters.getConnParams()); - // not set data source - if (sqlParameters.getDatasource() == 0){ - logger.error("datasource id not exists"); - exitStatusCode = -1; - return; - } - - dataSource= processService.findDataSourceById(sqlParameters.getDatasource()); - - // data source is null - if (dataSource == null){ - logger.error("datasource not exists"); - exitStatusCode = -1; - return; - } - - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(), - dataSource.getType(), - dataSource.getNote(), - dataSource.getUserId(), - dataSource.getConnectionParams()); - Connection con = null; List createFuncs = null; try { // load class - DataSourceFactory.loadClass(dataSource.getType()); + DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType())); + // get datasource - baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), - dataSource.getConnectionParams()); + baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()), + sqlParameters.getConnParams()); // ready to execute SQL and parameter entity Map SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); @@ -175,9 +138,8 @@ public class SqlTask extends AbstractTask { for(int i=0;i udfFuncList = processService.queryUdfFunListByids(idsArray); - createFuncs = UDFUtils.createFuncs(udfFuncList, taskExecutionContext.getTenantCode(), logger); + SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); + createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger); } // execute sql task @@ -262,7 +224,7 @@ public class SqlTask extends AbstractTask { CommonUtils.loadKerberosConf(); // if hive , load connection params if exists - if (HIVE == dataSource.getType()) { + if (HIVE == DbType.valueOf(sqlParameters.getType())) { Properties paramProp = new Properties(); paramProp.setProperty(USER, baseDataSource.getUser()); paramProp.setProperty(PASSWORD, baseDataSource.getPassword()); @@ -387,10 +349,7 @@ public class SqlTask extends AbstractTask { */ public void sendAttachment(String title,String content){ - // process instance - ProcessInstance instance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId()); - - List users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId()); + List users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId()); // receiving group list List receviersList = new ArrayList(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 05a0790126..77d2139aab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -236,6 +236,8 @@ public class ZKMasterClient extends AbstractZKClient { /** * monitor master + * @param event event + * @param path path */ public void handleMasterEvent(TreeCacheEvent event, String path){ switch (event.getType()) { @@ -256,6 +258,8 @@ public class ZKMasterClient extends AbstractZKClient { /** * monitor worker + * @param event event + * @param path path */ public void handleWorkerEvent(TreeCacheEvent event, String path){ switch (event.getType()) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java index 33990bc59f..a1d70f8343 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java @@ -72,6 +72,8 @@ public class ZKWorkerClient extends AbstractZKClient { /** * monitor worker + * @param event event + * @param path path */ public void handleWorkerEvent(TreeCacheEvent event, String path){ switch (event.getType()) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a26044e417..55cd634818 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1411,8 +1411,12 @@ public class ProcessService { */ public void changeTaskState(ExecutionStatus state, Date endTime, + int processId, + String appIds, int taskInstId) { TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); + taskInstance.setPid(processId); + taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); saveTaskInstance(taskInstance);