diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml index 7f5fc8a989..8070d7e0c5 100644 --- a/.github/workflows/ci_e2e.yml +++ b/.github/workflows/ci_e2e.yml @@ -59,7 +59,7 @@ jobs: sudo dpkg -i google-chrome*.deb sudo apt-get install -f -y google-chrome -version - googleVersion=`google-chrome -version | awk '{print $3}'` + googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE) wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip unzip chromedriver_linux64.zip sudo mv -f chromedriver /usr/local/share/chromedriver diff --git a/docker/build/conf/zookeeper/zoo.cfg b/docker/build/conf/zookeeper/zoo.cfg index 7980d37ae9..94f92d0620 100644 --- a/docker/build/conf/zookeeper/zoo.cfg +++ b/docker/build/conf/zookeeper/zoo.cfg @@ -43,3 +43,5 @@ clientPort=2181 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 +#Four Letter Words commands:stat,ruok,conf,isro +4lw.commands.whitelist=* diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index e30da31ad4..43c03f09d8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -193,7 +193,7 @@ public enum Status { BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"), BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"), QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"), - + DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), @@ -294,4 +294,4 @@ public enum Status { return this.enMsg; } } -} +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 54f6e9042e..c8d3c74da0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -24,7 +24,6 @@ import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; - import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; @@ -711,4 +710,24 @@ public class ProcessInstanceService extends BaseService { return DagHelper.buildDagGraph(processDag); } + /** + * query process instance by processDefinitionId and stateArray + * @param processDefinitionId processDefinitionId + * @param states states array + * @return process instance list + */ + public List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) { + return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states); + } + + /** + * query process instance by processDefinitionId + * @param processDefinitionId processDefinitionId + * @param size size + * @return process instance list + */ + public List queryByProcessDefineId(int processDefinitionId,int size) { + return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index 56d40d9cab..bd7598979d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; @@ -139,6 +140,10 @@ public class ResourcesService extends BaseService { } } result.setData(resultMap); + } catch (DuplicateKeyException e) { + logger.error("resource directory {} has exist, can't recreate", fullName); + putMsg(result, Status.RESOURCE_EXIST); + return result; } catch (Exception e) { logger.error("resource already exists, can't recreate ", e); throw new RuntimeException("resource already exists, can't recreate"); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 0d0f309842..a5e297072c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; +import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.utils.CheckUtils; @@ -64,7 +65,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -134,7 +134,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements private ProcessDefinitionMapper processDefineMapper; @Autowired - private ProcessInstanceMapper processInstanceMapper; + private ProcessInstanceService processInstanceService; @Autowired private TaskInstanceMapper taskInstanceMapper; @@ -488,6 +488,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); return result; } + // check process instances is already running + List processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES); + if (CollectionUtils.isNotEmpty(processInstances)) { + putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL,processInstances.size()); + return result; + } // get the timing according to the process definition List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); @@ -1247,7 +1253,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements /** * List of process instances */ - List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); + List processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit); for (ProcessInstance processInstance : processInstanceList) { processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index d791cc3717..33032f54e4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -41,7 +41,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -94,7 +93,7 @@ public class ProcessDefinitionServiceTest { private ProcessService processService; @Mock - private ProcessInstanceMapper processInstanceMapper; + private ProcessInstanceService processInstanceService; @Mock private TaskInstanceMapper taskInstanceMapper; @@ -734,7 +733,7 @@ public class ProcessDefinitionServiceTest { //task instance not exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Mockito.when(processInstanceMapper.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); + Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null); Map taskNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index cc75624b60..68d03506c6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -195,7 +195,7 @@ public class HadoopUtils implements Closeable { */ String appUrl = ""; - if (StringUtils.isEmpty(rmHaIds)){ + if (StringUtils.isEmpty(rmHaIds)) { //single resourcemanager enabled appUrl = appAddress; yarnEnabled = true; @@ -206,7 +206,7 @@ public class HadoopUtils implements Closeable { logger.info("application url : {}", appUrl); } - if(StringUtils.isBlank(appUrl)){ + if (StringUtils.isBlank(appUrl)) { throw new Exception("application url is blank"); } return String.format(appUrl, applicationId); @@ -417,25 +417,33 @@ public class HadoopUtils implements Closeable { String applicationUrl = getApplicationUrl(applicationId); logger.info("applicationUrl={}", applicationUrl); - String responseContent ; - if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) { - responseContent = KerberosHttpClient.get(applicationUrl); - } else { - responseContent = HttpUtils.get(applicationUrl); - } + String responseContent; + if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) { + responseContent = KerberosHttpClient.get(applicationUrl); + } else { + responseContent = HttpUtils.get(applicationUrl); + } if (responseContent != null) { ObjectNode jsonObject = JSONUtils.parseObject(responseContent); + if (!jsonObject.has("app")) { + return ExecutionStatus.FAILURE; + } result = jsonObject.path("app").path("finalStatus").asText(); + } else { //may be in job history String jobHistoryUrl = getJobHistoryUrl(applicationId); logger.info("jobHistoryUrl={}", jobHistoryUrl); responseContent = HttpUtils.get(jobHistoryUrl); - ObjectNode jsonObject = JSONUtils.parseObject(responseContent); - if (!jsonObject.has("job")){ + if (null != responseContent) { + ObjectNode jsonObject = JSONUtils.parseObject(responseContent); + if (!jsonObject.has("job")) { + return ExecutionStatus.FAILURE; + } + result = jsonObject.path("job").path("state").asText(); + } else { return ExecutionStatus.FAILURE; } - result = jsonObject.path("job").path("state").asText(); } switch (result) { @@ -474,7 +482,7 @@ public class HadoopUtils implements Closeable { /** * hdfs resource dir * - * @param tenantCode tenant code + * @param tenantCode tenant code * @param resourceType resource type * @return hdfs resource dir */ @@ -679,7 +687,7 @@ public class HadoopUtils implements Closeable { ObjectNode jsonObject = JSONUtils.parseObject(retStr); //get ResourceManager state - if (!jsonObject.has("clusterInfo")){ + if (!jsonObject.has("clusterInfo")) { return null; } return jsonObject.get("clusterInfo").path("haState").asText(); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index b7bd081cfe..8048fda812 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -14,19 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; + import org.apache.ibatis.annotations.Param; import java.util.Date; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + /** * process instance mapper interface */ @@ -201,9 +204,20 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param endTime * @return ProcessInstance list */ + List queryTopNProcessInstance(@Param("size") int size, @Param("startTime") Date startTime, @Param("endTime") Date endTime, @Param("status")ExecutionStatus status); + /** + * query process instance by processDefinitionId and stateArray + * @param processDefinitionId processDefinitionId + * @param states states array + * @return process instance list + */ + + List queryByProcessDefineIdAndStatus( + @Param("processDefinitionId") int processDefinitionId, + @Param("states") int[] states); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index bbc331a67d..831c4a9c23 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -191,6 +191,16 @@ order by end_time desc limit 1 - + \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 92e3cae857..38f00fb4fd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -27,7 +27,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; @@ -155,11 +154,12 @@ public class NettyRemotingClient { this.bootstrap .group(this.workerGroup) - .channel(NioSocketChannel.class) + .channel(NettyUtils.getSocketChannelClass()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index e96f22aafc..ad5c95bb38 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -17,17 +17,6 @@ package org.apache.dolphinscheduler.remote; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; - import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -37,15 +26,25 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NettyUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; + /** * remoting netty server */ @@ -146,17 +145,17 @@ public class NettyRemotingServer { if (isStarted.compareAndSet(false, true)) { this.serverBootstrap .group(this.bossGroup, this.workGroup) - .channel(NioServerSocketChannel.class) + .channel(NettyUtils.getServerSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) - .childHandler(new ChannelInitializer() { + .childHandler(new ChannelInitializer() { @Override - protected void initChannel(NioSocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) throws Exception { initNettyChannel(ch); } }); @@ -182,9 +181,8 @@ public class NettyRemotingServer { * init netty channel * * @param ch socket channel - * @throws Exception */ - private void initNettyChannel(NioSocketChannel ch) throws Exception { + private void initNettyChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", encoder); pipeline.addLast("decoder", new NettyDecoder()); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java index 831e05f7e7..739cbbebe1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java @@ -14,22 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.config; import org.apache.dolphinscheduler.remote.utils.Constants; /** - * netty client config + * netty client config */ public class NettyClientConfig { /** - * worker threads,default get machine cpus + * worker threads,default get machine cpus */ private int workerThreads = Constants.CPUS; /** - * whether tpc delay + * whether tpc delay */ private boolean tcpNoDelay = true; @@ -39,15 +40,20 @@ public class NettyClientConfig { private boolean soKeepalive = true; /** - * send buffer size + * send buffer size */ private int sendBufferSize = 65535; /** - * receive buffer size + * receive buffer size */ private int receiveBufferSize = 65535; + /** + * connect timeout millis + */ + private int connectTimeoutMillis = 3000; + public int getWorkerThreads() { return workerThreads; } @@ -88,4 +94,11 @@ public class NettyClientConfig { this.receiveBufferSize = receiveBufferSize; } + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 297d4041a7..370467f6ca 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.utils; import java.nio.charset.Charset; @@ -21,7 +22,7 @@ import java.nio.charset.StandardCharsets; /** - * constant + * constant */ public class Constants { @@ -30,12 +31,12 @@ public class Constants { public static final String SLASH = "/"; /** - * charset + * charset */ public static final Charset UTF8 = StandardCharsets.UTF_8; /** - * cpus + * cpus */ public static final int CPUS = Runtime.getRuntime().availableProcessors(); @@ -45,7 +46,7 @@ public class Constants { /** * netty epoll enable switch */ - public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable"); + public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable", "true"); /** * OS Name diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java index 3e242bd95d..89eb1f9607 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java @@ -18,6 +18,12 @@ package org.apache.dolphinscheduler.remote.utils; import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; /** * NettyUtils @@ -39,4 +45,18 @@ public class NettyUtils { return Boolean.parseBoolean(enableNettyEpoll); } + public static Class getServerSocketChannelClass() { + if (useEpoll()) { + return EpollServerSocketChannel.class; + } + return NioServerSocketChannel.class; + } + + public static Class getSocketChannelClass() { + if (useEpoll()) { + return EpollSocketChannel.class; + } + return NioSocketChannel.class; + } + } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java index f5f60dc735..e95dbddac9 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java @@ -17,20 +17,28 @@ package org.apache.dolphinscheduler.remote; +import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME; + import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.junit.Assert; import org.junit.Test; +import io.netty.channel.epoll.Epoll; + /** * NettyUtilTest */ public class NettyUtilTest { + @Test public void testUserEpoll() { - System.setProperty("netty.epoll.enable", "false"); - Assert.assertFalse(NettyUtils.useEpoll()); + if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) { + Assert.assertTrue(NettyUtils.useEpoll()); + } else { + Assert.assertFalse(NettyUtils.useEpoll()); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java index 6878d5a8b1..021f10d444 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; + import org.slf4j.LoggerFactory; import java.util.ArrayList; @@ -123,7 +125,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { } private void initTaskParameters() { - this.taskInstance.setLogPath(getTaskLogPath(taskInstance)); + this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index cfe885cbfa..319afedd7b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.runner; -import com.fasterxml.jackson.annotation.JsonFormat; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -24,16 +26,22 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.DependentExecute; -import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonFormat; public class DependentTaskExecThread extends MasterBaseTaskExecThread { @@ -172,7 +180,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { } private void initTaskParameters() { - taskInstance.setLogPath(getTaskLogPath(taskInstance)); + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance)); taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index cf5359d579..ea3ad19950 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -36,10 +35,6 @@ import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; - - /** * master task exec base class */ @@ -85,11 +80,13 @@ public class MasterBaseTaskExecThread implements Callable { * taskUpdateQueue */ private TaskPriorityQueue taskUpdateQueue; + /** * constructor of MasterBaseTaskExecThread - * @param taskInstance task instance + * + * @param taskInstance task instance */ - public MasterBaseTaskExecThread(TaskInstance taskInstance){ + public MasterBaseTaskExecThread(TaskInstance taskInstance) { this.processService = SpringApplicationContext.getBean(ProcessService.class); this.alertDao = SpringApplicationContext.getBean(AlertDao.class); this.cancel = false; @@ -100,24 +97,26 @@ public class MasterBaseTaskExecThread implements Callable { /** * get task instance + * * @return TaskInstance */ - public TaskInstance getTaskInstance(){ + public TaskInstance getTaskInstance() { return this.taskInstance; } /** * kill master base task exec thread */ - public void kill(){ + public void kill() { this.cancel = true; } /** * submit master base task exec thread + * * @return TaskInstance */ - protected TaskInstance submit(){ + protected TaskInstance submit() { Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); @@ -156,14 +155,13 @@ public class MasterBaseTaskExecThread implements Callable { } - /** * dispatcht task + * * @param taskInstance taskInstance * @return whether submit task success */ public Boolean dispatchTask(TaskInstance taskInstance) { - try{ if(taskInstance.isConditionsTask() || taskInstance.isDependTask() @@ -202,7 +200,7 @@ public class MasterBaseTaskExecThread implements Callable { /** - * buildTaskPriorityInfo + * buildTaskPriorityInfo * * @param processInstancePriority processInstancePriority * @param processInstanceId processInstanceId @@ -215,7 +213,7 @@ public class MasterBaseTaskExecThread implements Callable { int processInstanceId, int taskInstancePriority, int taskInstanceId, - String workerGroup){ + String workerGroup) { return processInstancePriority + UNDERLINE + processInstanceId + @@ -229,14 +227,16 @@ public class MasterBaseTaskExecThread implements Callable { /** * submit wait complete + * * @return true */ - protected Boolean submitWaitComplete(){ + protected Boolean submitWaitComplete() { return true; } /** * call + * * @return boolean * @throws Exception exception */ @@ -246,34 +246,4 @@ public class MasterBaseTaskExecThread implements Callable { return submitWaitComplete(); } - /** - * get task log path - * @return log path - */ - public String getTaskLogPath(TaskInstance task) { - String logPath; - try{ - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - logPath = baseLog + Constants.SINGLE_SLASH + - task.getProcessDefinitionId() + Constants.SINGLE_SLASH + - task.getProcessInstanceId() + Constants.SINGLE_SLASH + - task.getId() + ".log"; - }else{ - logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - task.getProcessDefinitionId() + Constants.SINGLE_SLASH + - task.getProcessInstanceId() + Constants.SINGLE_SLASH + - task.getId() + ".log"; - } - }catch (Exception e){ - logger.error("logger", e); - logPath = ""; - } - return logPath; - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java new file mode 100644 index 0000000000..bb8ddc85de --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +import javax.transaction.NotSupportedException; + +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.sift.SiftingAppender; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.AppenderAttachable; + +public class LogUtils { + + private LogUtils() throws NotSupportedException { + throw new NotSupportedException(); + } + + /** + * get task log path + */ + @SuppressWarnings("unchecked") + private static String getTaskLogPath(int processDefinitionId, int processInstanceId, int taskInstanceId) { + // Optional.map will be skipped if null + return Optional.of(LoggerFactory.getILoggerFactory()) + .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) + .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) + .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) + .map(TaskLogDiscriminator::getLogBase) + .map(e -> Paths.get(e) + .toAbsolutePath() + .resolve(String.valueOf(processDefinitionId)) + .resolve(String.valueOf(processInstanceId)) + .resolve(taskInstanceId + ".log")) + .map(Path::toString) + .orElse(""); + } + + /** + * get task log path by TaskInstance + */ + public static String getTaskLogPath(TaskInstance taskInstance) { + return getTaskLogPath(taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId()); + } + + /** + * get task log path by TaskExecutionContext + */ + public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) { + return getTaskLogPath(taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 2dc9bb2ce9..3717ce37ae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; - -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -36,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -51,8 +49,6 @@ import org.slf4j.LoggerFactory; import com.github.rholder.retry.RetryException; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; import io.netty.channel.Channel; /** @@ -154,28 +150,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } } - /** - * get task log path - * @return log path - */ - private String getTaskLogPath(TaskExecutionContext taskExecutionContext) { - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - return baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - return System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - /** * build ack command * @param taskExecutionContext taskExecutionContext @@ -185,7 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); - ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); + ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(taskExecutionContext.getStartTime()); if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index f29691e9bb..f6fdfaab63 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -17,14 +17,15 @@ package org.apache.dolphinscheduler.server.master.runner; -import java.util.HashSet; -import java.util.Set; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.HashSet; +import java.util.Set; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java new file mode 100644 index 0000000000..02cca399ed --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.sift.SiftingAppender; + +@RunWith(MockitoJUnitRunner.class) +public class LogUtilsTest { + + @Test + public void testGetTaskLogPath() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(100); + taskInstance.setId(1000); + + Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT"); + Assert.assertNotNull(rootLogger); + + SiftingAppender appender = Mockito.mock(SiftingAppender.class); + // it's a trick to mock logger.getAppend("TASKLOGFILE") + Mockito.when(appender.getName()).thenReturn("TASKLOGFILE"); + rootLogger.addAppender(appender); + + Path logBase = Paths.get("path").resolve("to").resolve("test"); + + TaskLogDiscriminator taskLogDiscriminator = Mockito.mock(TaskLogDiscriminator.class); + Mockito.when(taskLogDiscriminator.getLogBase()).thenReturn(logBase.toString()); + Mockito.when(appender.getDiscriminator()).thenReturn(taskLogDiscriminator); + + Path logPath = Paths.get(".").toAbsolutePath().getParent() + .resolve(logBase) + .resolve("1").resolve("100").resolve("1000.log"); + Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskInstance)); + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 8444863aea..7cd63c07db 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -277,6 +277,12 @@ :backfill-item="backfillItem" :pre-node="preNode"> + +
@@ -310,6 +316,7 @@ import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' import mWorkerGroups from './_source/workerGroups' + import mPreTasks from './tasks/pre_tasks' import clickoutside from '@/module/util/clickoutside' import disabledState from '@/module/mixin/disabledState' import { isNameExDag, rtBantpl } from './../plugIn/util' @@ -369,7 +376,11 @@ value: 'failed', label: `${i18n.$t('failed')}` } - ] + ], + // preTasks + preTaskIdsInWorkflow: [], + preTasksToAdd: [], // pre-taskIds to add, used in jsplumb connects + preTasksToDelete: [], // pre-taskIds to delete, used in jsplumb connects } }, /** @@ -393,6 +404,14 @@ _onDependent (o) { this.dependence = Object.assign(this.dependence, {}, o) }, + /** + * Pre-tasks in workflow + */ + _onPreTasks (o) { + this.preTaskIdsInWorkflow = o.preTasks + this.preTasksToAdd = o.preTasksToAdd + this.preTasksToDelete = o.preTasksToDelete + }, /** * cache dependent */ @@ -543,6 +562,43 @@ if (!this.$refs[this.taskType]._verification()) { return } + // Verify preTasks and update dag-things + if (this.$refs['PRE_TASK']) { + if (!this.$refs['PRE_TASK']._verification()) { + return + } + else { + // Sync data-targetarr + $(`#${this.id}`).attr( + 'data-targetarr', this.preTaskIdsInWorkflow ? this.preTaskIdsInWorkflow.join(',') : '') + + // Update JSP connections + let plumbIns = JSP.JspInstance + var targetId = this.id + + // Update new connections + this.preTasksToAdd.map(sourceId => { + plumbIns.connect({ + source: sourceId, + target: targetId, + type: 'basic', + paintStyle: { strokeWidth: 2, stroke: '#2d8cf0' }, + HoverPaintStyle: {stroke: '#ccc', strokeWidth: 3} + }) + }) + + // Update remove connections + let currentConnects = plumbIns.getAllConnections() + let len = currentConnects.length + for (let i = 0; i < len; i++) { + if (this.preTasksToDelete.indexOf(currentConnects[i].sourceId) > -1 && currentConnects[i].targetId == targetId) { + plumbIns.deleteConnection(currentConnects[i]) + i -= 1 + len -= 1 + } + } + } + } $(`#${this.id}`).find('span').text(this.name) this.conditionResult.successNode[0] = this.successBranch @@ -684,6 +740,16 @@ } this.cacheBackfillItem = JSON.parse(JSON.stringify(o)) this.isContentBox = true + + // Init value of preTask selector + let preTaskIds = $(`#${this.id}`).attr('data-targetarr') + if (!_.isEmpty(this.backfillItem)) { + if (preTaskIds && preTaskIds.length) { + this.backfillItem.preTasks = preTaskIds.split(',') + } else { + this.backfillItem.preTasks = [] + } + } }, mounted () { let self = this @@ -745,7 +811,8 @@ mSelectInput, mTimeoutAlarm, mPriority, - mWorkerGroups + mWorkerGroups, + mPreTasks, } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/pre_tasks.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/pre_tasks.vue new file mode 100644 index 0000000000..adf889e958 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/pre_tasks.vue @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue index 1294f48538..fcc3410080 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue @@ -25,13 +25,10 @@ - - - - - @@ -64,7 +58,7 @@ + width="260">

