diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/AlertServer.java b/escheduler-alert/src/main/java/cn/escheduler/alert/AlertServer.java index 27c5f0ce56..b8402aacf1 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/AlertServer.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/AlertServer.java @@ -20,28 +20,33 @@ import cn.escheduler.alert.runner.AlertSender; import cn.escheduler.alert.utils.Constants; import cn.escheduler.common.thread.Stopper; import cn.escheduler.dao.AlertDao; -import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.model.Alert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.context.annotation.ComponentScan; import java.util.List; /** * alert of start */ -public class AlertServer { +@ComponentScan("cn.escheduler") +public class AlertServer implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(AlertServer.class); /** * Alert Dao */ - private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); + @Autowired + private AlertDao alertDao; private AlertSender alertSender; private static volatile AlertServer instance; - private AlertServer() { + public AlertServer() { } @@ -72,7 +77,12 @@ public class AlertServer { public static void main(String[] args){ + SpringApplication app = new SpringApplication(AlertServer.class); + app.run(args); + } + @Override + public void run(String... strings) throws Exception { AlertServer alertServer = AlertServer.getInstance(); alertServer.start(); } diff --git a/escheduler-alert/src/main/resources/application_alert.properties b/escheduler-alert/src/main/resources/application_alert.properties new file mode 100644 index 0000000000..cbc81b06e6 --- /dev/null +++ b/escheduler-alert/src/main/resources/application_alert.properties @@ -0,0 +1,5 @@ +logging.config=classpath:alert_logback.xml + +# server port +server.port=7789 + diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java index 45d0b46e79..813e4b13fb 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java @@ -570,7 +570,7 @@ public class ProcessDefinitionService extends BaseDAGService { row.put("scheduleEndTime", schedule.getEndTime()); row.put("scheduleCrontab", schedule.getCrontab()); row.put("scheduleFailureStrategy", schedule.getFailureStrategy()); - row.put("scheduleReleaseState", schedule.getReleaseState()); + row.put("scheduleReleaseState", ReleaseState.OFFLINE); row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority()); if(schedule.getId() == -1){ row.put("scheduleWorkerGroupId", -1); diff --git a/escheduler-common/pom.xml b/escheduler-common/pom.xml index b5f1d58620..fe24d02209 100644 --- a/escheduler-common/pom.xml +++ b/escheduler-common/pom.xml @@ -572,6 +572,12 @@ org.mortbay.jetty jsp-2.1 + + + org.mortbay.jetty + servlet-api-2.5 + + diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpCheckCondition.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpCheckCondition.java new file mode 100644 index 0000000000..b88ec01980 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpCheckCondition.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + +/** + * http check condition + */ +public enum HttpCheckCondition { + /** + * 0 status_code_default:200 + * 1 status_code_custom + * 2 body_contains + * 3 body_not_contains + */ + STATUS_CODE_DEFAULT,STATUS_CODE_CUSTOM, BODY_CONTAINS, BODY_NOT_CONTAINS +} \ No newline at end of file diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpMethod.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpMethod.java new file mode 100644 index 0000000000..32cbf1c42f --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpMethod.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + +/** + * http method + */ +public enum HttpMethod { + /** + * 0 get + * 1 post + * 2 head + * 3 put + * 4 delete + */ + GET, POST, HEAD, PUT, DELETE +} \ No newline at end of file diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpParametersType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpParametersType.java new file mode 100644 index 0000000000..21f4d8b3cc --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/HttpParametersType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + +/** + * http parameters type + */ +public enum HttpParametersType { + /** + * 0 parameter; + * 1 body; + * 2 headers; + */ + PARAMETER,BODY,HEADERS +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java index 7e4fde6a34..a212261db6 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java @@ -30,8 +30,9 @@ public enum TaskType { * 6 PYTHON * 7 DEPENDENT * 8 FLINK + * 9 HTTP */ - SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT,FLINK; + SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT,FLINK,HTTP; public static boolean typeIsNormalTask(String typeName) { TaskType taskType = TaskType.valueOf(typeName); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/process/HttpProperty.java b/escheduler-common/src/main/java/cn/escheduler/common/process/HttpProperty.java new file mode 100644 index 0000000000..ed7cc46a38 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/process/HttpProperty.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.process; + +import cn.escheduler.common.enums.HttpParametersType; + +import java.util.Objects; + +public class HttpProperty { + /** + * key + */ + private String prop; + + /** + * httpParametersType + */ + private HttpParametersType httpParametersType; + + /** + * value + */ + private String value; + + public HttpProperty() { + } + + public HttpProperty(String prop, HttpParametersType httpParametersType, String value) { + this.prop = prop; + this.httpParametersType = httpParametersType; + this.value = value; + } + + /** + * getter method + * + * @return the prop + * @see HttpProperty#prop + */ + public String getProp() { + return prop; + } + + /** + * setter method + * + * @param prop the prop to set + * @see HttpProperty#prop + */ + public void setProp(String prop) { + this.prop = prop; + } + + /** + * getter method + * + * @return the value + * @see HttpProperty#value + */ + public String getValue() { + return value; + } + + /** + * setter method + * + * @param value the value to set + * @see HttpProperty#value + */ + public void setValue(String value) { + this.value = value; + } + + public HttpParametersType getHttpParametersType() { + return httpParametersType; + } + + public void setHttpParametersType(HttpParametersType httpParametersType) { + this.httpParametersType = httpParametersType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HttpProperty property = (HttpProperty) o; + return Objects.equals(prop, property.prop) && + Objects.equals(value, property.value); + } + + + @Override + public int hashCode() { + return Objects.hash(prop, value); + } + + @Override + public String toString() { + return "HttpProperty{" + + "prop='" + prop + '\'' + + ", httpParametersType=" + httpParametersType + + ", value='" + value + '\'' + + '}'; + } + + +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/task/http/HttpParameters.java b/escheduler-common/src/main/java/cn/escheduler/common/task/http/HttpParameters.java new file mode 100644 index 0000000000..dc23f738b8 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/task/http/HttpParameters.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.task.http; + +import cn.escheduler.common.enums.HttpCheckCondition; +import cn.escheduler.common.enums.HttpMethod; +import cn.escheduler.common.process.HttpProperty; +import cn.escheduler.common.task.AbstractParameters; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * http parameter + */ +public class HttpParameters extends AbstractParameters { + /** + * url + */ + private String url; + + /** + * httpMethod + */ + private HttpMethod httpMethod; + + /** + * http params + */ + private List httpParams; + + /** + * httpCheckCondition + */ + private HttpCheckCondition httpCheckCondition = HttpCheckCondition.STATUS_CODE_DEFAULT; + + /** + * condition + */ + private String condition; + + + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(url); + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public HttpMethod getHttpMethod() { + return httpMethod; + } + + public void setHttpMethod(HttpMethod httpMethod) { + this.httpMethod = httpMethod; + } + + public List getHttpParams() { + return httpParams; + } + + public void setHttpParams(List httpParams) { + this.httpParams = httpParams; + } + + public HttpCheckCondition getHttpCheckCondition() { + return httpCheckCondition; + } + + public void setHttpCheckCondition(HttpCheckCondition httpCheckCondition) { + this.httpCheckCondition = httpCheckCondition; + } + + public String getCondition() { + return condition; + } + + public void setCondition(String condition) { + this.condition = condition; + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java index c8ceeb44a2..b3fcade4a7 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/TaskParametersUtils.java @@ -20,6 +20,7 @@ import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.dependent.DependentParameters; import cn.escheduler.common.task.flink.FlinkParameters; +import cn.escheduler.common.task.http.HttpParameters; import cn.escheduler.common.task.mr.MapreduceParameters; import cn.escheduler.common.task.procedure.ProcedureParameters; import cn.escheduler.common.task.python.PythonParameters; @@ -66,6 +67,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, DependentParameters.class); case FLINK: return JSONUtils.parseObject(parameter, FlinkParameters.class); + case HTTP: + return JSONUtils.parseObject(parameter, HttpParameters.class); default: return null; } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 73ae868c80..dee22d6f36 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -1024,11 +1024,11 @@ public class ProcessDao extends AbstractBaseDao { } /** - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${task executed by ip1},${ip2}... + * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskInstanceId}_${task executed by ip1},${ip2}... * * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * - * 流程实例优先级_流程实例id_任务优先级_任务id_任务执行机器ip1,ip2... high <- low + * 流程实例优先级_流程实例id_任务优先级_任务实例id_任务执行机器ip1,ip2... high <- low * * @param taskInstance * @return diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/DolphinSchedulerManager.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/DolphinSchedulerManager.java index 210b95ff6f..f15312a3f3 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/DolphinSchedulerManager.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/DolphinSchedulerManager.java @@ -81,7 +81,9 @@ public class DolphinSchedulerManager { String version = ""; // Gets the version of the current system if (upgradeDao.isExistsTable("t_escheduler_version")) { - version = upgradeDao.getCurrentVersion(); + version = upgradeDao.getCurrentVersion("t_escheduler_version"); + }else if(upgradeDao.isExistsTable("t_dolphinscheduler_version")){ + version = upgradeDao.getCurrentVersion("t_dolphinscheduler_version"); }else if(upgradeDao.isExistsColumn("t_escheduler_queue","create_time")){ version = "1.0.1"; }else if(upgradeDao.isExistsTable("t_escheduler_queue")){ @@ -93,21 +95,14 @@ public class DolphinSchedulerManager { // The target version of the upgrade String schemaVersion = ""; for(String schemaDir : schemaList) { - - schemaVersion = schemaDir.split("_")[0]; if(SchemaUtils.isAGreatVersion(schemaVersion , version)) { logger.info("upgrade DolphinScheduler metadata version from " + version + " to " + schemaVersion); - logger.info("Begin upgrading DolphinScheduler's table structure"); upgradeDao.upgradeDolphinScheduler(schemaDir); - if(SchemaUtils.isAGreatVersion(version,"1.0.1")){ - version = upgradeDao.getCurrentVersion(); - }else { - version = schemaVersion; - } + version = schemaVersion; } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/MysqlUpgradeDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/MysqlUpgradeDao.java index 17832896bc..d6a8974d28 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/MysqlUpgradeDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/MysqlUpgradeDao.java @@ -28,8 +28,6 @@ import java.sql.SQLException; public class MysqlUpgradeDao extends UpgradeDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); - private static final String T_VERSION_NAME = "t_escheduler_version"; - private static final String rootDir = System.getProperty("user.dir"); @Override protected void init() { @@ -54,10 +52,11 @@ public class MysqlUpgradeDao extends UpgradeDao { * @return */ public boolean isExistsTable(String tableName) { + ResultSet rs = null; Connection conn = null; try { conn = ConnectionFactory.getDataSource().getConnection(); - ResultSet rs = conn.getMetaData().getTables(null, null, tableName, null); + rs = conn.getMetaData().getTables(null, null, tableName, null); if (rs.next()) { return true; } else { @@ -68,8 +67,7 @@ public class MysqlUpgradeDao extends UpgradeDao { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage(),e); } finally { - ConnectionUtils.releaseResource(null, null, conn); - + ConnectionUtils.releaseResource(rs, null, conn); } } @@ -96,7 +94,6 @@ public class MysqlUpgradeDao extends UpgradeDao { throw new RuntimeException(e.getMessage(),e); } finally { ConnectionUtils.releaseResource(null, null, conn); - } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/PostgresqlUpgradeDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/PostgresqlUpgradeDao.java index 03ec8c819a..87ac1b4bde 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/PostgresqlUpgradeDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/PostgresqlUpgradeDao.java @@ -29,8 +29,6 @@ import java.sql.SQLException; public class PostgresqlUpgradeDao extends UpgradeDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); - private static final String T_VERSION_NAME = "t_escheduler_version"; - private static final String rootDir = System.getProperty("user.dir"); private static final String schema = getSchema(); @Override @@ -55,23 +53,24 @@ public class PostgresqlUpgradeDao extends UpgradeDao { super.initSchema(initSqlPath); } - private static String getSchema(){ + public static String getSchema(){ Connection conn = null; PreparedStatement pstmt = null; + ResultSet resultSet = null; try { conn = ConnectionFactory.getDataSource().getConnection(); pstmt = conn.prepareStatement("select current_schema()"); - ResultSet resultSet = pstmt.executeQuery(); + resultSet = pstmt.executeQuery(); while (resultSet.next()){ if(resultSet.isFirst()){ return resultSet.getString(1); } } + } catch (SQLException e) { logger.error(e.getMessage(),e); - } finally { - ConnectionUtils.releaseResource(null, null, conn); + ConnectionUtils.releaseResource(resultSet, pstmt, conn); } return ""; } @@ -83,10 +82,11 @@ public class PostgresqlUpgradeDao extends UpgradeDao { */ public boolean isExistsTable(String tableName) { Connection conn = null; + ResultSet rs = null; try { conn = ConnectionFactory.getDataSource().getConnection(); - ResultSet rs = conn.getMetaData().getTables(null, schema, tableName, null); + rs = conn.getMetaData().getTables(null, schema, tableName, null); if (rs.next()) { return true; } else { @@ -97,7 +97,7 @@ public class PostgresqlUpgradeDao extends UpgradeDao { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage(),e); } finally { - ConnectionUtils.releaseResource(null, null, conn); + ConnectionUtils.releaseResource(rs, null, conn); } } @@ -110,9 +110,10 @@ public class PostgresqlUpgradeDao extends UpgradeDao { */ public boolean isExistsColumn(String tableName,String columnName) { Connection conn = null; + ResultSet rs = null; try { conn = ConnectionFactory.getDataSource().getConnection(); - ResultSet rs = conn.getMetaData().getColumns(null,schema,tableName,columnName); + rs = conn.getMetaData().getColumns(null,schema,tableName,columnName); if (rs.next()) { return true; } else { @@ -123,7 +124,7 @@ public class PostgresqlUpgradeDao extends UpgradeDao { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage(),e); } finally { - ConnectionUtils.releaseResource(null, null, conn); + ConnectionUtils.releaseResource(rs, null, conn); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java index 13c0deffb9..820d8a3c99 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java @@ -18,6 +18,7 @@ package cn.escheduler.dao.upgrade; import cn.escheduler.common.enums.DbType; import cn.escheduler.common.utils.ConnectionUtils; +import cn.escheduler.common.utils.SchemaUtils; import cn.escheduler.common.utils.ScriptRunner; import cn.escheduler.dao.AbstractBaseDao; import cn.escheduler.dao.datasource.ConnectionFactory; @@ -36,7 +37,9 @@ public abstract class UpgradeDao extends AbstractBaseDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); private static final String T_VERSION_NAME = "t_escheduler_version"; + private static final String T_NEW_VERSION_NAME = "t_dolphinscheduler_version"; private static final String rootDir = System.getProperty("user.dir"); + private static final DbType dbType = getCurrentDbType(); @Override protected void init() { @@ -48,13 +51,24 @@ public abstract class UpgradeDao extends AbstractBaseDao { * @return */ public static DbType getDbType(){ + return dbType; + } + + /** + * get db type + * @return + */ + private static DbType getCurrentDbType(){ + Connection conn = null; try { - Connection conn = ConnectionFactory.getDataSource().getConnection(); + conn = ConnectionFactory.getDataSource().getConnection(); String name = conn.getMetaData().getDatabaseProductName().toUpperCase(); return DbType.valueOf(name); } catch (Exception e) { logger.error(e.getMessage(),e); return null; + }finally { + ConnectionUtils.releaseResource(null, null, conn); } } @@ -175,8 +189,8 @@ public abstract class UpgradeDao extends AbstractBaseDao { public abstract boolean isExistsColumn(String tableName,String columnName); - public String getCurrentVersion() { - String sql = String.format("select version from %s",T_VERSION_NAME); + public String getCurrentVersion(String versionName) { + String sql = String.format("select version from %s",versionName); Connection conn = null; ResultSet rs = null; PreparedStatement pstmt = null; @@ -231,6 +245,12 @@ public abstract class UpgradeDao extends AbstractBaseDao { pstmt = conn.prepareStatement(upgradeSQL); pstmt.setString(1, schemaVersion); pstmt.executeUpdate(); + }else if (isExistsTable(T_NEW_VERSION_NAME)) { + // Change version in the version table to the new version + String upgradeSQL = String.format("update %s set version = ?",T_NEW_VERSION_NAME); + pstmt = conn.prepareStatement(upgradeSQL); + pstmt.setString(1, schemaVersion); + pstmt.executeUpdate(); } conn.commit(); } catch (FileNotFoundException e) { @@ -275,7 +295,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { if (StringUtils.isEmpty(rootDir)) { throw new RuntimeException("Environment variable user.dir not found"); } - String mysqlSQLFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_dml.sql",rootDir,schemaDir,getDbType().name().toLowerCase()); + String mysqlSQLFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql",rootDir,schemaDir,getDbType().name().toLowerCase()); Connection conn = null; PreparedStatement pstmt = null; try { @@ -314,7 +334,11 @@ public abstract class UpgradeDao extends AbstractBaseDao { public void updateVersion(String version) { // Change version in the version table to the new version - String upgradeSQL = String.format("update %s set version = ?",T_VERSION_NAME); + String versionName = T_VERSION_NAME; + if(!SchemaUtils.isAGreatVersion("1.2.0" , version)){ + versionName = "t_dolphinscheduler_version"; + } + String upgradeSQL = String.format("update %s set version = ?",versionName); PreparedStatement pstmt = null; Connection conn = null; try { diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java index 986a6179c9..de96ff25be 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskManager.java @@ -20,6 +20,7 @@ package cn.escheduler.server.worker.task; import cn.escheduler.common.enums.TaskType; import cn.escheduler.server.worker.task.dependent.DependentTask; import cn.escheduler.server.worker.task.flink.FlinkTask; +import cn.escheduler.server.worker.task.http.HttpTask; import cn.escheduler.server.worker.task.mr.MapReduceTask; import cn.escheduler.server.worker.task.processdure.ProcedureTask; import cn.escheduler.server.worker.task.python.PythonTask; @@ -62,6 +63,8 @@ public class TaskManager { return new PythonTask(props, logger); case DEPENDENT: return new DependentTask(props, logger); + case HTTP: + return new HttpTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/http/HttpTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/http/HttpTask.java new file mode 100644 index 0000000000..552e4bd828 --- /dev/null +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/http/HttpTask.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.server.worker.task.http; + + +import cn.escheduler.common.enums.HttpMethod; +import cn.escheduler.common.enums.HttpParametersType; +import cn.escheduler.common.process.HttpProperty; +import cn.escheduler.common.process.Property; +import cn.escheduler.common.task.AbstractParameters; +import cn.escheduler.common.task.http.HttpParameters; +import cn.escheduler.common.utils.Bytes; +import cn.escheduler.common.utils.DateUtils; +import cn.escheduler.common.utils.ParameterUtils; +import cn.escheduler.dao.DaoFactory; +import cn.escheduler.dao.ProcessDao; +import cn.escheduler.dao.model.ProcessInstance; +import cn.escheduler.server.utils.ParamUtils; +import cn.escheduler.server.worker.task.AbstractTask; +import cn.escheduler.server.worker.task.TaskProps; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.io.Charsets; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.ParseException; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * http task + */ +public class HttpTask extends AbstractTask { + + private HttpParameters httpParameters; + + /** + * process database access + */ + private ProcessDao processDao; + + /** + * Convert mill seconds to second unit + */ + protected static final int MAX_CONNECTION_MILLISECONDS = 60000; + + protected static final String APPLICATION_JSON = "application/json"; + + protected String output; + + + public HttpTask(TaskProps props, Logger logger) { + super(props, logger); + this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); + } + + @Override + public void init() { + logger.info("http task params {}", taskProps.getTaskParams()); + this.httpParameters = JSONObject.parseObject(taskProps.getTaskParams(), HttpParameters.class); + + if (!httpParameters.checkParameters()) { + throw new RuntimeException("http task params is not valid"); + } + } + + @Override + public void handle() throws Exception { + String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + long startTime = System.currentTimeMillis(); + String statusCode = null; + String body = null; + try(CloseableHttpClient client = createHttpClient()) { + try(CloseableHttpResponse response = sendRequest(client)) { + statusCode = String.valueOf(getStatusCode(response)); + body = getResponseBody(response); + exitStatusCode = validResponse(body, statusCode); + long costTime = System.currentTimeMillis() - startTime; + logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}", + DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output); + }catch (Exception e) { + appendMessage(e.toString()); + exitStatusCode = -1; + logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); + } + } catch (Exception e) { + appendMessage(e.toString()); + exitStatusCode = -1; + logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); + } + } + + protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { + RequestBuilder builder = createRequestBuilder(); + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + httpParameters.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + List httpPropertyList = new ArrayList<>(); + if(httpParameters.getHttpParams() != null && httpParameters.getHttpParams().size() > 0){ + for (HttpProperty httpProperty: httpParameters.getHttpParams()) { + String jsonObject = JSONObject.toJSONString(httpProperty); + String params = ParameterUtils.convertParameterPlaceholders(jsonObject,ParamUtils.convert(paramsMap)); + logger.info("http request params:{}",params); + httpPropertyList.add(JSONObject.parseObject(params,HttpProperty.class)); + } + } + addRequestParams(builder,httpPropertyList); + HttpUriRequest request = builder.setUri(httpParameters.getUrl()).build(); + setHeaders(request,httpPropertyList); + return client.execute(request); + } + + protected String getResponseBody(CloseableHttpResponse httpResponse) throws ParseException, IOException { + if (httpResponse == null) { + return null; + } + HttpEntity entity = httpResponse.getEntity(); + if (entity == null) { + return null; + } + String webPage = EntityUtils.toString(entity, Bytes.UTF8_ENCODING); + return webPage; + } + + protected int getStatusCode(CloseableHttpResponse httpResponse) { + int status = httpResponse.getStatusLine().getStatusCode(); + return status; + } + + protected int validResponse(String body, String statusCode){ + int exitStatusCode = 0; + switch (httpParameters.getHttpCheckCondition()) { + case BODY_CONTAINS: + if (StringUtils.isEmpty(body) || !body.contains(httpParameters.getCondition())) { + appendMessage(httpParameters.getUrl() + " doesn contain " + + httpParameters.getCondition()); + exitStatusCode = -1; + } + break; + case BODY_NOT_CONTAINS: + if (StringUtils.isEmpty(body) || body.contains(httpParameters.getCondition())) { + appendMessage(httpParameters.getUrl() + " contains " + + httpParameters.getCondition()); + exitStatusCode = -1; + } + break; + case STATUS_CODE_CUSTOM: + if (!statusCode.equals(httpParameters.getCondition())) { + appendMessage(httpParameters.getUrl() + " statuscode: " + statusCode + ", Must be: " + httpParameters.getCondition()); + exitStatusCode = -1; + } + break; + default: + if (!"200".equals(statusCode)) { + appendMessage(httpParameters.getUrl() + " statuscode: " + statusCode + ", Must be: 200"); + exitStatusCode = -1; + } + break; + } + return exitStatusCode; + } + + public String getOutput() { + return output; + } + + protected void appendMessage(String message) { + if (output == null) { + output = ""; + } + if (message != null && !message.trim().isEmpty()) { + output += message; + } + } + + protected void addRequestParams(RequestBuilder builder,List httpPropertyList) { + if(httpPropertyList != null && httpPropertyList.size() > 0){ + JSONObject jsonParam = new JSONObject(); + for (HttpProperty property: httpPropertyList){ + if(property.getHttpParametersType() != null){ + if (property.getHttpParametersType().equals(HttpParametersType.PARAMETER)){ + builder.addParameter(property.getProp(), property.getValue()); + }else if(property.getHttpParametersType().equals(HttpParametersType.BODY)){ + jsonParam.put(property.getProp(), property.getValue()); + } + } + } + StringEntity postingString = new StringEntity(jsonParam.toString(), Charsets.UTF_8); + postingString.setContentEncoding(Bytes.UTF8_ENCODING); + postingString.setContentType(APPLICATION_JSON); + builder.setEntity(postingString); + } + } + + protected void setHeaders(HttpUriRequest request,List httpPropertyList) { + if(httpPropertyList != null && httpPropertyList.size() > 0){ + for (HttpProperty property: httpPropertyList){ + if(property.getHttpParametersType() != null) { + if (property.getHttpParametersType().equals(HttpParametersType.HEADERS)) { + request.addHeader(property.getProp(), property.getValue()); + } + } + } + } + } + + protected CloseableHttpClient createHttpClient() { + final RequestConfig requestConfig = requestConfig(); + HttpClientBuilder httpClientBuilder; + httpClientBuilder = HttpClients.custom().setDefaultRequestConfig(requestConfig); + return httpClientBuilder.build(); + } + + private RequestConfig requestConfig() { + return RequestConfig.custom().setSocketTimeout(MAX_CONNECTION_MILLISECONDS).setConnectTimeout(MAX_CONNECTION_MILLISECONDS).build(); + } + + protected RequestBuilder createRequestBuilder() { + if (httpParameters.getHttpMethod().equals(HttpMethod.GET)) { + return RequestBuilder.get(); + } else if (httpParameters.getHttpMethod().equals(HttpMethod.POST)) { + return RequestBuilder.post(); + } else if (httpParameters.getHttpMethod().equals(HttpMethod.HEAD)) { + return RequestBuilder.head(); + } else if (httpParameters.getHttpMethod().equals(HttpMethod.PUT)) { + return RequestBuilder.put(); + } else if (httpParameters.getHttpMethod().equals(HttpMethod.DELETE)) { + return RequestBuilder.delete(); + } else { + return null; + } + } + + @Override + public AbstractParameters getParameters() { + return this.httpParameters; + } +} diff --git a/escheduler-ui/package.json b/escheduler-ui/package.json index 014205ba16..3a8d532e01 100644 --- a/escheduler-ui/package.json +++ b/escheduler-ui/package.json @@ -23,7 +23,6 @@ "babel-plugin-syntax-jsx": "^6.18.0", "babel-plugin-transform-class-properties": "^6.24.1", "babel-plugin-transform-object-rest-spread": "^6.26.0", - "babel-plugin-transform-remove-console": "^6.9.4", "babel-plugin-transform-runtime": "^6.23.0", "babel-plugin-transform-vue-jsx": "^3.5.0", "babel-preset-env": "^1.6.1", diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 5305bf7476..54c0cdc5f5 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -275,6 +275,10 @@ let tasksType = { 'DEPENDENT': { desc: 'DEPENDENT', color: '#2FBFD8' + }, + 'HTTP': { + desc: 'HTTP', + color: '#E46F13' } } diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index 37d3acaa19..fefdbe670d 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -82,6 +82,9 @@ .icos-DEPENDENT { background: url("../img/toolbar_DEPENDENT.png") no-repeat 50% 50%; } + .icos-HTTP { + background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index a46b894d11..53da59643b 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -165,6 +165,12 @@ ref="DEPENDENT" :backfill-item="backfillItem"> + + @@ -189,6 +195,7 @@ import JSP from './../plugIn/jsPlumbHandle' import mProcedure from './tasks/procedure' import mDependent from './tasks/dependent' + import mHttp from './tasks/http' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' @@ -465,6 +472,7 @@ mFlink, mPython, mDependent, + mHttp, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js index 1897940064..1781455f8d 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js @@ -202,10 +202,26 @@ const sqlTypeList = [ } ] +const positionList = [ + { + id: 'PARAMETER', + code: "Parameter" + }, + { + id: 'BODY', + code: "Body" + }, + { + id: 'HEADERS', + code: "Headers" + } +] + export { cycleList, dateValueList, typeList, directList, - sqlTypeList + sqlTypeList, + positionList } diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/httpParams.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/httpParams.vue new file mode 100644 index 0000000000..343104f286 --- /dev/null +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/httpParams.vue @@ -0,0 +1,242 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + + + + diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/http.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/http.vue new file mode 100644 index 0000000000..3e40fce93c --- /dev/null +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/http.vue @@ -0,0 +1,191 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + diff --git a/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_HTTP.png b/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_HTTP.png new file mode 100644 index 0000000000..a0ca7bd141 Binary files /dev/null and b/escheduler-ui/src/js/conf/home/pages/dag/img/toobar_HTTP.png differ diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue index 1dd6630c41..749529ce3f 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue @@ -253,7 +253,12 @@ let msg = '' this.store.dispatch(api, searchParams).then(res => { - this.previewTimes = res + if (res.length) { + this.previewTimes = res + } else { + this.$message.warning('该时间段无数据') + this.$message.warning(`${i18n.$t('There is no data for this period of time')}`) + } }) } }, diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/defineUserCount.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/defineUserCount.vue index 818765dae1..83eaec2e09 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/defineUserCount.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/defineUserCount.vue @@ -21,7 +21,8 @@ data () { return { isSpin: true, - msg: '' + msg: '', + parameter: {projectId: 0} } }, props: { @@ -54,7 +55,8 @@ }, created () { this.isSpin = true - this.getDefineUserCount(this.projectId).then(res => { + this.parameter.projectId = this.projectId; + this.getDefineUserCount(this.parameter).then(res => { this.defineUserList = [] this._handleDefineUser(res) this.isSpin = false diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 9efc9f721a..09b39517f3 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -12,31 +12,34 @@ {{$t('Process Name')}} - + {{$t('Run Type')}} - + + {{$t('Scheduling Time')}} + + {{$t('Start Time')}} - + {{$t('End Time')}} - + {{$t('Duration')}}s - + {{$t('Run Times')}} {{$t('host')}} - + {{$t('fault-tolerant sign')}} - + {{$t('State')}} - + {{$t('Operation')}} @@ -46,9 +49,13 @@ {{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}} - {{item.name}} + {{item.name}} {{_rtRunningType(item.commandType)}} + + + {{item.scheduleTime | formatDate}} + {{item.startTime | formatDate}} {{item.endTime | formatDate}} diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue index c41828b39b..37e8563117 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue @@ -12,7 +12,7 @@ {{$t('Process Instance')}} - + {{$t('Node Type')}} diff --git a/escheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue b/escheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue index 37be2ebf22..feb7a86c17 100644 --- a/escheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue +++ b/escheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue @@ -27,7 +27,7 @@