diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index 76dd8980b7..cd6e04fc26 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -162,6 +162,12 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
index e20c845d42..4ff769dce6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskRecordController.java
@@ -22,11 +22,13 @@ import org.apache.dolphinscheduler.api.service.TaskRecordService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.User;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
+
 import springfox.documentation.annotations.ApiIgnore;
 
 import java.util.Map;
@@ -59,7 +61,7 @@ public class TaskRecordController extends BaseController {
      * @param taskDate task date
      * @param startTime start time
      * @param endTime end time
-     * @param pageNo page numbere
+     * @param pageNo page number
      * @param pageSize page size
      * @return task record list
      */
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 43c03f09d8..4fda99c37d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -193,7 +193,9 @@ public enum Status {
     BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"),
     BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
     QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
-    DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+    DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162, "delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
+    CHECK_TENANT_CODE_ERROR(10163, "Please enter the English tenant code", "请输入英文租户编码"),
+
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index cade36a1d6..90239b3969 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -434,7 +434,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         if (resultEnum != Status.SUCCESS) {
             return checkResult;
         }
-        ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name);
+        ProcessDefinition processDefinition = processDefineMapper.verifyByDefineName(project.getId(), name);
         if (processDefinition == null) {
             putMsg(result, Status.SUCCESS);
         } else {
@@ -683,6 +683,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         exportProcessMeta.setProjectName(processDefinition.getProjectName());
         exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
         exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson());
+        exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
         exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
         exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 3a267bcc8c..52f0d79ead 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.BaseService;
 import org.apache.dolphinscheduler.api.service.TenantService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.RegexUtils;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -73,11 +74,11 @@ public class TenantServiceImpl extends BaseService implements TenantService {
     /**
      * create tenant
      *
-     * @param loginUser login user
+     * @param loginUser  login user
      * @param tenantCode tenant code
      * @param tenantName tenant name
-     * @param queueId queue id
-     * @param desc description
+     * @param queueId    queue id
+     * @param desc       description
      * @return create result code
      * @throws Exception exception
      */
@@ -94,6 +95,11 @@ public class TenantServiceImpl extends BaseService implements TenantService {
             return result;
         }
 
+        if (RegexUtils.isNumeric(tenantCode)) {
+            putMsg(result, Status.CHECK_TENANT_CODE_ERROR);
+            return result;
+        }
+
         if (checkTenantExists(tenantCode)) {
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, tenantCode);
             return result;
@@ -131,8 +137,8 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      *
      * @param loginUser login user
      * @param searchVal search value
-     * @param pageNo page number
-     * @param pageSize page size
+     * @param pageNo    page number
+     * @param pageSize  page size
      * @return tenant list page
      */
     public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
@@ -157,12 +163,12 @@ public class TenantServiceImpl extends BaseService implements TenantService {
     /**
      * updateProcessInstance tenant
      *
-     * @param loginUser login user
-     * @param id tenant id
+     * @param loginUser  login user
+     * @param id         tenant id
      * @param tenantCode tenant code
      * @param tenantName tenant name
-     * @param queueId queue id
-     * @param desc description
+     * @param queueId    queue id
+     * @param desc       description
      * @return update result code
      * @throws Exception exception
      */
@@ -229,7 +235,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      * delete tenant
      *
      * @param loginUser login user
-     * @param id tenant id
+     * @param id        tenant id
      * @return delete result code
      * @throws Exception exception
      */
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
new file mode 100644
index 0000000000..9ff7fac463
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.utils;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Regular expression utils.
+ */
+public class RegexUtils {
+
+    /**
+     * check number regex expression
+     */
+    private static final String CHECK_NUMBER = "^-?\\d+(\\.\\d+)?$";
+
+    private RegexUtils() {
+    }
+
+    /**
+     * check if the input is a number
+     *
+     * @param str input
+     * @return true if the input is a number
+     */
+    public static boolean isNumeric(String str) {
+        Pattern pattern = Pattern.compile(CHECK_NUMBER);
+        Matcher isNum = pattern.matcher(str);
+        return isNum.matches();
+    }
+}
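
// Sketch of the CHECK_NUMBER pattern's behaviour above (illustrative only, not part of the patch):
// "^-?\d+(\.\d+)?$" matches optionally signed integers and decimals, so createTenant rejects any
// tenant code made up entirely of digits, while codes containing letters pass through:
//
//     RegexUtils.isNumeric("123467854678"); // true  -> rejected with CHECK_TENANT_CODE_ERROR
//     RegexUtils.isNumeric("-12.5");        // true  -> sign and decimal point also match
//     RegexUtils.isNumeric("tenant01");     // false -> allowed to proceed
//     RegexUtils.isNumeric("0.0.01");       // false -> a second dot breaks the match
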
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index f35ff9509c..9a9fcb0043 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -618,13 +618,13 @@ public class ProcessDefinitionServiceTest {
 
         //project check auth success, process not exist
         putMsg(result, Status.SUCCESS, projectName);
-        Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test_pdf")).thenReturn(null);
+        Mockito.when(processDefineMapper.verifyByDefineName(project.getId(), "test_pdf")).thenReturn(null);
         Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
                 "project_test1", "test_pdf");
         Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
 
         //process exist
-        Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test_pdf")).thenReturn(getProcessDefinition());
+        Mockito.when(processDefineMapper.verifyByDefineName(project.getId(), "test_pdf")).thenReturn(getProcessDefinition());
         Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
                 "project_test1", "test_pdf");
         Assert.assertEquals(Status.PROCESS_INSTANCE_EXIST, processExistRes.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java
new file mode 100644
index 0000000000..5b62d51b07
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegexUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * RegexUtils test case
+ */
+public class RegexUtilsTest {
+
+    @Test
+    public void testIsNumeric() {
+        String num1 = "123467854678";
+        boolean numeric = RegexUtils.isNumeric(num1);
+        Assert.assertTrue(numeric);
+
+        String num2 = "0.0.01";
+        boolean numeric2 = RegexUtils.isNumeric(num2);
+        Assert.assertFalse(numeric2);
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 86e3172f23..621ec43c27 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -34,6 +34,15 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
  */
 public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
 
+    /**
+     * verify process definition by name
+     *
+     * @param projectId projectId
+     * @param name name
+     * @return process definition
+     */
+    ProcessDefinition verifyByDefineName(@Param("projectId") int projectId,
+                                         @Param("processDefinitionName") String name);
 
     /**
      * query process definition by name
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index 0481f7deab..e0ede93227 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -18,6 +18,12 @@
+    <select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
+        select *
+        from t_ds_process_definition
+        where project_id = #{projectId}
+            and name = #{processDefinitionName}
+    </select>
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
         select *
         from t_ds_process_instance
-        where 1=1
+        where process_definition_id=#{processDefinitionId}
         and state in
@@ -175,7 +175,6 @@
-        and process_definition_id=#{processDefinitionId}
         and (schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
             or start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime})
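
// Sketch of the intent behind the ProcessInstanceMapper change above (illustrative only):
// with `where 1=1` the statement selected across all process instances and relied on a later
// `and process_definition_id=...` clause; moving the definition id into the base predicate
// forces every caller to scope the scan, e.g. (method name as declared elsewhere in
// dolphinscheduler-dao -- an assumption here, shown for illustration):
//
//     List<ProcessInstance> running = processInstanceMapper.queryByProcessDefineIdAndStatus(
//             processDefinitionId, Constants.NOT_TERMINATED_STATES);
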
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 38f00fb4fd..c1aea90393 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -17,17 +17,6 @@
 
 package org.apache.dolphinscheduler.remote;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
 import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
 import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -41,19 +30,40 @@ import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
 import org.apache.dolphinscheduler.remote.future.ResponseFuture;
 import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.remote.utils.NettyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
 /**
  * remoting netty client
  */
@@ -162,11 +172,10 @@ public class NettyRemotingClient {
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
-                    public void initChannel(SocketChannel ch) throws Exception {
-                        ch.pipeline().addLast(
-                                new NettyDecoder(),
-                                clientHandler,
-                                encoder);
+                    public void initChannel(SocketChannel ch) {
+                        ch.pipeline()
+                                .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
+                                .addLast(new NettyDecoder(), clientHandler, encoder);
                     }
                 });
         this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
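
// How the client-side idle handler above behaves (sketch; semantics from Netty's IdleStateHandler):
// IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, unit) fires an IdleStateEvent when
// the corresponding timer expires. Only the first argument is non-zero here, so after
// Constants.NETTY_CLIENT_HEART_BEAT_TIME of read silence the event reaches
// NettyClientHandler#userEventTriggered (further down in this diff), which replies with a
// HEART_BEAT command instead of letting the connection be judged dead by the server.
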
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index ad5c95bb38..867cf4dc56 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,11 +40,11 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 
 /**
  * remoting netty server
@@ -183,10 +184,11 @@ public class NettyRemotingServer {
      * @param ch socket channel
      */
     private void initNettyChannel(SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast("encoder", encoder);
-        pipeline.addLast("decoder", new NettyDecoder());
-        pipeline.addLast("handler", serverHandler);
+        ch.pipeline()
+                .addLast("encoder", encoder)
+                .addLast("decoder", new NettyDecoder())
+                .addLast("server-idle-handler", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+                .addLast("handler", serverHandler);
     }
 
     /**
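
// Pipeline-ordering note for initNettyChannel above (sketch): the IdleStateHandler is registered
// before the business handler so that its IdleStateEvent propagates inbound to
// NettyServerHandler#userEventTriggered, where a channel idle for NETTY_SERVER_HEART_BEAT_TIME
// in both directions is treated as dead and closed.
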
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65f57..4f477fb467 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1,105 @@
-/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+public enum CommandType {
+
+    /**
+     * remove task log request
+     */
+    REMOVE_TAK_LOG_REQUEST,
+
+    /**
+     * remove task log response
+     */
+    REMOVE_TAK_LOG_RESPONSE,
+
+    /**
+     * roll view log request
+     */
+    ROLL_VIEW_LOG_REQUEST,
+
+    /**
+     * roll view log response
+     */
+    ROLL_VIEW_LOG_RESPONSE,
+
+    /**
+     * view whole log request
+     */
+    VIEW_WHOLE_LOG_REQUEST,
+
+    /**
+     * view whole log response
+     */
+    VIEW_WHOLE_LOG_RESPONSE,
+
+    /**
+     * get log bytes request
+     */
+    GET_LOG_BYTES_REQUEST,
+
+    /**
+     * get log bytes response
+     */
+    GET_LOG_BYTES_RESPONSE,
+
+
+    WORKER_REQUEST,
+    MASTER_RESPONSE,
+
+    /**
+     * execute task request
+     */
+    TASK_EXECUTE_REQUEST,
+
+    /**
+     * execute task ack
+     */
+    TASK_EXECUTE_ACK,
+
+    /**
+     * execute task response
+     */
+    TASK_EXECUTE_RESPONSE,
+
+    /**
+     * kill task
+     */
+    TASK_KILL_REQUEST,
+
+    /**
+     * kill task response
+     */
+    TASK_KILL_RESPONSE,
+
+    /**
+     * HEART_BEAT
+     */
+    HEART_BEAT,
+
+    /**
+     * ping
+     */
+    PING,
+
+    /**
+     * pong
+     */
+    PONG;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 48d78d9ad6..a988acfe17 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
 /**
- *  netty client request handler
+ * netty client request handler
  */
 @ChannelHandler.Sharable
 public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
 
     /**
-     *  netty client
+     * netty client
      */
     private final NettyRemotingClient nettyRemotingClient;
 
+    private static byte[] heartBeatData = "heart_beat".getBytes();
+
     /**
-     *  callback thread executor
+     * callback thread executor
      */
     private final ExecutorService callbackExecutor;
 
@@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
 
     /**
-     *  default executor
+     * default executor
      */
     private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
 
-    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
+    public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
         this.nettyRemotingClient = nettyRemotingClient;
         this.callbackExecutor = callbackExecutor;
         this.processors = new ConcurrentHashMap();
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
      *
      * @param ctx channel handler context
      * @throws Exception
      */
@@ -81,7 +91,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  The current channel reads data from the remote
+     * The current channel reads data from the remote
      *
      * @param ctx channel handler context
      * @param msg message
@@ -89,55 +99,55 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
     *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = defaultExecutor;
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
      *
+     * @param channel channel
      * @param command command
      */
     private void processReceived(final Channel channel, final Command command) {
         ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
-        if(future != null){
+        if (future != null) {
             future.setResponseCommand(command);
             future.release();
-            if(future.getInvokeCallback() != null){
+            if (future.getInvokeCallback() != null) {
                 this.callbackExecutor.submit(new Runnable() {
                     @Override
                     public void run() {
                         future.executeInvokeCallback();
                     }
                 });
-            } else{
+            } else {
                 future.putResponse(command);
             }
-        } else{
+        } else {
             processByCommandType(channel, command);
         }
     }
@@ -163,9 +173,10 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
-     * @param ctx channel handler context
-     * @param cause cause
+     * caught exception
+     *
+     * @param ctx   channel handler context
+     * @param cause cause
      * @throws Exception
      */
     @Override
@@ -175,4 +186,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
         ctx.channel().close();
     }
 
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            Command heartBeat = new Command();
+            heartBeat.setType(CommandType.HEART_BEAT);
+            heartBeat.setBody(heartBeatData);
+            ctx.writeAndFlush(heartBeat)
+                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index da2a6ff8bf..09e41e9b54 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -17,22 +17,30 @@
 
 package org.apache.dolphinscheduler.remote.handler;
 
-import io.netty.channel.*;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+
+
 /**
- *  netty server request handler
+ * netty server request handler
  */
 @ChannelHandler.Sharable
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
 
     /**
-     *  netty remote server
+     * netty remote server
      */
     private final NettyRemotingServer nettyRemotingServer;
 
     /**
-     *  server processors queue
+     * server processors queue
      */
     private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
 
-    public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+    public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
        this.nettyRemotingServer = nettyRemotingServer;
     }
 
     /**
-     *  When the current channel is not active,
-     *  the current channel has reached the end of its life cycle
+     * When the current channel is not active,
+     * the current channel has reached the end of its life cycle
+     *
      * @param ctx channel handler context
      * @throws Exception
      */
@@ -73,38 +82,39 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        processReceived(ctx.channel(), (Command)msg);
+        processReceived(ctx.channel(), (Command) msg);
     }
 
     /**
      * register processor
      *
      * @param commandType command type
-     * @param processor processor
+     * @param processor   processor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
         this.registerProcessor(commandType, processor, null);
     }
 
     /**
-     *  register processor
+     * register processor
      *
      * @param commandType command type
-     * @param processor processor
-     * @param executor thread executor
+     * @param processor   processor
+     * @param executor    thread executor
      */
     public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
         ExecutorService executorRef = executor;
-        if(executorRef == null){
+        if (executorRef == null) {
             executorRef = nettyRemotingServer.getDefaultExecutor();
         }
         this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
     }
 
     /**
-     *  process received logic
+     * process received logic
+     *
      * @param channel channel
-     * @param msg message
+     * @param msg     message
      */
     private void processReceived(final Channel channel, final Command msg) {
         final CommandType commandType = msg.getType();
@@ -132,22 +142,22 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     *  caught exception
+     * caught exception
      *
-     * @param ctx channel handler context
+     * @param ctx   channel handler context
      * @param cause cause
      * @throws Exception
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        logger.error("exceptionCaught : {}",cause.getMessage(), cause);
+        logger.error("exceptionCaught : {}", cause.getMessage(), cause);
         ctx.channel().close();
     }
 
     /**
-     *  channel write changed
+     * channel write changed
      *
-     * @param ctx channel handler context
+     * @param ctx   channel handler context
      * @throws Exception
      */
     @Override
@@ -158,16 +168,25 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
         if (!ch.isWritable()) {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is not writable, over high water level : {}",
-                        ch, config.getWriteBufferHighWaterMark());
+                    ch, config.getWriteBufferHighWaterMark());
             }
             config.setAutoRead(false);
         } else {
             if (logger.isWarnEnabled()) {
                 logger.warn("{} is writable, to low water : {}",
-                        ch, config.getWriteBufferLowWaterMark());
+                    ch, config.getWriteBufferLowWaterMark());
             }
             config.setAutoRead(true);
         }
     }
 
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            ctx.channel().close();
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 91d4ac245e..866ebb6c2b 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-
 /**
  * constant
  */
@@ -30,6 +29,10 @@ public class Constants {
 
     public static final String SLASH = "/";
 
+    public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
+
+    public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60;
+
     /**
      * charset
      */
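
// Timing relationship between the two constants above (derived values, stated for clarity):
// the client emits a heartbeat after 60 000 ms of read silence, while the server only closes a
// channel after 181 000 ms with no traffic at all -- roughly three consecutive missed
// heartbeats plus a one-second grace period, so a single lost heartbeat never drops a link.
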
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 6abb381583..49bfb5f9a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -45,309 +47,309 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 /**
- * zookeeper master client
- *
- * single instance
+ * zookeeper master client
+ * <p>
+ * single instance
  */
 @Component
 public class ZKMasterClient extends AbstractZKClient {
 
-    /**
-     * logger
-     */
-    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
-
-    /**
-     * process service
-     */
-    @Autowired
-    private ProcessService processService;
-
-    public void start() {
-
-        InterProcessMutex mutex = null;
-        try {
-            // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
-            String znodeLock = getMasterStartUpLockPath();
-            mutex = new InterProcessMutex(getZkClient(), znodeLock);
-            mutex.acquire();
-
-            // init system znode
-            this.initSystemZNode();
-
-            while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
-                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-            }
-
-
-            // self tolerant
-            if (getActiveMasterNum() == 1) {
-                failoverWorker(null, true);
-                failoverMaster(null);
-            }
-
-        }catch (Exception e){
-            logger.error("master start up exception",e);
-        }finally {
-            releaseMutex(mutex);
-        }
-    }
-
-    @Override
-    public void close(){
-        super.close();
-    }
-
-    /**
-     * handle path events that this class cares about
-     * @param client zkClient
-     * @param event path event
-     * @param path zk path
-     */
-    @Override
-    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
-        //monitor master
-        if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
-            handleMasterEvent(event,path);
-        }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
-            //monitor worker
-            handleWorkerEvent(event,path);
-        }
-    }
-
-    /**
-     * remove zookeeper node path
-     *
-     * @param path zookeeper node path
-     * @param zkNodeType zookeeper node type
-     * @param failover is failover
-     */
-    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
-        logger.info("{} node deleted : {}", zkNodeType.toString(), path);
-        InterProcessMutex mutex = null;
-        try {
-            String failoverPath = getFailoverLockPath(zkNodeType);
-            // create a distributed lock
-            mutex = new InterProcessMutex(getZkClient(), failoverPath);
-            mutex.acquire();
-
-            String serverHost = getHostByEventDataPath(path);
-            // handle dead server
-            handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
-            //failover server
-            if(failover){
-                failoverServerWhenDown(serverHost, zkNodeType);
-            }
-        }catch (Exception e){
-            logger.error("{} server failover failed.", zkNodeType.toString());
-            logger.error("failover exception ",e);
-        }
-        finally {
-            releaseMutex(mutex);
-        }
-    }
-
-    /**
-     * failover server when server down
-     *
-     * @param serverHost server host
-     * @param zkNodeType zookeeper node type
-     * @throws Exception exception
-     */
-    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
-        if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){
-            return ;
-        }
-        switch (zkNodeType){
-            case MASTER:
-                failoverMaster(serverHost);
-                break;
-            case WORKER:
-                failoverWorker(serverHost, true);
-            default:
-                break;
-        }
-    }
-
-    /**
-     * get failover lock path
-     *
-     * @param zkNodeType zookeeper node type
-     * @return fail over lock path
-     */
-    private String getFailoverLockPath(ZKNodeType zkNodeType){
-
-        switch (zkNodeType){
-            case MASTER:
-                return getMasterFailoverLockPath();
-            case WORKER:
-                return getWorkerFailoverLockPath();
-            default:
-                return "";
-        }
-    }
-
-    /**
-     * monitor master
-     * @param event event
-     * @param path path
-     */
-    public void handleMasterEvent(TreeCacheEvent event, String path){
-        switch (event.getType()) {
-            case NODE_ADDED:
-                logger.info("master node added : {}", path);
-                break;
-            case NODE_REMOVED:
-                removeZKNodePath(path, ZKNodeType.MASTER, true);
-                break;
-            default:
-                break;
-        }
-    }
-
-    /**
-     * monitor worker
-     * @param event event
-     * @param path path
-     */
-    public void handleWorkerEvent(TreeCacheEvent event, String path){
-        switch (event.getType()) {
-            case NODE_ADDED:
-                logger.info("worker node added : {}", path);
-                break;
-            case NODE_REMOVED:
-                logger.info("worker node deleted : {}", path);
-                removeZKNodePath(path, ZKNodeType.WORKER, true);
-                break;
-            default:
-                break;
-        }
-    }
-
-    /**
-     * task needs failover if task start before worker starts
-     *
-     * @param taskInstance task instance
-     * @return true if task instance need fail over
-     */
-    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
-
-        boolean taskNeedFailover = true;
-
-        //now no host will execute this task instance,so no need to failover the task
-        if(taskInstance.getHost() == null){
-            return false;
-        }
-
-        // if the worker node exists in zookeeper, we must check the task starts after the worker
-        if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
-            //if task start after worker starts, there is no need to failover the task.
-            if(checkTaskAfterWorkerStart(taskInstance)){
-                taskNeedFailover = false;
-            }
-        }
-        return taskNeedFailover;
-    }
-
-    /**
-     * check task start after the worker server starts.
-     *
-     * @param taskInstance task instance
-     * @return true if task instance start time after worker server start date
-     */
-    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
-        if(StringUtils.isEmpty(taskInstance.getHost())){
-            return false;
-        }
-        Date workerServerStartDate = null;
-        List<Server> workerServers = getServersList(ZKNodeType.WORKER);
-        for(Server workerServer : workerServers){
-            if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
-                workerServerStartDate = workerServer.getCreateTime();
-                break;
-            }
-        }
-
-        if(workerServerStartDate != null){
-            return taskInstance.getStartTime().after(workerServerStartDate);
-        }else{
-            return false;
-        }
-    }
-
-    /**
-     * failover worker tasks
-     *
-     * 1. kill yarn job if there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. failover all tasks when workerHost is null
-     * @param workerHost worker host
-     */
-
-    /**
-     * failover worker tasks
-     *
-     * 1. kill yarn job if there are yarn jobs in tasks.
-     * 2. change task state from running to need failover.
-     * 3. failover all tasks when workerHost is null
-     * @param workerHost worker host
-     * @param needCheckWorkerAlive need check worker alive
-     * @throws Exception exception
-     */
-    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
-        logger.info("start worker[{}] failover ...", workerHost);
-
-        List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
-        for(TaskInstance taskInstance : needFailoverTaskInstanceList){
-            if(needCheckWorkerAlive){
-                if(!checkTaskInstanceNeedFailover(taskInstance)){
-                    continue;
-                }
-            }
-
-            ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
-            if(processInstance != null){
-                taskInstance.setProcessInstance(processInstance);
-            }
-
-            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-                    .buildTaskInstanceRelatedInfo(taskInstance)
-                    .buildProcessInstanceRelatedInfo(processInstance)
-                    .create();
-            // only kill yarn job if exists , the local thread has exited
-            ProcessUtils.killYarnJob(taskExecutionContext);
-
-            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
-            processService.saveTaskInstance(taskInstance);
-        }
-        logger.info("end worker[{}] failover ...", workerHost);
-    }
-
-    /**
-     * failover master tasks
-     *
-     * @param masterHost master host
-     */
-    private void failoverMaster(String masterHost) {
-        logger.info("start master failover ...");
-
-        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
-
-        //updateProcessInstance host is null and insert into command
-        for(ProcessInstance processInstance : needFailoverProcessInstanceList){
-            if(Constants.NULL.equals(processInstance.getHost()) ){
-                continue;
-            }
-            processService.processNeedFailoverProcessInstances(processInstance);
-        }
-
-        logger.info("master failover end");
-    }
-
-    public InterProcessMutex blockAcquireMutex() throws Exception {
-        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
-        mutex.acquire();
-        return mutex;
-    }
-
+    /**
+     * logger
+     */
+    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+    public void start() {
+
+        InterProcessMutex mutex = null;
+        try {
+            // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
+            String znodeLock = getMasterStartUpLockPath();
+            mutex = new InterProcessMutex(getZkClient(), znodeLock);
+            mutex.acquire();
+
+            // init system znode
+            this.initSystemZNode();
+
+            while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) {
+                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+            }
+
+            // self tolerant
+            if (getActiveMasterNum() == 1) {
+                failoverWorker(null, true);
+                failoverMaster(null);
+            }
+
+        } catch (Exception e) {
+            logger.error("master start up exception", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    @Override
+    public void close() {
+        super.close();
+    }
+
+    /**
+     * handle path events that this class cares about
+     *
+     * @param client zkClient
+     * @param event path event
+     * @param path zk path
+     */
+    @Override
+    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
+        //monitor master
+        if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) {
+            handleMasterEvent(event, path);
+        } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) {
+            //monitor worker
+            handleWorkerEvent(event, path);
+        }
+    }
+
+    /**
+     * remove zookeeper node path
+     *
+     * @param path zookeeper node path
+     * @param zkNodeType zookeeper node type
+     * @param failover is failover
+     */
+    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
+        logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+        InterProcessMutex mutex = null;
+        try {
+            String failoverPath = getFailoverLockPath(zkNodeType);
+            // create a distributed lock
+            mutex = new InterProcessMutex(getZkClient(), failoverPath);
+            mutex.acquire();
+
+            String serverHost = getHostByEventDataPath(path);
+            // handle dead server
+            handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+            //failover server
+            if (failover) {
+                failoverServerWhenDown(serverHost, zkNodeType);
+            }
+        } catch (Exception e) {
+            logger.error("{} server failover failed.", zkNodeType.toString());
+            logger.error("failover exception ", e);
+        } finally {
+            releaseMutex(mutex);
+        }
+    }
+
+    /**
+     * failover server when server down
+     *
+     * @param serverHost server host
+     * @param zkNodeType zookeeper node type
+     * @throws Exception exception
+     */
+    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
+        if (StringUtils.isEmpty(serverHost)) {
+            return;
+        }
+        switch (zkNodeType) {
+            case MASTER:
+                failoverMaster(serverHost);
+                break;
+            case WORKER:
+                failoverWorker(serverHost, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * get failover lock path
+     *
+     * @param zkNodeType zookeeper node type
+     * @return fail over lock path
+     */
+    private String getFailoverLockPath(ZKNodeType zkNodeType) {
+
+        switch (zkNodeType) {
+            case MASTER:
+                return getMasterFailoverLockPath();
+            case WORKER:
+                return getWorkerFailoverLockPath();
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * monitor master
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleMasterEvent(TreeCacheEvent event, String path) {
+        switch (event.getType()) {
+            case NODE_ADDED:
+                logger.info("master node added : {}", path);
+                break;
+            case NODE_REMOVED:
+                removeZKNodePath(path, ZKNodeType.MASTER, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * monitor worker
+     *
+     * @param event event
+     * @param path path
+     */
+    public void handleWorkerEvent(TreeCacheEvent event, String path) {
+        switch (event.getType()) {
+            case NODE_ADDED:
+                logger.info("worker node added : {}", path);
+                break;
+            case NODE_REMOVED:
+                logger.info("worker node deleted : {}", path);
+                removeZKNodePath(path, ZKNodeType.WORKER, true);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * task needs failover if task start before worker starts
+     *
+     * @param taskInstance task instance
+     * @return true if task instance need fail over
+     */
+    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
+
+        boolean taskNeedFailover = true;
+
+        //now no host will execute this task instance,so no need to failover the task
+        if (taskInstance.getHost() == null) {
+            return false;
+        }
+
+        // if the worker node exists in zookeeper, we must check the task starts after the worker
+        if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) {
+            //if task start after worker starts, there is no need to failover the task.
+            if (checkTaskAfterWorkerStart(taskInstance)) {
+                taskNeedFailover = false;
+            }
+        }
+        return taskNeedFailover;
+    }
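
// Decision table implied by checkTaskInstanceNeedFailover above (illustrative summary):
//   host == null                          -> no failover (task was never dispatched)
//   worker znode missing in zookeeper     -> failover (worker is gone)
//   worker alive, task started after it   -> no failover (the live worker still owns the task)
//   worker alive, task started before it  -> failover (task predates this worker incarnation)
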
+
+    /**
+     * check task start after the worker server starts.
+     *
+     * @param taskInstance task instance
+     * @return true if task instance start time after worker server start date
+     */
+    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return false;
+        }
+        Date workerServerStartDate = null;
+        List<Server> workerServers = getServersList(ZKNodeType.WORKER);
+        for (Server workerServer : workerServers) {
+            if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
+                workerServerStartDate = workerServer.getCreateTime();
+                break;
+            }
+        }
+        if (workerServerStartDate != null) {
+            return taskInstance.getStartTime().after(workerServerStartDate);
+        }
+        return false;
+    }
+
+    /**
+     * failover worker tasks
+     *
+     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. failover all tasks when workerHost is null
+     * @param workerHost worker host
+     */
+
+    /**
+     * failover worker tasks
+     * <p>
+     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. failover all tasks when workerHost is null
+     *
+     * @param workerHost worker host
+     * @param needCheckWorkerAlive need check worker alive
+     * @throws Exception exception
+     */
+    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+        logger.info("start worker[{}] failover ...", workerHost);
+
+        List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
+        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+            if (needCheckWorkerAlive) {
+                if (!checkTaskInstanceNeedFailover(taskInstance)) {
+                    continue;
+                }
+            }
+
+            ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+            if (processInstance != null) {
+                taskInstance.setProcessInstance(processInstance);
+            }
+
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                    .buildTaskInstanceRelatedInfo(taskInstance)
+                    .buildProcessInstanceRelatedInfo(processInstance)
+                    .create();
+            // only kill yarn job if exists , the local thread has exited
+            ProcessUtils.killYarnJob(taskExecutionContext);
+
+            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+            processService.saveTaskInstance(taskInstance);
+        }
+        logger.info("end worker[{}] failover ...", workerHost);
+    }
+
+    /**
+     * failover master tasks
+     *
+     * @param masterHost master host
+     */
+    private void failoverMaster(String masterHost) {
+        logger.info("start master failover ...");
+
+        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
+
+        //updateProcessInstance host is null and insert into command
+        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
+            if (Constants.NULL.equals(processInstance.getHost())) {
+                continue;
+            }
+            processService.processNeedFailoverProcessInstances(processInstance);
+        }
+
+        logger.info("master failover end");
+    }
+
+    public InterProcessMutex blockAcquireMutex() throws Exception {
+        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
+        mutex.acquire();
+        return mutex;
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7344cf13e5..7fca37470d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -896,7 +896,7 @@ public class ProcessService {
             return task;
         }
         if(!task.getState().typeIsFinished()){
-            createSubWorkProcessCommand(processInstance, task);
+            createSubWorkProcess(processInstance, task);
         }
 
         logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {}  ",
@@ -906,20 +906,22 @@ public class ProcessService {
 
     /**
      * set work process instance map
+     * consider the following circumstances:
+     * repeat running does not generate a new sub process instance
+     * set map {parent instance id, task instance id, 0(child instance id)}
      * @param parentInstance parentInstance
      * @param parentTask parentTask
      * @return process instance map
      */
     private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
         ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
-        if(processMap != null){
+        if (processMap != null) {
             return processMap;
-        }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
-                || parentInstance.isComplementData()){
+        }
+        if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
             // update current task id to map
-            // repeat running does not generate new sub process instance
             processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
-            if(processMap!= null){
+            if (processMap != null) {
                 processMap.setParentTaskInstanceId(parentTask.getId());
                 updateWorkProcessInstanceMap(processMap);
                 return processMap;
@@ -944,11 +946,11 @@ public class ProcessService {
         Integer preTaskId = 0;
         List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
-        for(TaskInstance task : preTaskList){
-            if(task.getName().equals(parentTask.getName())){
+        for (TaskInstance task : preTaskList) {
+            if (task.getName().equals(parentTask.getName())) {
                 preTaskId = task.getId();
                 ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
-                if(map!=null){
+                if (map != null) {
                     return map;
                 }
             }
@@ -960,66 +962,111 @@ public class ProcessService {
 
     /**
      * create sub work process command
+     *
      * @param parentProcessInstance parentProcessInstance
-     * @param task task
+     * @param task                  task
      */
-    private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
-                                             TaskInstance task){
-        if(!task.isSubProcess()){
+    public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
+        if (!task.isSubProcess()) {
             return;
         }
-        ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
-        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
-        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
-        Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
-
-        ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
-
-        CommandType fatherType = parentProcessInstance.getCommandType();
-        CommandType commandType = fatherType;
-        if(childInstance == null){
-            String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
-            // sub process must begin with schedule/complement data
-            // if father begin with scheduler/complement data
-            if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
-                    fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
-                commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
-            }
+        //check create sub work flow firstly
+        ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
+        if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
+            // recover failover tolerance would not create a new command when the sub command already have been created
+            return;
         }
-
-        if(childInstance != null){
-            childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
-            updateProcessInstance(childInstance);
+        instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+        ProcessInstance childInstance = null;
+        if (instanceMap.getProcessInstanceId() != 0) {
+            childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
         }
+        Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+        updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
+        initSubInstanceState(childInstance);
+        createCommand(subProcessCommand);
+        logger.info("sub process command created: {} ", subProcessCommand);
+    }
+
+    /**
+     * complement data needs transform parent parameter to child.
+     *
+     * @param instanceMap process instance map
+     * @param parentProcessInstance parent process instance
+     * @return sub work flow command parameter
+     */
+    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
         // set sub work process command
         String processMapStr = JSONUtils.toJsonString(instanceMap);
         Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
-
-        if(commandType == CommandType.COMPLEMENT_DATA ||
-                (childInstance != null && childInstance.isComplementData())){
+        if (parentProcessInstance.isComplementData()) {
             Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
-            String endTime =  parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
-            String startTime =  parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+            String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+            String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
             processMapStr = JSONUtils.toJsonString(cmdParam);
         }
+        return processMapStr;
+    }
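
// Shape of the command parameter produced by getSubWorkFlowParam above (illustrative JSON;
// the key names are assumptions taken from ProcessInstanceMap serialization and the
// CMDPARAM_COMPLEMENT_DATA_* constants, shown here only for orientation):
//
//     {"processInstanceId":0,"parentProcessInstanceId":101,"parentTaskInstanceId":202,
//      "complementStartDate":"2020-01-01 00:00:00","complementEndDate":"2020-01-07 00:00:00"}
//
// The two complement dates are copied from the parent only when it is a complement-data run.
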
+        initSubInstanceState(childInstance);
+        createCommand(subProcessCommand);
+        logger.info("sub process command created: {} ", subProcessCommand);
+    }
+
+    /**
+     * complement data needs to transform the parent parameters to the child.
+     * @param instanceMap process instance map
+     * @param parentProcessInstance parent process instance
+     * @return command param for the sub process
+     */
+    private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
         // set sub work process command
         String processMapStr = JSONUtils.toJsonString(instanceMap);
         Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
-
-        if(commandType == CommandType.COMPLEMENT_DATA ||
-                (childInstance != null && childInstance.isComplementData())){
+        if (parentProcessInstance.isComplementData()) {
             Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
-            String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
-            String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+            String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+            String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
             processMapStr = JSONUtils.toJsonString(cmdParam);
         }
+        return processMapStr;
+    }
 
-        updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId);
+    /**
+     * create sub work process command
+     * @param parentProcessInstance parent process instance
+     * @param childInstance child process instance
+     * @param instanceMap process instance map
+     * @param task the sub process task instance
+     * @return sub process command
+     */
+    public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+                                           ProcessInstance childInstance,
+                                           ProcessInstanceMap instanceMap,
+                                           TaskInstance task) {
+        CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
+        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
+        Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
+        Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+        String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);
+
+        return new Command(
+                commandType,
+                TaskDependType.TASK_POST,
+                parentProcessInstance.getFailureStrategy(),
+                parentProcessInstance.getExecutorId(),
+                childDefineId,
+                processParam,
+                parentProcessInstance.getWarningType(),
+                parentProcessInstance.getWarningGroupId(),
+                parentProcessInstance.getScheduleTime(),
+                parentProcessInstance.getProcessInstancePriority()
+        );
+    }
+
+    /**
+     * initialize the sub work flow state
+     * the child instance state is re-initialized when recovering from pause/stop/failure
+     * @param childInstance child process instance
+     */
+    private void initSubInstanceState(ProcessInstance childInstance) {
+        if (childInstance != null) {
+            childInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+            updateProcessInstance(childInstance);
+        }
+    }
-
-        Command command = new Command();
-        command.setWarningType(parentProcessInstance.getWarningType());
-        command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
-        command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
-        command.setProcessDefinitionId(childDefineId);
-        command.setScheduleTime(parentProcessInstance.getScheduleTime());
-        command.setExecutorId(parentProcessInstance.getExecutorId());
-        command.setCommandParam(processMapStr);
-        command.setCommandType(commandType);
-        command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
-        command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
-        createCommand(command);
-        logger.info("sub process command created: {} ", command.toString());
+
+    /**
+     * get the sub work flow command type
+     * child instance exists: child command = fatherCommand
+     * child instance does not exist: child command = fatherCommand[0]
+     *
+     * @param parentProcessInstance parent process instance
+     * @param childInstance child process instance
+     * @return sub work flow command type
+     */
+    private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
+        CommandType commandType = parentProcessInstance.getCommandType();
+        if (childInstance == null) {
+            String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
+            commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
+        }
+        return commandType;
     }
 
     /**
@@ -1497,20 +1544,6 @@ public class ProcessService {
         return result;
     }
 
-    /**
-     * update pid and app links field by task instance id
-     * @param taskInstId taskInstId
-     * @param pid pid
-     * @param appLinks appLinks
-     */
-    public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
-
-        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
-        taskInstance.setPid(pid);
-        taskInstance.setAppLink(appLinks);
-        saveTaskInstance(taskInstance);
-    }
-
     /**
      * query schedule by id
      * @param id id
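A worked example of the `getSubCommandType` decision above (assuming `Constants.COMMA` is `","`; the values mirror the test cases in the new test class below):

```java
// No child instance yet: the head of the parent's history command decides
// the sub-process command type.
String fatherHistoryCommand = "SCHEDULER,START_FAILURE_TASK_PROCESS";
CommandType commandType = CommandType.valueOf(fatherHistoryCommand.split(",")[0]);
// commandType == CommandType.SCHEDULER

// Child instance already exists: the parent's current command type is
// reused unchanged, e.g. START_FAILURE_TASK_PROCESS on a recovery run.
```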
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
new file mode 100644
index 0000000000..74b52bb316
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.process;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * process service test
+ */
+public class ProcessServiceTest {
+
+    @Test
+    public void testCreateSubCommand() {
+        ProcessService processService = new ProcessService();
+        ProcessInstance parentInstance = new ProcessInstance();
+        parentInstance.setProcessDefinitionId(1);
+        parentInstance.setWarningType(WarningType.SUCCESS);
+        parentInstance.setWarningGroupId(0);
+
+        TaskInstance task = new TaskInstance();
+        task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
+        task.setId(10);
+
+        ProcessInstance childInstance = null;
+        ProcessInstanceMap instanceMap = new ProcessInstanceMap();
+        instanceMap.setParentProcessInstanceId(1);
+        instanceMap.setParentTaskInstanceId(10);
+        Command command = null;
+
+        // father history: start; child instance: null ==> command type: start
+        parentInstance.setHistoryCmd("START_PROCESS");
+        parentInstance.setCommandType(CommandType.START_PROCESS);
+        command = processService.createSubProcessCommand(
+                parentInstance, childInstance, instanceMap, task
+        );
+        Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+        // father history: start,start failure; child instance: null ==> command type: start
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+        command = processService.createSubProcessCommand(
+                parentInstance, childInstance, instanceMap, task
+        );
+        Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+        // father history: scheduler,start failure; child instance: null ==> command type: scheduler
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
+        command = processService.createSubProcessCommand(
+                parentInstance, childInstance, instanceMap, task
+        );
+        Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType());
+
+        // father history: complement,start failure; child instance: null ==> command type: complement
+        String startString = "2020-01-01 00:00:00";
+        String endString = "2020-01-10 00:00:00";
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
+        Map<String, String> complementMap = new HashMap<>();
+        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
+        complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
+        parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
+        command = processService.createSubProcessCommand(
+                parentInstance, childInstance, instanceMap, task
+        );
+        Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
+
+        JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
+        Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
+        Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
+        Assert.assertEquals(startString, DateUtils.dateToString(start));
+        Assert.assertEquals(endString, DateUtils.dateToString(end));
+
+        // father history: start,start failure; child instance: not null ==> command type: start failure
+        childInstance = new ProcessInstance();
+        parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+        parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+        command = processService.createSubProcessCommand(
+                parentInstance, childInstance, instanceMap, task
+        );
+        Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
+    }
+}
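To make the fixtures concrete: in every case above the sub-process definition id comes from the task's JSON params, so the command always targets definition 100. A condensed restatement of the complement case (comment values follow from the fixtures; this is illustration, not additional test code):

```java
// Given task.getTaskJson() == {"params":{"processDefinitionId":100}} and the
// complement fixture above, createSubProcessCommand returns a Command where:
//   command.getProcessDefinitionId() == 100                         (from task params)
//   command.getCommandType()         == CommandType.COMPLEMENT_DATA (history head)
//   command.getCommandParam()        == instance-map JSON merged with both dates
Command command = processService.createSubProcessCommand(parentInstance, null, instanceMap, task);
```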
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
index 2e60929577..18fbd94341 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
@@ -236,6 +236,13 @@ const tasksState = {
     color: '#5102ce',
     icoUnicode: 'ans-icon-coin',
     isSpin: false
+  },
+  FORCED_SUCCESS: {
+    id: 13,
+    desc: `${i18n.$t('Forced success')}`,
+    color: '#5102ce',
+    icoUnicode: 'ans-icon-success-solid',
+    isSpin: false
   }
 }
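The new `FORCED_SUCCESS` entry (id 13) gives the UI a rendering for tasks that were manually forced to success. A hedged sketch of the gate this feature implies on the Java side, matching the "Force fail/kill/need_fault_tolerance task success" comment in actions.js later in this patch (this is an assumption for illustration, not the project's actual server-side code):

```java
// Hypothetical guard: only failed, killed, or fault-tolerance tasks
// should be eligible for force-success.
private boolean canForceSuccess(ExecutionStatus state) {
    return state == ExecutionStatus.FAILURE
            || state == ExecutionStatus.KILL
            || state == ExecutionStatus.NEED_FAULT_TOLERANCE;
}
```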
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue
new file mode 100644
index 0000000000..2b2ed78ccc
--- /dev/null
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/dependentTimeout.vue
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
index 7cd63c07db..b6c48fe5e8 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
@@ -162,10 +162,18 @@
+
+
+        {{item.duration}} {{item.retryTimes}}
+
+      `
@@ -156,6 +168,17 @@
       }
     })
   },
+  _forceSuccess (item) {
+    this.forceTaskSuccess({taskInstanceId: item.id}).then(res => {
+      if (res.code === 0) {
+        this.$message.success(res.msg)
+      } else {
+        this.$message.error(res.msg)
+      }
+    }).catch(e => {
+      this.$message.error(e.msg)
+    })
+  },
   _go (item) {
     this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` })
   },
diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js
index 93fab5a224..b35d07052a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js
+++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js
@@ -734,6 +734,18 @@ export default {
       })
     })
   },
+  /**
+   * Force a failed/killed/need-fault-tolerance task to success
+   */
+  forceTaskSuccess ({ state }, payload) {
+    return new Promise((resolve, reject) => {
+      io.post(`projects/${state.projectName}/task-instance/force-success`, payload, res => {
+        resolve(res)
+      }).catch(e => {
+        reject(e)
+      })
+    })
+  },
   /**
    * Query task record list
    */
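The Vuex action above posts the task instance id to the new `force-success` endpoint. A minimal sketch of the same call from Java (JDK 11+ `java.net.http`); host, port, context path, project name, and task id are placeholders, and login/session handling is omitted for brevity:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ForceSuccessCall {
    public static void main(String[] args) throws Exception {
        // Same endpoint the UI calls: projects/{projectName}/task-instance/force-success
        String url = "http://localhost:12345/dolphinscheduler/projects/demo/task-instance/force-success";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("taskInstanceId=10"))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // expect {"code":0,...} on success
    }
}
```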
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 07dfb7c5a7..0f59aecc5a 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -32,6 +32,7 @@ export default {
   'Current node settings': 'Current node settings',
   'View history': 'View history',
   'View log': 'View log',
+  'Force success': 'Force success',
   'Enter this child node': 'Enter this child node',
   'Node name': 'Node name',
   'Run flag': 'Run flag',
@@ -426,8 +427,13 @@ export default {
   'Timeout alarm': 'Timeout alarm',
   'Timeout failure': 'Timeout failure',
   'Timeout period': 'Timeout period',
+  'Waiting Dependent complete': 'Waiting Dependent complete',
+  'Waiting Dependent start': 'Waiting Dependent start',
+  'Check interval': 'Check interval',
+  'Timeout must be longer than check interval': 'Timeout must be longer than check interval',
   'Timeout strategy must be selected': 'Timeout strategy must be selected',
   'Timeout must be a positive integer': 'Timeout must be a positive integer',
+  'Forced success': 'Forced success',
   'Add dependency': 'Add dependency',
   and: 'and',
   or: 'or',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index e3f2562f6f..0481cd0137 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -32,6 +32,7 @@ export default {
   'Current node settings': '当前节点设置',
   'View history': '查看历史',
   'View log': '查看日志',
+  'Force success': '强制成功',
   'Enter this child node': '进入该子节点',
   'Node name': '节点名称',
   'Please enter name (required)': '请输入名称(必填)',
@@ -420,8 +421,12 @@ export default {
   'Timeout alarm': '超时告警',
   'Timeout failure': '超时失败',
   'Timeout period': '超时时长',
+  'Waiting Dependent complete': '等待依赖完成',
+  'Waiting Dependent start': '等待依赖启动',
+  'Check interval': '检查间隔',
   'Timeout strategy must be selected': '超时策略必须选一个',
   'Timeout must be a positive integer': '超时时长必须为正整数',
+  'Timeout must be longer than check interval': '超时时间必须比检查间隔长',
   'Add dependency': '添加依赖',
   and: '且',
   or: '或',
@@ -432,6 +437,7 @@ export default {
   Running: '正在运行',
   'Waiting for dependency to complete': '等待依赖完成',
   'Delay execution': '延时执行',
+  'Forced success': '强制成功',
   Selected: '已选',
   CurrentHour: '当前小时',
   Last1Hour: '前1小时',
diff --git a/pom.xml b/pom.xml
index e895b01d89..9e4934e833 100644
--- a/pom.xml
+++ b/pom.xml
@@ -853,6 +853,7 @@
         **/server/worker/EnvFileTest.java
         **/server/worker/runner/TaskExecuteThreadTest.java
         **/service/quartz/cron/CronUtilsTest.java
+        **/service/process/ProcessServiceTest.java
         **/service/zk/DefaultEnsembleProviderTest.java
         **/service/zk/ZKServerTest.java
         **/service/zk/CuratorZookeeperClientTest.java
diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
index ae66da914f..f7b3bbcabb 100644
--- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
@@ -101,7 +101,7 @@ drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version;
 delimiter d//
 CREATE PROCEDURE ct_dolphin_T_t_ds_process_definition_version()
 BEGIN
-    CREATE TABLE `t_ds_process_definition_version` (
+    CREATE TABLE IF NOT EXISTS `t_ds_process_definition_version` (
         `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
         `process_definition_id` int(11) NOT NULL COMMENT 'process definition id',
         `version` int(11) DEFAULT NULL COMMENT 'process definition version',
diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
index 3351cac88c..477cb3bf60 100644
--- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
@@ -91,7 +91,7 @@ DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool();
 delimiter d//
 CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$
 BEGIN
-CREATE TABLE t_ds_process_definition_version (
+CREATE TABLE IF NOT EXISTS t_ds_process_definition_version (
 id int NOT NULL ,
 process_definition_id int NOT NULL ,
 version int DEFAULT NULL ,
@@ -140,5 +140,3 @@ DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_resources_un();
-
-
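The `IF NOT EXISTS` guards make the 1.3.3 upgrade DDL safe to re-run. A minimal smoke-test sketch in plain JDBC (connection URL and credentials are placeholders, the column list is abbreviated from the diff, the primary key is an assumption, and the MySQL driver is assumed to be on the classpath):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class UpgradeDdlSmokeTest {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/dolphinscheduler", "root", "root");
             Statement stmt = conn.createStatement()) {
            String ddl = "CREATE TABLE IF NOT EXISTS t_ds_process_definition_version ("
                    + "id int(11) NOT NULL AUTO_INCREMENT, "
                    + "process_definition_id int(11) NOT NULL, "
                    + "version int(11) DEFAULT NULL, "
                    + "PRIMARY KEY (id))"; // key definition assumed for the sketch
            stmt.execute(ddl); // first run creates the table
            stmt.execute(ddl); // second run is a no-op instead of an error
        }
    }
}
```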