{{$t('Confirm Switch To This Version?')}}

@@ -195,18 +189,14 @@ * Close the switch version layer */ _closeSwitchVersion (i) { - if (i > 0) { - this.$refs[`poptip-switch-version-${i}`][0].doClose() - } + this.$refs[`poptip-switch-version-${i}`][0].doClose() }, /** * Close the delete layer */ _closeDelete (i) { - if (i > 0) { - this.$refs[`poptip-delete-${i}`][0].doClose() - } + this.$refs[`poptip-delete-${i}`][0].doClose() }, /** diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index b0f315a539..07dfb7c5a7 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -641,5 +641,7 @@ export default { 'Batch copy': 'Batch copy', 'Related items': 'Related items', 'Project name is required': 'Project name is required', - 'Batch move': 'Batch move' + 'Batch move': 'Batch move', + Version: 'Version', + 'Pre tasks': 'Pre tasks', } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 53f0e3e483..e3f2562f6f 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -641,5 +641,7 @@ export default { 'Batch copy': '批量复制', 'Related items': '关联项目', 'Project name is required': '项目名称必填', - 'Batch move': '批量移动' + 'Batch move': '批量移动', + Version: '版本', + 'Pre tasks': '前置任务', } diff --git a/pom.xml b/pom.xml index ef57a3d180..0321c1a24c 100644 --- a/pom.xml +++ b/pom.xml @@ -833,6 +833,7 @@ **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java + **/server/utils/LogUtilsTest.java **/server/utils/ParamUtilsTest.java **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index 1fedf05cfd..5ae37e1be8 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -523,7 +523,8 @@ CREATE TABLE t_ds_resources ( pid int, full_name varchar(64), is_directory int, - PRIMARY KEY (id) + PRIMARY KEY (id), + CONSTRAINT t_ds_resources_un UNIQUE (full_name, type) ) ; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 70bb7cddfc..61e697568a 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -657,7 +657,8 @@ CREATE TABLE `t_ds_resources` ( `pid` int(11) DEFAULT NULL, `full_name` varchar(64) DEFAULT NULL, `is_directory` tinyint(4) DEFAULT NULL, - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), + UNIQUE KEY `t_ds_resources_un` (`full_name`,`type`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- ---------------------------- diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql index e9f8b5b6b5..43488272e2 100644 --- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql @@ -87,3 +87,29 @@ delimiter ; CALL ct_dolphin_T_t_ds_process_definition_version; DROP PROCEDURE ct_dolphin_T_t_ds_process_definition_version; + + + +-- add t_ds_resources_un +DROP PROCEDURE IF EXISTS uc_dolphin_T_t_ds_resources_un; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_resources_un() +BEGIN + IF NOT EXISTS ( + SELECT * FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = 't_ds_resources' + AND CONSTRAINT_NAME = 't_ds_resources_un' + ) + THEN + ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE KEY (full_name,`type`); + END IF; +END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_resources_un(); +DROP PROCEDURE IF EXISTS uc_dolphin_T_t_ds_resources_un; + + + diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql index 52045f61f0..e2767617df 100644 --- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -79,4 +79,30 @@ d// delimiter ; SELECT ct_dolphin_T_t_ds_process_definition_version(); -DROP FUNCTION IF EXISTS ct_dolphin_T_t_ds_process_definition_version(); \ No newline at end of file +DROP FUNCTION IF EXISTS ct_dolphin_T_t_ds_process_definition_version(); + + + + +-- add t_ds_resources_un +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_resources_un() RETURNS void AS $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = 't_ds_resources' + AND CONSTRAINT_NAME = 't_ds_resources_un' + ) + THEN +ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE (full_name,"type"); +END IF; +END; +$$ LANGUAGE plpgsql; + +SELECT uc_dolphin_T_t_ds_resources_un(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_resources_un(); + + + + + +
- # + + {{$t('Version')}} - Version - + {{$t('Description')}} @@ -42,9 +39,6 @@
- - - {{item.version}} {{$t('Current Version')}} @@ -52,7 +46,7 @@ - + {{item.description}} -