diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index 809ed45d77..18031acd28 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.dao.MonitorDBDao; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; @@ -65,7 +65,7 @@ public class MonitorService extends BaseService{ Map result = new HashMap<>(5); - List masterServers = getServerListFromZK(true); + List masterServers = getServerListFromZK(true); result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); @@ -99,16 +99,16 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - List workerServers = getServerListFromZK(false); + List masterServers = getServerListFromZK(false); - result.put(Constants.DATA_LIST, workerServers); + result.put(Constants.DATA_LIST, masterServers); putMsg(result,Status.SUCCESS); return result; } - public List getServerListFromZK(boolean isMaster){ - List servers = new ArrayList<>(); + public List getServerListFromZK(boolean isMaster){ + List servers = new ArrayList<>(); ZookeeperMonitor zookeeperMonitor = null; try{ zookeeperMonitor = new ZookeeperMonitor(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index a0eed0127a..c1a2835b46 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.ProcessDao; @@ -334,7 +334,7 @@ public class SchedulerService extends BaseService { } // check master server exists - List masterServers = monitorService.getServerListFromZK(true); + List masterServers = monitorService.getServerListFromZK(true); if (masterServers.size() == 0) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java index 1ccd77e201..3870c101d2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.api.utils; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -55,7 +55,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * get master servers * @return */ - public List getMasterServers(){ + public List getMasterServers(){ return getServersList(ZKNodeType.MASTER); } @@ -63,7 +63,7 @@ public class ZookeeperMonitor extends AbstractZKClient{ * master construct is the same with worker, use the master instead * @return */ - public List getWorkerServers(){ + public List getWorkerServers(){ return getServersList(ZKNodeType.WORKER); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitorUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitorUtilsTest.java index d04fea34c7..b3626fa8a9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitorUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitorUtilsTest.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.utils; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.junit.Assert; import org.junit.Test; import java.util.List; @@ -33,9 +33,9 @@ public class ZookeeperMonitorUtilsTest { ZookeeperMonitor zookeeperMonitor = new ZookeeperMonitor(); - List masterServerList = zookeeperMonitor.getMasterServers(); + List masterServerList = zookeeperMonitor.getMasterServers(); - List workerServerList = zookeeperMonitor.getWorkerServers(); + List workerServerList = zookeeperMonitor.getWorkerServers(); Assert.assertTrue(masterServerList.size() >= 0); Assert.assertTrue(workerServerList.size() >= 0); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 5eff895841..4d4e3127e0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -587,7 +587,7 @@ public final class Constants { /** * heartbeat for zk info length */ - public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 6; + public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 7; /** @@ -696,6 +696,12 @@ public final class Constants { public static final String SPARK_QUEUE = "--queue"; + /** + * --queue --qu + */ + public static final String FLINK_QUEUE = "--qu"; + + /** * exit code success */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterServer.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java similarity index 98% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterServer.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java index cf5ae5f8c2..29c96c147f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterServer.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java @@ -20,9 +20,9 @@ package org.apache.dolphinscheduler.common.model; import java.util.Date; /** - * master server + * server */ -public class MasterServer { +public class Server { /** * id diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java index e83dba71fb..d8fd69a0b1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import java.util.Date; @@ -49,8 +49,7 @@ public class ResInfo { } public ResInfo(double cpuUsage, double memoryUsage, double loadAverage) { - this.cpuUsage = cpuUsage; - this.memoryUsage = memoryUsage; + this(cpuUsage,memoryUsage); this.loadAverage = loadAverage; } @@ -80,30 +79,25 @@ public class ResInfo { /** * get CPU and memory usage - * add cpu load average by lidong for service monitor * @return */ - public static String getResInfoJson(){ - ResInfo resInfo = new ResInfo(OSUtils.cpuUsage(), OSUtils.memoryUsage(),OSUtils.loadAverage()); + public static String getResInfoJson(double cpuUsage , double memoryUsage,double loadAverage){ + ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage); return JSONUtils.toJson(resInfo); } /** - * get CPU and memory usage + * get heart beat info + * @param now * @return */ - public static String getResInfoJson(double cpuUsage , double memoryUsage){ - ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage); - return JSONUtils.toJson(resInfo); - } - - public static String getHeartBeatInfo(Date now){ return buildHeartbeatForZKInfo(OSUtils.getHost(), OSUtils.getProcessID(), OSUtils.cpuUsage(), OSUtils.memoryUsage(), + OSUtils.loadAverage(), DateUtils.dateToString(now), DateUtils.dateToString(now)); @@ -115,17 +109,19 @@ public class ResInfo { * @param port * @param cpuUsage * @param memoryUsage + * @param loadAverage * @param createTime * @param lastHeartbeatTime * @return */ public static String buildHeartbeatForZKInfo(String host , int port , - double cpuUsage , double memoryUsage, + double cpuUsage , double memoryUsage,double loadAverage, String createTime,String lastHeartbeatTime){ return host + Constants.COMMA + port + Constants.COMMA + cpuUsage + Constants.COMMA + memoryUsage + Constants.COMMA + + loadAverage + Constants.COMMA + createTime + Constants.COMMA + lastHeartbeatTime; } @@ -135,19 +131,22 @@ public class ResInfo { * @param heartBeatInfo * @return */ - public static MasterServer parseHeartbeatForZKInfo(String heartBeatInfo){ - MasterServer masterServer = null; + public static Server parseHeartbeatForZKInfo(String heartBeatInfo){ + Server masterServer = null; String[] masterArray = heartBeatInfo.split(Constants.COMMA); - if(masterArray.length != 6){ + if(masterArray == null || + masterArray.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ return masterServer; } - masterServer = new MasterServer(); + masterServer = new Server(); masterServer.setHost(masterArray[0]); masterServer.setPort(Integer.parseInt(masterArray[1])); - masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[2]), Double.parseDouble(masterArray[3]))); - masterServer.setCreateTime(DateUtils.stringToDate(masterArray[4])); - masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[5])); + masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[2]), + Double.parseDouble(masterArray[3]), + Double.parseDouble(masterArray[4]))); + masterServer.setCreateTime(DateUtils.stringToDate(masterArray[5])); + masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[6])); return masterServer; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java index ec33575670..a491ead969 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.common.zk; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; @@ -158,10 +158,12 @@ public abstract class AbstractZKClient { if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ return; } - String str = splits[0] + Constants.COMMA +splits[1] + Constants.COMMA + String str = splits[0] + Constants.COMMA + + splits[1] + Constants.COMMA + OSUtils.cpuUsage() + Constants.COMMA + OSUtils.memoryUsage() + Constants.COMMA - + splits[4] + Constants.COMMA + + OSUtils.loadAverage() + Constants.COMMA + + splits[5] + Constants.COMMA + DateUtils.dateToString(new Date()); zkClient.setData().forPath(znode,str.getBytes()); @@ -342,14 +344,14 @@ public abstract class AbstractZKClient { * @param zkNodeType * @return */ - public List getServersList(ZKNodeType zkNodeType){ + public List getServersList(ZKNodeType zkNodeType){ Map masterMap = getServerMaps(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType); - List masterServers = new ArrayList<>(); + List masterServers = new ArrayList<>(); int i = 0; for (Map.Entry entry : masterMap.entrySet()) { - MasterServer masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); masterServer.setZkDirectory( parentPath + "/"+ entry.getKey()); masterServer.setId(i); i ++; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java index daa6ca97ef..9277496763 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.commons.lang.StringUtils; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -32,46 +33,58 @@ import java.util.List; public class FlinkArgsUtils { /** - * build args + * build args + * * @param param * @return */ + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(FlinkArgsUtils.class); + public static List buildArgs(FlinkParameters param) { List args = new ArrayList<>(); + String deployMode = "cluster"; + if (StringUtils.isNotEmpty(param.getDeployMode())) { + deployMode = param.getDeployMode(); - args.add(Constants.FLINK_RUN_MODE); //-m + } + if (!"local".equals(deployMode)) { + args.add(Constants.FLINK_RUN_MODE); //-m - args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster + args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster - if (param.getSlot() != 0) { - args.add(Constants.FLINK_YARN_SLOT); - args.add(String.format("%d", param.getSlot())); //-ys - } - if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm - args.add(Constants.FLINK_APP_NAME); - args.add(param.getAppName()); - } + if (param.getSlot() != 0) { + args.add(Constants.FLINK_YARN_SLOT); + args.add(String.format("%d", param.getSlot())); //-ys + } - if (param.getTaskManager() != 0) { //-yn - args.add(Constants.FLINK_TASK_MANAGE); - args.add(String.format("%d", param.getTaskManager())); - } + if (StringUtils.isNotEmpty(param.getAppName())) { //-ynm + args.add(Constants.FLINK_APP_NAME); + args.add(param.getAppName()); + } - if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { - args.add(Constants.FLINK_JOB_MANAGE_MEM); - args.add(param.getJobManagerMemory()); //-yjm - } + if (param.getTaskManager() != 0) { //-yn + args.add(Constants.FLINK_TASK_MANAGE); + args.add(String.format("%d", param.getTaskManager())); + } - if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm - args.add(Constants.FLINK_TASK_MANAGE_MEM); - args.add(param.getTaskManagerMemory()); - } - args.add(Constants.FLINK_detach); //-d + if (StringUtils.isNotEmpty(param.getJobManagerMemory())) { + args.add(Constants.FLINK_JOB_MANAGE_MEM); + args.add(param.getJobManagerMemory()); //-yjm + } + if (StringUtils.isNotEmpty(param.getTaskManagerMemory())) { // -ytm + args.add(Constants.FLINK_TASK_MANAGE_MEM); + args.add(param.getTaskManagerMemory()); + } + + args.add(Constants.FLINK_detach); //-d + + + } - if(param.getProgramType() !=null ){ - if(param.getProgramType()!=ProgramType.PYTHON){ + if (param.getProgramType() != null) { + if (param.getProgramType() != ProgramType.PYTHON) { if (StringUtils.isNotEmpty(param.getMainClass())) { args.add(Constants.FLINK_MAIN_CLASS); //-c args.add(param.getMainClass()); //main class @@ -83,28 +96,29 @@ public class FlinkArgsUtils { args.add(param.getMainJar().getRes()); } + if (StringUtils.isNotEmpty(param.getMainArgs())) { + args.add(param.getMainArgs()); + } // --files --conf --libjar ... - if (StringUtils.isNotEmpty(param.getOthers())) { + if (StringUtils.isNotEmpty(param.getOthers())) { String others = param.getOthers(); - if(!others.contains("--queue")){ - if (StringUtils.isNotEmpty(param.getQueue())) { - args.add(Constants.SPARK_QUEUE); + if (!others.contains("--qu")) { + if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { + args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } } args.add(param.getOthers()); - }else if (StringUtils.isNotEmpty(param.getQueue())) { - args.add(Constants.SPARK_QUEUE); + } else if (StringUtils.isNotEmpty(param.getQueue()) && !deployMode.equals("local")) { + args.add(Constants.FLINK_QUEUE); args.add(param.getQueue()); } - if (StringUtils.isNotEmpty(param.getMainArgs())) { - args.add(param.getMainArgs()); - } return args; } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 90a2679f5b..eec01be782 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.zk; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.common.model.MasterServer; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.DaoFactory; @@ -141,7 +141,6 @@ public class ZKMasterClient extends AbstractZKClient { */ public void initDao(){ this.alertDao = DaoFactory.getDaoInstance(AlertDao.class); -// this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } /** * get alert dao @@ -345,10 +344,10 @@ public class ZKMasterClient extends AbstractZKClient { return false; } Date workerServerStartDate = null; - List workerServers= getServersList(ZKNodeType.WORKER); - for(MasterServer server : workerServers){ - if(server.getHost().equals(taskInstance.getHost())){ - workerServerStartDate = server.getCreateTime(); + List workerServers= getServersList(ZKNodeType.WORKER); + for(Server workerServer : workerServers){ + if(workerServer.getHost().equals(taskInstance.getHost())){ + workerServerStartDate = workerServer.getCreateTime(); break; } } diff --git a/dolphinscheduler-ui/.env b/dolphinscheduler-ui/.env index 3bbb537f51..9018a5d4e2 100644 --- a/dolphinscheduler-ui/.env +++ b/dolphinscheduler-ui/.env @@ -1,5 +1,5 @@ # 后端接口地址 -API_BASE = http://127.0.0.1:12345 +API_BASE = http://192.168.xx.xx:12345 # 本地开发如需ip访问项目把"#"号去掉 #DEV_HOST = 192.168.xx.xx