From 5169689c3566f67438d2224f914197c09c03d52d Mon Sep 17 00:00:00 2001 From: Yelli Date: Mon, 3 Feb 2020 10:24:24 +0800 Subject: [PATCH 1/6] class overrides equals() and should therefore also override hashCode() (#1882) --- .../apache/dolphinscheduler/api/utils/ZooKeeperState.java | 2 +- .../apache/dolphinscheduler/common/model/DateInterval.java | 5 +++++ .../dolphinscheduler/common/model/TaskNodeRelation.java | 7 +++++++ .../server/master/runner/MasterTaskExecThread.java | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java index 968a51ad63..5aa6be858a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java @@ -145,7 +145,7 @@ public class ZooKeeperState { sendThread.setName("FourLetterCmd:" + cmd); sendThread.start(); try { - sendThread.join(waitTimeout * 1000); + sendThread.join(waitTimeout * 1000L); return sendThread.ret; } catch (InterruptedException e) { logger.error("send " + cmd + " to server " + host + ":" + port + " failed!", e); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java index 4ea764cf80..e936a444d0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.common.model; import java.util.Date; +import java.util.Objects; /** * date interval class @@ -62,4 +63,8 @@ public class DateInterval { this.endTime = endTime; } + @Override + public int hashCode() { + return Objects.hash(startTime, endTime); + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java index 2fbc0e1654..e4741574e4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.model; +import java.util.Objects; + public class TaskNodeRelation { /** @@ -69,4 +71,9 @@ public class TaskNodeRelation { ", endNode='" + endNode + '\'' + '}'; } + + @Override + public int hashCode() { + return Objects.hash(startNode, endNode); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 1861e8a908..f2ee66b64a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -122,7 +122,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { break; } if(checkTimeout){ - long remainTime = getRemaintime(taskTimeoutParameter.getInterval()*60); + long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L); if (remainTime < 0) { logger.warn("task id: {} execution time out",taskInstance.getId()); // process define From 42b3e5bd10091046a11796a477d6a799bc1b8f7a Mon Sep 17 00:00:00 2001 From: Tboy Date: Mon, 3 Feb 2020 11:06:31 +0800 Subject: [PATCH 2/6] fix api server startup failure (#1886) * move version to parent pom * move version properties to parent pom for easy management * remove freemarker dependency * delete CombinedApplicationServer * #1871 correct spelling * #1873 some updates for TaskQueueZkImpl * #1875 remove unused properties in pom * #1878 1. remove tomcat dependency 2. remove combined_logback.xml in api module 3. format pom.xml for not aligning * #1885 fix api server startup failure 1. add jsp-2.1 dependency 2. remove jasper-runtime dependency --- dolphinscheduler-api/pom.xml | 5 +++++ pom.xml | 23 ++++++++++++----------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index 4dd2f31f8e..bac74da0b0 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -183,6 +183,11 @@ servlet-api + + org.mortbay.jetty + jsp-2.1 + + org.springframework.boot diff --git a/pom.xml b/pom.xml index 742c15e636..86d95e44a7 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ 3.5.0 0.1.52 6.1.0.jre8 - 6.1.14 + 6.1.14 3.1.12 3.0.0 0.13 @@ -116,7 +116,6 @@ 2.7 2.21.0 2.0.2 - 5.5.23 2.5 1.9.3 2.9.2 @@ -442,12 +441,6 @@ ${hadoop.version} - - javax.servlet - javax.servlet-api - ${javax.servlet.api.version} - - org.apache.commons commons-collections4 @@ -502,10 +495,12 @@ ${jcip.version} true + + - tomcat - jasper-runtime - ${jasper-runtime.version} + org.mortbay.jetty + jsp-2.1 + ${jsp-2.1.version} @@ -513,6 +508,12 @@ servlet-api ${servlet-api.version} + + + javax.servlet + javax.servlet-api + ${javax.servlet.api.version} + io.springfox springfox-swagger2 From 78bfe918d002cbea06ac8430503bee62dc8fc67a Mon Sep 17 00:00:00 2001 From: Yelli Date: Mon, 3 Feb 2020 11:07:19 +0800 Subject: [PATCH 3/6] change datasource null check position (#1883) --- .../server/worker/task/sql/SqlTask.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index a2f07c8be3..eba05a0d21 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -123,19 +123,21 @@ public class SqlTask extends AbstractTask { } dataSource= processDao.findDataSourceById(sqlParameters.getDatasource()); - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(), - dataSource.getType(), - dataSource.getNote(), - dataSource.getUserId(), - dataSource.getConnectionParams()); + // data source is null if (dataSource == null){ logger.error("datasource not exists"); exitStatusCode = -1; return; } + logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", + dataSource.getName(), + dataSource.getType(), + dataSource.getNote(), + dataSource.getUserId(), + dataSource.getConnectionParams()); + Connection con = null; List createFuncs = null; try { From dbbc6f58da2595800b154387bc86d869e45abb6f Mon Sep 17 00:00:00 2001 From: dailidong Date: Wed, 5 Feb 2020 09:21:21 +0800 Subject: [PATCH 4/6] jasper-runtime jar is needed when api server start (#1896) * update README about DolphinScheduler * Update issue templates * update * regularize api pom xml update rpc maven compile to 1.8 * regularize api pom xml * change commons.lang3.StringUtils to common.utils.StringUtils * update pom.xml * update * correct equals method * jasper-runtime is needed when api server start * jasper-runtime jar is needed when api server start Co-authored-by: DS --- dolphinscheduler-api/pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index bac74da0b0..fd56fb8d9f 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -27,6 +27,10 @@ ${project.artifactId} jar + + 5.5.23 + + org.apache.dolphinscheduler @@ -178,6 +182,12 @@ hadoop-aws + + tomcat + jasper-runtime + ${jasper-runtime.version} + + javax.servlet servlet-api From f942e5ced397f2f6bf3cef86efd695c43b527ce2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AD=94=E6=96=B9=E4=B8=8D=E5=9C=A8=E6=89=8B?= <39549317+wenhemin@users.noreply.github.com> Date: Thu, 6 Feb 2020 09:29:23 +0800 Subject: [PATCH 5/6] [feature] data synchronization function (#1881) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature:data synchronization function * feature:data synchronization function * feature:data synchronization function * fix:add license * fix:resource close bug * fix:remove class author * fix:Improve test coverage * fix:add UT * fix:add UT * fix:data sync node renamed to datax * fix:add UT * fix: UT bug * fix:Optimize variable name * fix:pom * fix:Variable name error * fix:optimize the code --- .../conf/env/.dolphinscheduler_env.sh | 1 + .../common/enums/TaskType.java | 4 +- .../common/task/datax/DataxParameters.java | 192 +++++++ .../common/utils/TaskParametersUtils.java | 3 + .../server/utils/DataxUtils.java | 129 +++++ .../server/worker/task/AbstractTask.java | 4 + .../server/worker/task/TaskManager.java | 3 + .../server/worker/task/datax/DataxTask.java | 522 ++++++++++++++++++ .../server/utils/DataxUtilsTest.java | 108 ++++ .../worker/task/datax/DataxTaskTest.java | 352 ++++++++++++ .../js/conf/home/pages/dag/_source/config.js | 4 + .../js/conf/home/pages/dag/_source/dag.scss | 3 + .../pages/dag/_source/formModel/formModel.vue | 10 +- .../dag/_source/formModel/tasks/datax.vue | 292 ++++++++++ .../conf/home/pages/dag/img/toolbar_DATAX.png | Bin 0 -> 571 bytes .../conf/home/pages/dag/img/toolbar_ETL.png | Bin 0 -> 3117 bytes .../src/js/module/i18n/locale/en_US.js | 10 +- .../src/js/module/i18n/locale/zh_CN.js | 10 +- pom.xml | 2 + 19 files changed, 1645 insertions(+), 4 deletions(-) create mode 100755 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java create mode 100755 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java create mode 100755 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java create mode 100755 dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png create mode 100755 dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png diff --git a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh index 960d971dd8..8e842fe28e 100644 --- a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh +++ b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh @@ -18,3 +18,4 @@ export PYTHON_HOME=/usr/bin/python export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH +export DATAX_HOME=/opt/datax/bin/datax.py \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 45f36883e3..b996c3aec9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -33,6 +33,7 @@ public enum TaskType { * 7 DEPENDENT * 8 FLINK * 9 HTTP + * 10 DATAX */ SHELL(0, "shell"), SQL(1, "sql"), @@ -43,7 +44,8 @@ public enum TaskType { PYTHON(6, "python"), DEPENDENT(7, "dependent"), FLINK(8, "flink"), - HTTP(9, "http"); + HTTP(9, "http"), + DATAX(10, "datax"); TaskType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java new file mode 100755 index 0000000000..95dd505c02 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.datax; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +/** + * DataX parameter + */ +public class DataxParameters extends AbstractParameters { + + /** + * data source type,eg MYSQL, POSTGRES ... + */ + private String dsType; + + /** + * datasource id + */ + private int dataSource; + + /** + * data target type,eg MYSQL, POSTGRES ... + */ + private String dtType; + + /** + * datatarget id + */ + private int dataTarget; + + /** + * sql + */ + private String sql; + + /** + * target table + */ + private String targetTable; + + /** + * Pre Statements + */ + private List preStatements; + + /** + * Post Statements + */ + private List postStatements; + + /** + * speed byte num + */ + private int jobSpeedByte; + + /** + * speed record count + */ + private int jobSpeedRecord; + + public String getDsType() { + return dsType; + } + + public void setDsType(String dsType) { + this.dsType = dsType; + } + + public int getDataSource() { + return dataSource; + } + + public void setDataSource(int dataSource) { + this.dataSource = dataSource; + } + + public String getDtType() { + return dtType; + } + + public void setDtType(String dtType) { + this.dtType = dtType; + } + + public int getDataTarget() { + return dataTarget; + } + + public void setDataTarget(int dataTarget) { + this.dataTarget = dataTarget; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getTargetTable() { + return targetTable; + } + + public void setTargetTable(String targetTable) { + this.targetTable = targetTable; + } + + public List getPreStatements() { + return preStatements; + } + + public void setPreStatements(List preStatements) { + this.preStatements = preStatements; + } + + public List getPostStatements() { + return postStatements; + } + + public void setPostStatements(List postStatements) { + this.postStatements = postStatements; + } + + public int getJobSpeedByte() { + return jobSpeedByte; + } + + public void setJobSpeedByte(int jobSpeedByte) { + this.jobSpeedByte = jobSpeedByte; + } + + public int getJobSpeedRecord() { + return jobSpeedRecord; + } + + public void setJobSpeedRecord(int jobSpeedRecord) { + this.jobSpeedRecord = jobSpeedRecord; + } + + @Override + public boolean checkParameters() { + if (!(dataSource != 0 + && dataTarget != 0 + && StringUtils.isNotEmpty(sql) + && StringUtils.isNotEmpty(targetTable))) { + return false; + } + + return true; + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + @Override + public String toString() { + return "DataxParameters{" + + "dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + '}'; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 28e2593359..a2ae6a68e3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; @@ -68,6 +69,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, FlinkParameters.class); case HTTP: return JSONUtils.parseObject(parameter, HttpParameters.class); + case DATAX: + return JSONUtils.parseObject(parameter, DataxParameters.class); default: return null; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java new file mode 100755 index 0000000000..930098919b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + + +import org.apache.dolphinscheduler.common.enums.DbType; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import com.alibaba.druid.sql.parser.SQLStatementParser; + + +public class DataxUtils { + + public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader"; + + public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader"; + + public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader"; + + public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader"; + + public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter"; + + public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter"; + + public static String getReaderPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_READER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_READER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_READER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_READER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static String getWriterPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_WRITER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_WRITER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_WRITER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_WRITER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) { + switch (dbType) { + case MYSQL: + return new MySqlStatementParser(sql); + case POSTGRESQL: + return new PGSQLStatementParser(sql); + case ORACLE: + return new OracleStatementParser(sql); + case SQLSERVER: + return new SQLServerStatementParser(sql); + default: + return null; + } + } + + public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { + if (columns == null) { + return null; + } + + String[] toColumns = new String[columns.length]; + for (int i = 0; i < columns.length; i++ ) { + toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); + } + + return toColumns; + } + + public static String doConvertKeywordsColumn(DbType dbType, String column) { + if (column == null) { + return column; + } + + column = column.trim(); + column = column.replace("`", ""); + column = column.replace("\"", ""); + column = column.replace("'", ""); + + switch (dbType) { + case MYSQL: + return String.format("`%s`", column); + case POSTGRESQL: + return String.format("\"%s\"", column); + case ORACLE: + return String.format("\"%s\"", column); + case SQLSERVER: + return String.format("`%s`", column); + default: + return column; + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 999863f76e..f2772d0747 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; @@ -194,6 +195,9 @@ public abstract class AbstractTask { case PYTHON: paramsClass = PythonParameters.class; break; + case DATAX: + paramsClass = DataxParameters.class; + break; default: logger.error("not support this task type: {}", taskType); throw new IllegalArgumentException("not support this task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 310d9cad9a..67deb7a3fa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; +import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask; @@ -65,6 +66,8 @@ public class TaskManager { return new DependentTask(props, logger); case HTTP: return new HttpTask(props, logger); + case DATAX: + return new DataxTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java new file mode 100755 index 0000000000..0de2bbc7c6 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.utils.ParamUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.slf4j.Logger; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.fastjson.JSONObject; + + +/** + * DataX task + */ +public class DataxTask extends AbstractTask { + + /** + * python process(datax only supports version 2.7 by default) + */ + private static final String DATAX_PYTHON = "python2.7"; + + /** + * datax home path + */ + private static final String DATAX_HOME_EVN = "${DATAX_HOME}"; + + /** + * datax channel count + */ + private static final int DATAX_CHANNEL_COUNT = 1; + + /** + * datax parameters + */ + private DataxParameters dataXParameters; + + /** + * task dir + */ + private String taskDir; + + /** + * shell command executor + */ + private ShellCommandExecutor shellCommandExecutor; + + /** + * process database access + */ + private ProcessDao processDao; + + /** + * constructor + * + * @param props + * props + * @param logger + * logger + */ + public DataxTask(TaskProps props, Logger logger) { + super(props, logger); + + this.taskDir = props.getTaskDir(); + logger.info("task dir : {}", taskDir); + + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), + props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), + props.getTaskTimeout(), logger); + + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); + } + + /** + * init DataX config + */ + @Override + public void init() { + logger.info("datax task params {}", taskProps.getTaskParams()); + dataXParameters = JSONUtils.parseObject(taskProps.getTaskParams(), DataxParameters.class); + + if (!dataXParameters.checkParameters()) { + throw new RuntimeException("datax task params is not valid"); + } + } + + /** + * run DataX process + * + * @throws Exception + */ + @Override + public void handle() + throws Exception { + try { + // set the name of the current thread + String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + // run datax process + String jsonFilePath = buildDataxJsonFile(); + String shellCommandFilePath = buildShellCommandFile(jsonFilePath); + exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao); + } + catch (Exception e) { + exitStatusCode = -1; + throw e; + } + } + + /** + * cancel DataX process + * + * @param cancelApplication + * @throws Exception + */ + @Override + public void cancelApplication(boolean cancelApplication) + throws Exception { + // cancel process + shellCommandExecutor.cancelApplication(); + } + + /** + * build datax configuration file + * + * @return + * @throws Exception + */ + private String buildDataxJsonFile() + throws Exception { + // generate json + String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId()); + + Path path = new File(fileName).toPath(); + if (Files.exists(path)) { + return fileName; + } + + JSONObject job = new JSONObject(); + job.put("content", buildDataxJobContentJson()); + job.put("setting", buildDataxJobSettingJson()); + + JSONObject root = new JSONObject(); + root.put("job", job); + root.put("core", buildDataxCoreJson()); + + logger.debug("datax job json : {}", root.toString()); + + // create datax json file + FileUtils.writeStringToFile(new File(fileName), root.toString(), Charset.forName("UTF-8")); + return fileName; + } + + /** + * build datax job config + * + * @return + * @throws SQLException + */ + private List buildDataxJobContentJson() + throws SQLException { + DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource()); + BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + + DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget()); + BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(), + dataTarget.getConnectionParams()); + + List readerConnArr = new ArrayList<>(); + JSONObject readerConn = new JSONObject(); + readerConn.put("querySql", new String[] {dataXParameters.getSql()}); + readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()}); + readerConnArr.add(readerConn); + + JSONObject readerParam = new JSONObject(); + readerParam.put("username", dataSourceCfg.getUser()); + readerParam.put("password", dataSourceCfg.getPassword()); + readerParam.put("connection", readerConnArr); + + JSONObject reader = new JSONObject(); + reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType())); + reader.put("parameter", readerParam); + + List writerConnArr = new ArrayList<>(); + JSONObject writerConn = new JSONObject(); + writerConn.put("table", new String[] {dataXParameters.getTargetTable()}); + writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl()); + writerConnArr.add(writerConn); + + JSONObject writerParam = new JSONObject(); + writerParam.put("username", dataTargetCfg.getUser()); + writerParam.put("password", dataTargetCfg.getPassword()); + writerParam.put("column", + parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql())); + writerParam.put("connection", writerConnArr); + + if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { + writerParam.put("preSql", dataXParameters.getPreStatements()); + } + + if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) { + writerParam.put("postSql", dataXParameters.getPostStatements()); + } + + JSONObject writer = new JSONObject(); + writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType())); + writer.put("parameter", writerParam); + + List contentList = new ArrayList<>(); + JSONObject content = new JSONObject(); + content.put("reader", reader); + content.put("writer", writer); + contentList.add(content); + + return contentList; + } + + /** + * build datax setting config + * + * @return + */ + private JSONObject buildDataxJobSettingJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject errorLimit = new JSONObject(); + errorLimit.put("record", 0); + errorLimit.put("percentage", 0); + + JSONObject setting = new JSONObject(); + setting.put("speed", speed); + setting.put("errorLimit", errorLimit); + + return setting; + } + + private JSONObject buildDataxCoreJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject channel = new JSONObject(); + channel.put("speed", speed); + + JSONObject transport = new JSONObject(); + transport.put("channel", channel); + + JSONObject core = new JSONObject(); + core.put("transport", transport); + + return core; + } + + /** + * create command + * + * @return + * @throws Exception + */ + private String buildShellCommandFile(String jobConfigFilePath) + throws Exception { + // generate scripts + String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId()); + Path path = new File(fileName).toPath(); + + if (Files.exists(path)) { + return fileName; + } + + // datax python command + StringBuilder sbr = new StringBuilder(); + sbr.append(DATAX_PYTHON); + sbr.append(" "); + sbr.append(DATAX_HOME_EVN); + sbr.append(" "); + sbr.append(jobConfigFilePath); + String dataxCommand = sbr.toString(); + + // find process instance by task id + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), dataXParameters.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); + if (paramsMap != null) { + dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); + } + + logger.debug("raw script : {}", dataxCommand); + + // create shell command file + Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); + FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); + Files.createFile(path, attr); + Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND); + + return fileName; + } + + /** + * parsing synchronized column names in SQL statements + * + * @param dsType + * the database type of the data source + * @param dtType + * the database type of the data target + * @param dataSourceCfg + * the database connection parameters of the data source + * @param sql + * sql for data synchronization + * @return + */ + private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) { + String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql); + + if (columnNames == null || columnNames.length == 0) { + logger.info("try to execute sql analysis query column name"); + columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); + } + + notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); + + return DataxUtils.convertKeywordsColumns(dtType, columnNames); + } + + /** + * try grammatical parsing column + * + * @param dbType + * database type + * @param sql + * sql for data synchronization + * @return column name array + * @throws RuntimeException + */ + private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) { + String[] columnNames; + + try { + SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); + notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); + + SQLStatement sqlStatement = parser.parseStatement(); + SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; + SQLSelect sqlSelect = sqlSelectStatement.getSelect(); + + List selectItemList = null; + if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) { + SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery(); + selectItemList = block.getSelectList(); + } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) { + SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight(); + selectItemList = block.getSelectList(); + } + + notNull(selectItemList, + String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); + + columnNames = new String[selectItemList.size()]; + for (int i = 0; i < selectItemList.size(); i++ ) { + SQLSelectItem item = selectItemList.get(i); + + String columnName = null; + + if (item.getAlias() != null) { + columnName = item.getAlias(); + } else if (item.getExpr() != null) { + if (item.getExpr() instanceof SQLPropertyExpr) { + SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr(); + columnName = expr.getName(); + } else if (item.getExpr() instanceof SQLIdentifierExpr) { + SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr(); + columnName = expr.getName(); + } + } else { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + if (columnName == null) { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + columnNames[i] = columnName; + } + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + /** + * try to execute sql to resolve column names + * + * @param baseDataSource + * the database connection parameters + * @param sql + * sql for data synchronization + * @return column name array + */ + public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) { + String[] columnNames; + sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql); + sql = sql.replace(";", ""); + + try ( + Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(), + baseDataSource.getPassword()); + PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet resultSet = stmt.executeQuery()) { + + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + columnNames = new String[num]; + for (int i = 1; i <= num; i++ ) { + columnNames[i - 1] = md.getColumnName(i); + } + } + catch (SQLException e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + @Override + public AbstractParameters getParameters() { + return dataXParameters; + } + + private void notNull(Object obj, String message) { + if (obj == null) { + throw new RuntimeException(message); + } + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java new file mode 100644 index 0000000000..2720bb8a28 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.junit.Assert; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * DataxUtils Tester. + */ +public class DataxUtilsTest { + + /** + * + * Method: getReaderPluginName(DbType dbType) + * + */ + @Test + public void testGetReaderPluginName() { + assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getWriterPluginName(DbType dbType) + * + */ + @Test + public void testGetWriterPluginName() { + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getSqlStatementParser(DbType dbType, String sql) + * + */ + @Test + public void testGetSqlStatementParser() throws Exception { + assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null); + } + + /** + * + * Method: convertKeywordsColumns(DbType dbType, String[] columns) + * + */ + @Test + public void testConvertKeywordsColumns() throws Exception { + String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; + String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; + + String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); + + assertTrue(fromColumns.length == toColumns.length); + + for (int i = 0; i < toColumns.length; i++) { + assertEquals(targetColumns[i], toColumns[i]); + } + } + + /** + * + * Method: doConvertKeywordsColumn(DbType dbType, String column) + * + */ + @Test + public void testDoConvertKeywordsColumn() throws Exception { + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); + assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java new file mode 100644 index 0000000000..7a6073e05d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +/** + * DataxTask Tester. + */ +public class DataxTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class); + + private DataxTask dataxTask; + + private ProcessDao processDao; + + private ShellCommandExecutor shellCommandExecutor; + + private ApplicationContext applicationContext; + + @Before + public void before() + throws Exception { + processDao = Mockito.mock(ProcessDao.class); + shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class); + + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao); + + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + dataxTask = PowerMockito.spy(new DataxTask(props, logger)); + dataxTask.init(); + + Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource()); + Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource()); + Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); + + String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId()); + Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0); + } + + private DataSource getDataSource() { + DataSource dataSource = new DataSource(); + dataSource.setType(DbType.MYSQL); + dataSource.setConnectionParams( + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); + dataSource.setUserId(1); + return dataSource; + } + + private ProcessInstance getProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setScheduleTime(new Date()); + return processInstance; + } + + @After + public void after() + throws Exception {} + + /** + * Method: DataxTask() + */ + @Test + public void testDataxTask() + throws Exception { + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + Assert.assertNotNull(new DataxTask(props, logger)); + } + + /** + * Method: init + */ + @Test + public void testInit() + throws Exception { + try { + dataxTask.init(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: handle() + */ + @Test + public void testHandle() + throws Exception { + try { + dataxTask.handle(); + } catch (RuntimeException e) { + if (e.getMessage().indexOf("process error . exitCode is : -1") < 0) { + Assert.fail(); + } + } + } + + /** + * Method: cancelApplication() + */ + @Test + public void testCancelApplication() + throws Exception { + try { + dataxTask.cancelApplication(true); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource + * dataSourceCfg, String sql) + */ + @Test + public void testParsingSqlColumnNames() + throws Exception { + try { + BaseDataSource dataSource = DataSourceFactory.getDatasource(getDataSource().getType(), + getDataSource().getConnectionParams()); + + Method method = DataxTask.class.getDeclaredMethod("parsingSqlColumnNames", DbType.class, DbType.class, BaseDataSource.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, DbType.MYSQL, dataSource, "select 1 as a, 2 as `table` from dual"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[`a`, `table`]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryGrammaticalParsingSqlColumnNames(DbType dbType, String sql) + */ + @Test + public void testTryGrammaticalAnalysisSqlColumnNames() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("tryGrammaticalAnalysisSqlColumnNames", DbType.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, "select t1.a, t1.b from test t1 union all select a, t2.b from (select a, b from test) t2"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[a, b]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, + * String sql) + */ + @Test + public void testTryExecuteSqlResolveColumnNames() + throws Exception { + // TODO: Test goes here... + } + + /** + * Method: buildDataxJsonFile() + */ + @Test + public void testBuildDataxJsonFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); + method.setAccessible(true); + String filePath = (String) method.invoke(dataxTask, null); + Assert.assertNotNull(filePath); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobContentJson() + */ + @Test + public void testBuildDataxJobContentJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson"); + method.setAccessible(true); + List contentList = (List) method.invoke(dataxTask, null); + Assert.assertNotNull(contentList); + + JSONObject content = contentList.get(0); + JSONObject reader = (JSONObject) content.get("reader"); + Assert.assertNotNull(reader); + + String readerPluginName = (String) reader.get("name"); + Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName); + + JSONObject writer = (JSONObject) content.get("writer"); + Assert.assertNotNull(writer); + + String writerPluginName = (String) writer.get("name"); + Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobSettingJson() + */ + @Test + public void testBuildDataxJobSettingJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobSettingJson"); + method.setAccessible(true); + JSONObject setting = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(setting); + Assert.assertNotNull(setting.get("speed")); + Assert.assertNotNull(setting.get("errorLimit")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxCoreJson() + */ + @Test + public void testBuildDataxCoreJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxCoreJson"); + method.setAccessible(true); + JSONObject coreConfig = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(coreConfig); + Assert.assertNotNull(coreConfig.get("transport")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildShellCommandFile(String jobConfigFilePath) + */ + @Test + public void testBuildShellCommandFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); + method.setAccessible(true); + Assert.assertNotNull(method.invoke(dataxTask, "test.json")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: getParameters + */ + @Test + public void testGetParameters() + throws Exception { + Assert.assertTrue(dataxTask.getParameters() != null); + } + + /** + * Method: notNull(Object obj, String message) + */ + @Test + public void testNotNull() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("notNull", Object.class, String.class); + method.setAccessible(true); + method.invoke(dataxTask, "abc", "test throw RuntimeException"); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 62ca8103e5..e8187043bf 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -279,6 +279,10 @@ let tasksType = { 'HTTP': { desc: 'HTTP', color: '#E46F13' + }, + 'DATAX': { + desc: 'DataX', + color: '#1fc747' } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index fbb4f418d0..420bae8c89 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -101,6 +101,9 @@ .icos-HTTP { background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; } + .icos-DATAX { + background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 3f009eb75c..682dd5b51a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -191,7 +191,13 @@ ref="HTTP" :backfill-item="backfillItem"> - + +
@@ -216,6 +222,7 @@ import mProcedure from './tasks/procedure' import mDependent from './tasks/dependent' import mHttp from './tasks/http' + import mDatax from './tasks/datax' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' @@ -565,6 +572,7 @@ mPython, mDependent, mHttp, + mDatax, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue new file mode 100755 index 0000000000..ce918f49cf --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -0,0 +1,292 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png new file mode 100644 index 0000000000000000000000000000000000000000..b59c7592081977f8ed36359df9a4467ce9d22f66 GIT binary patch literal 571 zcmeAS@N?(olHy`uVBq!ia0vp^3LwnF3?v&v(vJfvmUKs7M+SzC{oH>NS%G}U;vjb? zhIQv;UIIC)C9V-ADTyViR=N2pnQ4^_Mg~R(x&}tN1|}f}7FGu4R)&_^1_o9J1~w}t z6HzoI7o`IA;nv{$W#L_*2CV>}5LaXO^X>&cW?mP73=r~wk!TF}^OK^dRReYLl?3?( z|7YO#eSPQW@#1#%sVnC^-mq&T(DlZiE{-7@=aU6km=tddv9WzW-KAj0+;RUD1Djx1 z0HZLQMnM~ghmnfIj~pK1gG}r<60{^L9GE6?*f}3^SQKZS$uv&Wh8ss2c{p?CY>HVe*%ju&6gy?ZMurTA)f*m*v@Kxm zz9Fuc9u^atwB%sI6^UaHwj8)ny*1#O)8h+M+t~TIHaD_xMTm)+uo-Od&Ec5D!rh`1 zqLZQHkr8uaL!e~F$0pY^Nxcq?c{2~Rg*vo4YOBngrmr*O#yQ2y7OtBLn4G63Ft<-g zbexi~bHT%$!evi+!kr`}yds4oP6;21jAe@0E6Ee^_t2RITzu9XY;CCm%f1^jaVT>b zGDm)3Xgc3)yQm>>Cd0<-lHn=MlbNHfR4f@7Vv@a1Z#T7i1PoXPPgg&ebxsLQ0Dm98 A_y7O^ literal 0 HcmV?d00001 diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png new file mode 100755 index 0000000000000000000000000000000000000000..913af420c6c76ed051ecf44f27b522419fea7ecc GIT binary patch literal 3117 zcmZ{mc{J2*8^?c%Axj}4*&_-wW8agVvBZ!qWf{Xv%#a!To{+L+k0N3SHCY;iGK6?! ziwY&?5pT*e*^T`jo%fu#_c`Z1zjNK!`F+2i>wDjSUdcArrd;eo>;M38nVT8G8D0Fa z*g%XE7aqRGXv{wPR{8+&nfYto7p6lKVFtGXfbeqw5PKT{wi#0F4*&>)0>GR*0BAh` z0D*u<&9*uW1LSF8Y6Kh{e#I>nSqz00Z*FDGI(_6A6Ps$>ExlC$I96(Iq;K!kQJ5Q% zBJwG!ThGUvwiC5U>z!1OzArScq0w+2ZD938F#gmf+YA>W(u_n#88(}{#L1rRJS_eT zjDV>c&NM=s!_Ckv=a0F=S?-M3bYDLzG4|^E2J+YZFiZfY?LG5fTPH$+_Ji~8frun4>sN9;?U}N5jEP}O|=+Z>%+X&-BR<3ewJozT}H)RfV08q>d8j9eN4FoqY>^C%ntci|FrZXE-h& z(9Hu23w7{6;x<{AIM+{}EfNQg2 zxH5Z0JjY<3!?Rj$6jqXLWS*SOOW;dH3k^B>L3vXrcfVO2tar(5EI!=Yn(9aq(GCxa zP2Z5zx&kHaz@QB5-OUWRQ6GpFjS@RG~2*DAydGJkWH_SB76~dImw<-hiVW# zEwbnA6XK}K&8c{EZYE53IxyDqT(6t%c1ROtpEB6(S-UjHzFt+ck}=%9sjEZnZjpWc zDWGLlf*_gI9|`3>Cf0cHw}4_N;S~iWo&(G5_XeF@rr+V7y^#9J1{DQfcOg42?t~VF zBtMS^O|YdHRRzC; zfLs?@yLF@^VZ@;laC~LTLGE`|nHtrfrrrcy?fnZ`BH`C1` z&`Sr@M8gxZ%gLgo*@!Y;#d9(;`T2!8`IaWmxj{hyfzRO1$V%X3#CE=qrdlwQv~=7h zpjS!J>ZpgSEa&r2VG*#(`;_#M0f{M_`+?{Lx7XVibL-(9b$z*Er`rk?#z?*b#U0A^ zI7Ud(V+x(O!uoO@VkdZeWZk`BBw^a_v34TKpr=93mD+)6VpGAr)v>etz<19$v&ZY{ zXFp~zc}gg@h=v6eb#x@tUhh*wi@N(3PUo%|^Db+Awk*J3FqY!&Qc);ds})HKZ!sr1 zl)5~BlHFcs>7tMaZ?ZI1Gra5w#;+7ooht3;<^X@n);XA5&}d=E&FLatVeN}18p2&r z=7wpga!8`x^huS6TwfhH265vXT(`}nvZU?s?*6Wqn$a1rT zUK|eyjuxeEhHd3IHto=;XEQ7V7shT1k%W5e6TOto)gEF-@A-*YCZgT*`5UU8Riu4do$nQecve~rgdlcVKC z+vi#0qd4CeE&LL`f)rvp=z2`4 zfm)V0+n7~F)UBg1-?_A&NOW=a0!H-ZTq>M46UDbr=G+|;qgwb75mI@*6ZP4HSIg11 zU`Hn~UoGaLjjW}-=-DF6%mJ?&WJ*kohme9Q+;C#KlFX+~-mrW2MSiAw7Ob4vP`z`t zcK`90`HSCcexC5y)*JD5_w-~+bs$eRyUB5VsK*&SQPiOlY~<@|8qIodF1ZT0!LI9x zNV(M=Bk(hw z&L%O%wdw9`L`k}jAGff$=j0HJk~h&2mB}!>Zzcu3b@TduP&}`q%TFvwoh$H1(Y44e zJMFQi%7-OEXzZk>r7?>}aC!1H%p&PY@7z$*@zpNQ6uls`7TW>Sq2 zo7g%L&3AHg_SL7d;Sb8qWrYn2Y@;;=ibWT_W{!63_C8+t-o`cmqu+>)W!z3i^2ob} z?>;BhtVCkn%X+leD{RsRdt^qjakD$J(zBRxu<#je9=R*Nel0XDZ1c~K?03@d{-q=* zC--b7uX0NtU!Lw>l^4%yGNze5Lt19KET-EbX zW>XoQdquz1KAgZL$?Q6%#z*X!nvFJ)FM3dq*{PR%+g_gob7of=LS@@)St;d63a7b0 zIH%pt;pLT6d$&yHl)LYMrxd1UuRvZq(atrCKc!6K^hY{e=gqAcS2Ty$;ydtz{QM~d znKnB!?*Bpkee1We4UVGufkvXXxVQ)#^D{FQ1IY}j!hzwOg zm1OTca2Krk_isI!|hD4Q%imOnaO^lAnyaoNk%eSN#6zU5jaYWjqy_G)_5MG^jzgo3J6>n&UbEe%XOz@}nGWkUBWokP8r z+H+e4+HQi$;A5seffuQ^`NC;!U2=w>IL$JsY92)=oAB^**/dao/mapper/CommandMapperTest.java **/alert/template/AlertTemplateFactoryTest.java **/alert/template/impl/DefaultHTMLTemplateTest.java + **/server/worker/task/datax/DataxTaskTest.java + **/server/utils/DataxUtilsTest.java From c63cd3e237d7a32b7cad529dd99c4051abd8b86d Mon Sep 17 00:00:00 2001 From: dailidong Date: Thu, 6 Feb 2020 11:08:06 +0800 Subject: [PATCH 6/6] combine logback config of master/worker/alert/api server to one (#1898) * combine logback config of master/worker/alert/api server to one logback.xml --- .../conf/combined_logback.xml | 6 +- .../dolphinscheduler/conf/worker_logback.xml | 6 +- .../src/main/resources/alert_logback.xml | 49 ----- .../src/main/resources/apiserver_logback.xml | 60 ------ .../main/resources/application-api.properties | 2 - dolphinscheduler-common/pom.xml | 7 + .../common}/log/MasterLogFilter.java | 2 +- .../common}/log/SensitiveDataConverter.java | 6 +- .../common}/log/TaskLogDiscriminator.java | 4 +- .../common}/log/TaskLogFilter.java | 4 +- .../common}/log/WorkerLogFilter.java | 6 +- .../common}/utils/LoggerUtils.java | 9 +- .../common/utils/SensitiveLogUtils.java | 4 +- .../src/main/resources/logback.xml | 169 +++++++++++++++++ .../common/log/MasterLogFilterTest.java | 118 ++++++++++++ .../log/SensitiveDataConverterTest.java | 179 ++++++++++++++++++ .../common/log/TaskLogDiscriminatorTest.java | 155 +++++++++++++++ .../common/log/TaskLogFilterTest.java | 120 ++++++++++++ .../common/log/WorkerLogFilterTest.java | 119 ++++++++++++ .../common/utils/LoggerUtilsTest.java | 43 +++++ .../common/utils/SensitiveLogUtilsTest.java | 8 +- .../server/utils/ProcessUtils.java | 1 + .../worker/runner/TaskScheduleThread.java | 4 +- .../worker/task/AbstractCommandExecutor.java | 2 +- .../src/main/resources/master_logback.xml | 52 ----- .../src/main/resources/worker_logback.xml | 81 -------- .../log/SensitiveDataConverterTest.java | 92 --------- .../shell/ShellCommandExecutorTest.java | 2 +- .../server/worker/sql/SqlExecutorTest.java | 2 +- pom.xml | 1 + script/dolphinscheduler-daemon.sh | 12 +- 31 files changed, 955 insertions(+), 370 deletions(-) delete mode 100644 dolphinscheduler-alert/src/main/resources/alert_logback.xml delete mode 100644 dolphinscheduler-api/src/main/resources/apiserver_logback.xml rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/log/MasterLogFilter.java (96%) rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/log/SensitiveDataConverter.java (92%) rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/log/TaskLogDiscriminator.java (94%) rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/log/TaskLogFilter.java (93%) rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/log/WorkerLogFilter.java (91%) rename {dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common}/utils/LoggerUtils.java (91%) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java => dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java (93%) create mode 100644 dolphinscheduler-common/src/main/resources/logback.xml create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java rename dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java => dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java (83%) delete mode 100644 dolphinscheduler-server/src/main/resources/master_logback.xml delete mode 100644 dolphinscheduler-server/src/main/resources/worker_logback.xml delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java diff --git a/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml b/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml index 6bdb97cf00..7a9a5b4621 100644 --- a/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml +++ b/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml @@ -31,8 +31,8 @@ INFO - - + + taskAppId ${log.base} @@ -52,7 +52,7 @@ ${log.base}/dolphinscheduler-combined.log - + INFO diff --git a/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml index 9bbd9615c4..bf4a651e7c 100644 --- a/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml +++ b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml @@ -31,8 +31,8 @@ INFO - - + + taskAppId ${log.base} @@ -52,7 +52,7 @@ ${log.base}/dolphinscheduler-worker.log - + INFO diff --git a/dolphinscheduler-alert/src/main/resources/alert_logback.xml b/dolphinscheduler-alert/src/main/resources/alert_logback.xml deleted file mode 100644 index 3474df8d2e..0000000000 --- a/dolphinscheduler-alert/src/main/resources/alert_logback.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - ${log.base}/dolphinscheduler-alert.log - - ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log - 20 - 64MB - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/resources/apiserver_logback.xml b/dolphinscheduler-api/src/main/resources/apiserver_logback.xml deleted file mode 100644 index 4c2ce0d2b3..0000000000 --- a/dolphinscheduler-api/src/main/resources/apiserver_logback.xml +++ /dev/null @@ -1,60 +0,0 @@ - - - - - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - INFO - - ${log.base}/dolphinscheduler-api-server.log - - ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log - 168 - 64MB - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties index 6ec304a397..21f940baaf 100644 --- a/dolphinscheduler-api/src/main/resources/application-api.properties +++ b/dolphinscheduler-api/src/main/resources/application-api.properties @@ -15,8 +15,6 @@ # limitations under the License. # -logging.config=classpath:apiserver_logback.xml - # server port server.port=12345 diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 955e956251..bd2448eee7 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -29,6 +29,7 @@ jar UTF-8 + 3.1.0 @@ -604,5 +605,11 @@ org.springframework spring-context + + + org.codehaus.janino + janino + ${codehaus.janino.version} + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java index d6de484074..7b5d53a032 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java similarity index 92% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java index 4a98e66c6c..971ce7149c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.pattern.MessageConverter; import ch.qos.logback.classic.spi.ILoggingEvent; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -77,7 +77,7 @@ public class SensitiveDataConverter extends MessageConverter { String password = matcher.group(); - String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); + String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password); matcher.appendReplacement(sb, maskPassword); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java similarity index 94% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java index fa00aed772..fd2b0766a8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * Task Log Discriminator diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java similarity index 93% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java index 6398135481..ac258daf20 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * task log filter diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java similarity index 91% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java index 23758f918a..abcc8bc619 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java @@ -14,12 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; + +import java.util.Arrays; /** * worker log filter @@ -40,6 +43,7 @@ public class WorkerLogFilter extends Filter { if (event.getThreadName().startsWith("Worker-")){ return FilterReply.ACCEPT; } + return FilterReply.DENY; } public void setLevel(String level) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java similarity index 91% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 8b40d943c0..fc08eb645b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; import org.slf4j.Logger; @@ -44,6 +44,11 @@ public class LoggerUtils { */ public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; + /** + * Task Logger Thread's name + */ + public static final String TASK_APPID_LOG_FORMAT = "[taskAppId="; + /** * build job id * @@ -58,7 +63,7 @@ public class LoggerUtils { int processInstId, int taskId){ // - [taskAppId=TASK_79_4084_15210] - return String.format(" - [taskAppId=%s-%s-%s-%s]",affix, + return String.format(" - %s%s-%s-%s-%s]",TASK_APPID_LOG_FORMAT,affix, processDefId, processInstId, taskId); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java similarity index 93% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java index 948e92cb24..eab6c4f124 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.Constants; /** * sensitive log Util */ -public class SensitiveLogUtil { +public class SensitiveLogUtils { /** * @param dataSourcePwd data source password diff --git a/dolphinscheduler-common/src/main/resources/logback.xml b/dolphinscheduler-common/src/main/resources/logback.xml new file mode 100644 index 0000000000..7f634da975 --- /dev/null +++ b/dolphinscheduler-common/src/main/resources/logback.xml @@ -0,0 +1,169 @@ + + + + + + + + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + ${log.base}/dolphinscheduler-master.log + + + ${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + + + INFO + + + + taskAppId + ${log.base} + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + + UTF-8 + + true + + + + + ${log.base}/dolphinscheduler-worker.log + + INFO + + + + ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + + UTF-8 + + + + + + + + ${log.base}/dolphinscheduler-alert.log + + ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log + 20 + 64MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + ${log.base}/dolphinscheduler-api-server.log + + INFO + + + ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log + 168 + 64MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java new file mode 100644 index 0000000000..8cf6cfc2df --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; +import java.util.Map; + +public class MasterLogFilterTest { + + @Test + public void decide() { + MasterLogFilter masterLogFilter = new MasterLogFilter(); + + + FilterReply filterReply = masterLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return Constants.THREAD_NAME_MASTER_SERVER; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "master insert into queue success, task : shell2"; +// return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "master insert into queue success, task : shell2"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java new file mode 100644 index 0000000000..727ab41002 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SensitiveDataConverterTest { + + private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class); + + /** + * password pattern + */ + private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); + + private final String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"view1\"}"; + + private final String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"******\"}"; + @Test + public void convert() { + SensitiveDataConverter sensitiveDataConverter = new SensitiveDataConverter(); + String result = sensitiveDataConverter.convert(new ILoggingEvent() { + @Override + public String getThreadName() { + return null; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return null; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return logMsg; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); + + } + + /** + * mask sensitive logMsg - sql task datasource password + */ + @Test + public void testPwdLogMsgConverter() { + logger.info("parameter : {}", logMsg); + logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg)); + + Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg)); + Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); + + } + + /** + * password regex test + * + * @param logMsg original log + */ + private static String passwordHandler(Pattern pattern, String logMsg) { + + Matcher matcher = pattern.matcher(logMsg); + + StringBuffer sb = new StringBuffer(logMsg.length()); + + while (matcher.find()) { + + String password = matcher.group(); + + String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password); + + matcher.appendReplacement(sb, maskPassword); + } + matcher.appendTail(sb); + + return sb.toString(); + } + + + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java new file mode 100644 index 0000000000..8745a4f6b4 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + +import static org.junit.Assert.*; + +public class TaskLogDiscriminatorTest { + + /** + * log base + */ + private String logBase = "logs"; + + TaskLogDiscriminator taskLogDiscriminator; + + @Before + public void before(){ + taskLogDiscriminator = new TaskLogDiscriminator(); + taskLogDiscriminator.setLogBase("logs"); + taskLogDiscriminator.setKey("123"); + } + + @Test + public void getDiscriminatingValue() { + String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() { + @Override + public String getThreadName() { + return null; + } + + @Override + public Level getLevel() { + return null; + } + + @Override + public String getMessage() { + return null; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return null; + } + + @Override + public String getLoggerName() { + return "[taskAppId=TASK-1-1-1"; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + Assert.assertEquals("1/1/", result); + } + + @Test + public void start() { + taskLogDiscriminator.start(); + Assert.assertEquals(true, taskLogDiscriminator.isStarted()); + } + + @Test + public void getKey() { + Assert.assertEquals("123", taskLogDiscriminator.getKey()); + } + + @Test + public void setKey() { + + taskLogDiscriminator.setKey("123"); + } + + @Test + public void getLogBase() { + Assert.assertEquals("logs", taskLogDiscriminator.getLogBase()); + } + + @Test + public void setLogBase() { + taskLogDiscriminator.setLogBase("logs"); + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java new file mode 100644 index 0000000000..52767074da --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + + +public class TaskLogFilterTest { + + @Test + public void decide() { + TaskLogFilter taskLogFilter = new TaskLogFilter(); + + + FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return LoggerUtils.TASK_LOGGER_THREAD_NAME; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "raw script : echo 222"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "raw script : echo 222"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java new file mode 100644 index 0000000000..90b154407f --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + + +public class WorkerLogFilterTest { + + @Test + public void decide() { + WorkerLogFilter workerLogFilter = new WorkerLogFilter(); + + + FilterReply filterReply = workerLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return Constants.THREAD_NAME_WORKER_SERVER; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java new file mode 100644 index 0000000000..5a80e388ba --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class LoggerUtilsTest { + private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class); + + @Test + public void buildTaskId() { + + String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210); + + Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId); + } + + @Test + public void getAppIds() { + List appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger); + Assert.assertEquals("application_1_1", appIdList.get(0)); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java similarity index 83% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java index 2e5bfcf3e5..03880b69cc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; @@ -22,7 +22,7 @@ import org.junit.Assert; import org.junit.Test; -public class SensitiveLogUtilTest { +public class SensitiveLogUtilsTest { @Test public void testMaskDataSourcePwd() { @@ -30,8 +30,8 @@ public class SensitiveLogUtilTest { String password = "123456"; String emptyPassword = ""; - Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtil.maskDataSourcePwd(password)); - Assert.assertEquals("", SensitiveLogUtil.maskDataSourcePwd(emptyPassword)); + Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtils.maskDataSourcePwd(password)); + Assert.assertEquals("", SensitiveLogUtils.maskDataSourcePwd(emptyPassword)); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 0b621a9bb0..fd0a08cd8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 5f66c3477d..f179d6344a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.permission.PermissionCheck; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; -import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 8774186bc5..0c62f0648a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.slf4j.Logger; diff --git a/dolphinscheduler-server/src/main/resources/master_logback.xml b/dolphinscheduler-server/src/main/resources/master_logback.xml deleted file mode 100644 index 54c3cf5781..0000000000 --- a/dolphinscheduler-server/src/main/resources/master_logback.xml +++ /dev/null @@ -1,52 +0,0 @@ - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - ${log.base}/dolphinscheduler-master.log - - INFO - - - ${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log - 168 - 200MB - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/resources/worker_logback.xml b/dolphinscheduler-server/src/main/resources/worker_logback.xml deleted file mode 100644 index 7ba0c9b8ab..0000000000 --- a/dolphinscheduler-server/src/main/resources/worker_logback.xml +++ /dev/null @@ -1,81 +0,0 @@ - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - INFO - - - - taskAppId - ${log.base} - - - - ${log.base}/${taskAppId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true - - - - - - ${log.base}/dolphinscheduler-worker.log - - INFO - - - - ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log - 168 - 200MB - -       - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - -    - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java deleted file mode 100644 index fb564a22fb..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.worker.log; - - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class SensitiveDataConverterTest { - - private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class); - - /** - * password pattern - */ - private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); - - - /** - * mask sensitive logMsg - sql task datasource password - */ - @Test - public void testPwdLogMsgConverter() { - - String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + - "\"database\":\"carbond\"," + - "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + - "\"user\":\"view\"," + - "\"password\":\"view1\"}"; - - String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + - "\"database\":\"carbond\"," + - "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + - "\"user\":\"view\"," + - "\"password\":\"******\"}"; - - - logger.info("parameter : {}", logMsg); - logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg)); - - Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg)); - Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); - - } - - /** - * password regex test - * - * @param logMsg original log - */ - private static String passwordHandler(Pattern pattern, String logMsg) { - - Matcher matcher = pattern.matcher(logMsg); - - StringBuffer sb = new StringBuffer(logMsg.length()); - - while (matcher.find()) { - - String password = matcher.group(); - - String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); - - matcher.appendReplacement(sb, maskPassword); - } - matcher.appendTail(sb); - - return sb.toString(); - } - - -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index 1117fe0015..04c844827f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index 7cf4b874d1..7da3f710b6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/pom.xml b/pom.xml index 0ff2f65f60..59278f73cd 100644 --- a/pom.xml +++ b/pom.xml @@ -675,6 +675,7 @@ **/common/utils/*.java + **/common/log/*.java **/common/threadutils/*.java **/common/graph/*.java **/common/queue/*.java diff --git a/script/dolphinscheduler-daemon.sh b/script/dolphinscheduler-daemon.sh index b3310c384f..d4db103fe1 100644 --- a/script/dolphinscheduler-daemon.sh +++ b/script/dolphinscheduler-daemon.sh @@ -57,16 +57,16 @@ pid=$DOLPHINSCHEDULER_LOG_DIR/dolphinscheduler-$command.pid cd $DOLPHINSCHEDULER_HOME if [ "$command" = "api-server" ]; then - LOG_FILE="-Dlogging.config=classpath:apiserver_logback.xml -Dspring.profiles.active=api" + LOG_FILE="-Dserver=api-server -Dspring.profiles.active=api" CLASS=org.apache.dolphinscheduler.api.ApiApplicationServer elif [ "$command" = "master-server" ]; then - LOG_FILE="-Dlogging.config=classpath:master_logback.xml -Ddruid.mysql.usePingMethod=false" + LOG_FILE="-Dserver=master-server -Ddruid.mysql.usePingMethod=false" CLASS=org.apache.dolphinscheduler.server.master.MasterServer elif [ "$command" = "worker-server" ]; then - LOG_FILE="-Dlogging.config=classpath:worker_logback.xml -Ddruid.mysql.usePingMethod=false" + LOG_FILE="-Dserver=worker-server -Ddruid.mysql.usePingMethod=false" CLASS=org.apache.dolphinscheduler.server.worker.WorkerServer elif [ "$command" = "alert-server" ]; then - LOG_FILE="-Dlogback.configurationFile=conf/alert_logback.xml" + LOG_FILE="-Dserver=alert-server" CLASS=org.apache.dolphinscheduler.alert.AlertServer elif [ "$command" = "logger-server" ]; then CLASS=org.apache.dolphinscheduler.server.rpc.LoggerServer @@ -93,8 +93,8 @@ case $startStop in exec_command="$LOG_FILE $DOLPHINSCHEDULER_OPTS -classpath $DOLPHINSCHEDULER_CONF_DIR:$DOLPHINSCHEDULER_LIB_JARS $CLASS" - echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 < /dev/null &" - nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 < /dev/null & + echo "nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 > /dev/null &" + nohup $JAVA_HOME/bin/java $exec_command > $log 2>&1 > /dev/null & echo $! > $pid ;;