diff --git a/README.md b/README.md index 27b5312f32..3fbd6345b6 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Overload processing: Task queue mechanism, the number of schedulable tasks on a - [**User manual**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/system-manual.html?_blank "System manual") -- [**Upgrade document**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/release/upgrade.html?_blank "Upgrade document") +- [**Upgrade document**](https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/upgrade.html?_blank "Upgrade document") - Online Demo @@ -99,7 +99,7 @@ It is because of the shoulders of these open source projects that the birth of t ### Get Help 1. Submit an issue -1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/1.2.0/user_doc/subscribe.html. then send mail to dev@dolphinscheduler.apache.org +1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html. then send mail to dev@dolphinscheduler.apache.org 1. Contact WeChat group manager, ID 510570367. This is for Mandarin(CN) discussion. ### License diff --git a/README_zh_CN.md b/README_zh_CN.md index c7173ad9ba..e782c1030d 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -50,7 +50,7 @@ Dolphin Scheduler Official Website - [**使用手册**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/system-manual.html?_blank "系统使用手册") -- [**升级文档**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/release/upgrade.html?_blank "升级文档") +- [**升级文档**](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/upgrade.html?_blank "升级文档") - 我要体验 @@ -83,13 +83,12 @@ dolphinscheduler-dist/dolphinscheduler-src/target/apache-dolphinscheduler-incuba ### 感谢 Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、guice、grpc,netty,ali的bonecp,quartz,以及apache的众多开源项目等等, -正是由于站在这些开源项目的肩膀上,才有Dolphin Scheduler的诞生的可能。对此我们对使用的所有开源软件表示非常的感谢!我们也希望自己不仅是开源的受益者,也能成为开源的 -贡献者,于是我们决定把易调度贡献出来,并承诺长期维护。也希望对开源有同样热情和信念的伙伴加入进来,一起为开源献出一份力! +正是由于站在这些开源项目的肩膀上,才有Dolphin Scheduler的诞生的可能。对此我们对使用的所有开源软件表示非常的感谢!我们也希望自己不仅是开源的受益者,也能成为开源的贡献者,也希望对开源有同样热情和信念的伙伴加入进来,一起为开源献出一份力! ### 获得帮助 1. 提交issue -1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/1.2.0/user_doc/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org. +1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org. 1. 联系微信群助手(ID:dailidong66). 微信仅用于中国用户讨论. ### 版权 diff --git a/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml b/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml index 6bdb97cf00..7a9a5b4621 100644 --- a/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml +++ b/dockerfile/conf/dolphinscheduler/conf/combined_logback.xml @@ -31,8 +31,8 @@ INFO - - + + taskAppId ${log.base} @@ -52,7 +52,7 @@ ${log.base}/dolphinscheduler-combined.log - + INFO diff --git a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh index 960d971dd8..8e842fe28e 100644 --- a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh +++ b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh @@ -18,3 +18,4 @@ export PYTHON_HOME=/usr/bin/python export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH +export DATAX_HOME=/opt/datax/bin/datax.py \ No newline at end of file diff --git a/dockerfile/conf/dolphinscheduler/conf/quartz.properties b/dockerfile/conf/dolphinscheduler/conf/quartz.properties index 21ebd5e29d..a83abad5bc 100644 --- a/dockerfile/conf/dolphinscheduler/conf/quartz.properties +++ b/dockerfile/conf/dolphinscheduler/conf/quartz.properties @@ -47,7 +47,7 @@ org.quartz.jobStore.dataSource = myDs #============================================================================ # Configure Datasources #============================================================================ -org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.server.quartz.DruidConnectionProvider +org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.dao.quartz.DruidConnectionProvider org.quartz.dataSource.myDs.driver = org.postgresql.Driver org.quartz.dataSource.myDs.URL=jdbc:postgresql://127.0.0.1:5432/dolphinscheduler org.quartz.dataSource.myDs.user=root diff --git a/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml index 9bbd9615c4..bf4a651e7c 100644 --- a/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml +++ b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml @@ -31,8 +31,8 @@ INFO - - + + taskAppId ${log.base} @@ -52,7 +52,7 @@ ${log.base}/dolphinscheduler-worker.log - + INFO diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java index d54c2327be..5feb36b60f 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java @@ -101,6 +101,11 @@ public class AlertSender{ }else if (alert.getAlertType() == AlertType.SMS){ retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType()); alert.setInfo(retMaps); + } else { + logger.error("AlertType is not defined. code: {}, descp: {}", + alert.getAlertType().getCode(), + alert.getAlertType().getDescp()); + return; } //send flag diff --git a/dolphinscheduler-alert/src/main/resources/alert_logback.xml b/dolphinscheduler-alert/src/main/resources/alert_logback.xml deleted file mode 100644 index 3474df8d2e..0000000000 --- a/dolphinscheduler-alert/src/main/resources/alert_logback.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - ${log.base}/dolphinscheduler-alert.log - - ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log - 20 - 64MB - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index ae28a48bb5..c10f443384 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -31,35 +31,19 @@ org.apache.dolphinscheduler dolphinscheduler-alert - - - - org.apache.dolphinscheduler - dolphinscheduler-server - io.netty - netty - - - io.netty - netty-all - - - com.google - netty - - - leveldbjni-all - org.fusesource.leveldbjni - - - protobuf-java - com.google.protobuf + org.apache.dolphinscheduler + dolphinscheduler-dao + + org.apache.dolphinscheduler + dolphinscheduler-dao + + org.springframework.boot @@ -92,8 +76,6 @@ - - org.springframework.boot spring-boot-starter-aop @@ -193,13 +175,13 @@ - tomcat - jasper-runtime + javax.servlet + servlet-api - javax.servlet - servlet-api + org.mortbay.jetty + jsp-2.1 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index c1689c5bec..257f15d580 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.server.utils.ScheduleUtils; +import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -512,8 +512,7 @@ public class ExecutorService extends BaseService{ List listDate = new LinkedList<>(); if(!CollectionUtils.isEmpty(schedules)){ for (Schedule item : schedules) { - List list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end); - listDate.addAll(list); + listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab())); } } if(!CollectionUtils.isEmpty(listDate)){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index 3c4a42e6cf..bdce9470ca 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; -import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob; -import org.apache.dolphinscheduler.server.quartz.QuartzExecutors; +import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob; +import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.quartz.CronExpression; @@ -167,6 +167,7 @@ public class SchedulerService extends BaseService { processDefinitionMapper.updateById(processDefinition); putMsg(result, Status.SUCCESS); + result.put("scheduleId", scheduleObj.getId()); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java index d41830eff5..b04e773aea 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java @@ -59,23 +59,22 @@ public class FourLetterWordMain { */ public static String send4LetterWord(String host, int port, String cmd, int timeout) throws IOException { - LOG.info("connecting to " + host + " " + port); - Socket sock = new Socket(); + LOG.info("connecting to {} {}", host, port); InetSocketAddress hostaddress= host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(InetAddress.getByName(null), port); - BufferedReader reader = null; - try { + + try (Socket sock = new Socket(); + OutputStream outstream = sock.getOutputStream(); + BufferedReader reader = + new BufferedReader( + new InputStreamReader(sock.getInputStream()))) { sock.setSoTimeout(timeout); sock.connect(hostaddress, timeout); - OutputStream outstream = sock.getOutputStream(); outstream.write(cmd.getBytes()); outstream.flush(); // this replicates NC - close the output stream before reading sock.shutdownOutput(); - reader = - new BufferedReader( - new InputStreamReader(sock.getInputStream())); StringBuilder sb = new StringBuilder(); String line; while((line = reader.readLine()) != null) { @@ -84,11 +83,6 @@ public class FourLetterWordMain { return sb.toString(); } catch (SocketTimeoutException e) { throw new IOException("Exception while executing four letter word: " + cmd, e); - } finally { - sock.close(); - if (reader != null) { - reader.close(); - } } } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java index 968a51ad63..5aa6be858a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java @@ -145,7 +145,7 @@ public class ZooKeeperState { sendThread.setName("FourLetterCmd:" + cmd); sendThread.start(); try { - sendThread.join(waitTimeout * 1000); + sendThread.join(waitTimeout * 1000L); return sendThread.ret; } catch (InterruptedException e) { logger.error("send " + cmd + " to server " + host + ":" + port + " failed!", e); diff --git a/dolphinscheduler-api/src/main/resources/apiserver_logback.xml b/dolphinscheduler-api/src/main/resources/apiserver_logback.xml deleted file mode 100644 index 4c2ce0d2b3..0000000000 --- a/dolphinscheduler-api/src/main/resources/apiserver_logback.xml +++ /dev/null @@ -1,60 +0,0 @@ - - - - - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - INFO - - ${log.base}/dolphinscheduler-api-server.log - - ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log - 168 - 64MB - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties index 6ec304a397..21f940baaf 100644 --- a/dolphinscheduler-api/src/main/resources/application-api.properties +++ b/dolphinscheduler-api/src/main/resources/application-api.properties @@ -15,8 +15,6 @@ # limitations under the License. # -logging.config=classpath:apiserver_logback.xml - # server port server.port=12345 diff --git a/dolphinscheduler-api/src/main/resources/combined_logback.xml b/dolphinscheduler-api/src/main/resources/combined_logback.xml deleted file mode 100644 index ece13d04b9..0000000000 --- a/dolphinscheduler-api/src/main/resources/combined_logback.xml +++ /dev/null @@ -1,80 +0,0 @@ - - - - - - - - - - %highlight([%level]) %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{10}:[%line] - %msg%n - - UTF-8 - - - - - INFO - - - - taskAppId - ${log.base} - - - - ${log.base}/${taskAppId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true - - - - - - ${log.base}/dolphinscheduler-combined.log - - INFO - - - - ${log.base}/dolphinscheduler-combined.%d{yyyy-MM-dd_HH}.%i.log - 168 - 200MB - -       - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - -    - - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index b4f3e7e31f..66c7a3ebab 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -203,7 +203,7 @@ public class ExecutorService2Test { "", "", RunMode.RUN_MODE_PARALLEL, Priority.LOW, 0, 110); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processDao, times(16)).createCommand(any(Command.class)); + verify(processDao, times(15)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 955e956251..bd2448eee7 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -29,6 +29,7 @@ jar UTF-8 + 3.1.0 @@ -604,5 +605,11 @@ org.springframework spring-context + + + org.codehaus.janino + janino + ${codehaus.janino.version} + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 45f36883e3..b996c3aec9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -33,6 +33,7 @@ public enum TaskType { * 7 DEPENDENT * 8 FLINK * 9 HTTP + * 10 DATAX */ SHELL(0, "shell"), SQL(1, "sql"), @@ -43,7 +44,8 @@ public enum TaskType { PYTHON(6, "python"), DEPENDENT(7, "dependent"), FLINK(8, "flink"), - HTTP(9, "http"); + HTTP(9, "http"), + DATAX(10, "datax"); TaskType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java index d6de484074..7b5d53a032 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/log/MasterLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/MasterLogFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java similarity index 92% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java index 4a98e66c6c..971ce7149c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverter.java @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.pattern.MessageConverter; import ch.qos.logback.classic.spi.ILoggingEvent; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -77,7 +77,7 @@ public class SensitiveDataConverter extends MessageConverter { String password = matcher.group(); - String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); + String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password); matcher.appendReplacement(sb, maskPassword); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java similarity index 94% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java index fa00aed772..fd2b0766a8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminator.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * Task Log Discriminator diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java similarity index 93% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java index 6398135481..ac258daf20 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/TaskLogFilter.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * task log filter diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java similarity index 91% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java index 23758f918a..abcc8bc619 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/WorkerLogFilter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/WorkerLogFilter.java @@ -14,12 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.log; +package org.apache.dolphinscheduler.common.log; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; + +import java.util.Arrays; /** * worker log filter @@ -40,6 +43,7 @@ public class WorkerLogFilter extends Filter { if (event.getThreadName().startsWith("Worker-")){ return FilterReply.ACCEPT; } + return FilterReply.DENY; } public void setLevel(String level) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java index 3519d5c535..e936a444d0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.common.model; import java.util.Date; +import java.util.Objects; /** * date interval class @@ -35,12 +36,14 @@ public class DateInterval { @Override public boolean equals(Object obj) { - try{ - DateInterval dateInterval = (DateInterval) obj; - return startTime.equals(dateInterval.getStartTime()) && - endTime.equals(dateInterval.getEndTime()); - }catch (Exception e){ + if (obj == null || getClass() != obj.getClass()) { return false; + } else if (this == obj) { + return true; + } else { + DateInterval that = (DateInterval) obj; + return startTime.equals(that.startTime) && + endTime.equals(that.endTime); } } @@ -60,4 +63,8 @@ public class DateInterval { this.endTime = endTime; } + @Override + public int hashCode() { + return Objects.hash(startTime, endTime); + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java index 2fbc0e1654..e4741574e4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.model; +import java.util.Objects; + public class TaskNodeRelation { /** @@ -69,4 +71,9 @@ public class TaskNodeRelation { ", endNode='" + endNode + '\'' + '}'; } + + @Override + public int hashCode() { + return Objects.hash(startNode, endNode); + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 5f834a2004..d442c13ebc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -30,7 +30,7 @@ import java.util.*; /** * A singleton of a task queue implemented with zookeeper - * tasks queue implemention + * tasks queue implementation */ @Service public class TaskQueueZkImpl implements ITaskQueue { @@ -72,7 +72,7 @@ public class TaskQueueZkImpl implements ITaskQueue { } catch (Exception e) { logger.error("get all tasks from tasks queue exception",e); } - return new ArrayList<>(); + return Collections.emptyList(); } /** @@ -196,11 +196,11 @@ public class TaskQueueZkImpl implements ITaskQueue { } } - List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); + List tasksList = getTasksListFromTreeSet(tasksNum, taskTreeSet); - logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); + logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size()); - return taskslist; + return tasksList; }else{ Thread.sleep(Constants.SLEEP_TIME_MILLIS); } @@ -208,7 +208,7 @@ public class TaskQueueZkImpl implements ITaskQueue { } catch (Exception e) { logger.error("add task to tasks queue exception",e); } - return new ArrayList(); + return Collections.emptyList(); } @@ -221,15 +221,15 @@ public class TaskQueueZkImpl implements ITaskQueue { public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) { Iterator iterator = taskTreeSet.iterator(); int j = 0; - List taskslist = new ArrayList<>(tasksNum); + List tasksList = new ArrayList<>(tasksNum); while(iterator.hasNext()){ if(j++ >= tasksNum){ break; } String task = iterator.next(); - taskslist.add(getOriginTaskFormat(task)); + tasksList.add(getOriginTaskFormat(task)); } - return taskslist; + return tasksList; } /** @@ -330,22 +330,13 @@ public class TaskQueueZkImpl implements ITaskQueue { */ @Override public Set smembers(String key) { - - Set tasksSet = new HashSet<>(); - try { List list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); - - for (String task : list) { - tasksSet.add(task); - } - - return tasksSet; + return new HashSet<>(list); } catch (Exception e) { logger.error("get all tasks from tasks queue exception",e); } - - return tasksSet; + return Collections.emptySet(); } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java new file mode 100755 index 0000000000..95dd505c02 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.datax; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +/** + * DataX parameter + */ +public class DataxParameters extends AbstractParameters { + + /** + * data source type,eg MYSQL, POSTGRES ... + */ + private String dsType; + + /** + * datasource id + */ + private int dataSource; + + /** + * data target type,eg MYSQL, POSTGRES ... + */ + private String dtType; + + /** + * datatarget id + */ + private int dataTarget; + + /** + * sql + */ + private String sql; + + /** + * target table + */ + private String targetTable; + + /** + * Pre Statements + */ + private List preStatements; + + /** + * Post Statements + */ + private List postStatements; + + /** + * speed byte num + */ + private int jobSpeedByte; + + /** + * speed record count + */ + private int jobSpeedRecord; + + public String getDsType() { + return dsType; + } + + public void setDsType(String dsType) { + this.dsType = dsType; + } + + public int getDataSource() { + return dataSource; + } + + public void setDataSource(int dataSource) { + this.dataSource = dataSource; + } + + public String getDtType() { + return dtType; + } + + public void setDtType(String dtType) { + this.dtType = dtType; + } + + public int getDataTarget() { + return dataTarget; + } + + public void setDataTarget(int dataTarget) { + this.dataTarget = dataTarget; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getTargetTable() { + return targetTable; + } + + public void setTargetTable(String targetTable) { + this.targetTable = targetTable; + } + + public List getPreStatements() { + return preStatements; + } + + public void setPreStatements(List preStatements) { + this.preStatements = preStatements; + } + + public List getPostStatements() { + return postStatements; + } + + public void setPostStatements(List postStatements) { + this.postStatements = postStatements; + } + + public int getJobSpeedByte() { + return jobSpeedByte; + } + + public void setJobSpeedByte(int jobSpeedByte) { + this.jobSpeedByte = jobSpeedByte; + } + + public int getJobSpeedRecord() { + return jobSpeedRecord; + } + + public void setJobSpeedRecord(int jobSpeedRecord) { + this.jobSpeedRecord = jobSpeedRecord; + } + + @Override + public boolean checkParameters() { + if (!(dataSource != 0 + && dataTarget != 0 + && StringUtils.isNotEmpty(sql) + && StringUtils.isNotEmpty(targetTable))) { + return false; + } + + return true; + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + @Override + public String toString() { + return "DataxParameters{" + + "dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + '}'; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index afc3c44825..541281f793 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -119,7 +119,9 @@ public class HadoopUtils implements Closeable { fsRelatedProps.forEach((key, value) -> configuration.set(key, value)); }else{ logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS ); - throw new RuntimeException("property:{} can not to be empty, please set!"); + throw new RuntimeException( + String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS) + ); } }else{ logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS); @@ -219,10 +221,12 @@ public class HadoopUtils implements Closeable { return null; } - FSDataInputStream in = fs.open(new Path(hdfsFilePath)); - BufferedReader br = new BufferedReader(new InputStreamReader(in)); - Stream stream = br.lines().skip(skipLineNums).limit(limit); - return stream.collect(Collectors.toList()); + try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){ + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + Stream stream = br.lines().skip(skipLineNums).limit(limit); + return stream.collect(Collectors.toList()); + } + } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java similarity index 91% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 8b40d943c0..fc08eb645b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; import org.slf4j.Logger; @@ -44,6 +44,11 @@ public class LoggerUtils { */ public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; + /** + * Task Logger Thread's name + */ + public static final String TASK_APPID_LOG_FORMAT = "[taskAppId="; + /** * build job id * @@ -58,7 +63,7 @@ public class LoggerUtils { int processInstId, int taskId){ // - [taskAppId=TASK_79_4084_15210] - return String.format(" - [taskAppId=%s-%s-%s-%s]",affix, + return String.format(" - %s%s-%s-%s-%s]",TASK_APPID_LOG_FORMAT,affix, processDefId, processInstId, taskId); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java index 529052c7cf..bbc937c89f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java @@ -16,13 +16,17 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.LineNumberReader; import java.io.Reader; -import java.sql.*; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * Slightly modified version of the com.ibatis.common.jdbc.ScriptRunner class @@ -94,9 +98,7 @@ public class ScriptRunner { } finally { connection.setAutoCommit(originalAutoCommit); } - } catch (IOException e) { - throw e; - } catch (SQLException e) { + } catch (IOException | SQLException e) { throw e; } catch (Exception e) { throw new RuntimeException("Error running script. Cause: " + e, e); @@ -114,9 +116,7 @@ public class ScriptRunner { } finally { connection.setAutoCommit(originalAutoCommit); } - } catch (IOException e) { - throw e; - } catch (SQLException e) { + } catch (IOException | SQLException e) { throw e; } catch (Exception e) { throw new RuntimeException("Error running script. Cause: " + e, e); @@ -161,44 +161,34 @@ public class ScriptRunner { || fullLineDelimiter && trimmedLine.equals(getDelimiter())) { command.append(line.substring(0, line.lastIndexOf(getDelimiter()))); command.append(" "); - Statement statement = conn.createStatement(); - boolean hasResults = false; - logger.info("sql:"+command.toString()); - if (stopOnError) { - hasResults = statement.execute(command.toString()); - } else { - try { - statement.execute(command.toString()); - } catch (SQLException e) { - logger.error(e.getMessage(),e); - throw e; - } - } - - ResultSet rs = statement.getResultSet(); - if (hasResults && rs != null) { - ResultSetMetaData md = rs.getMetaData(); - int cols = md.getColumnCount(); - for (int i = 0; i < cols; i++) { - String name = md.getColumnLabel(i); - logger.info(name + "\t"); - } - logger.info(""); - while (rs.next()) { - for (int i = 0; i < cols; i++) { - String value = rs.getString(i); - logger.info(value + "\t"); - } - logger.info(""); - } - } + logger.info("sql: {}", command); + try (Statement statement = conn.createStatement()) { + statement.execute(command.toString()); + try (ResultSet rs = statement.getResultSet()) { + if (stopOnError && rs != null) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + for (int i = 0; i < cols; i++) { + String name = md.getColumnLabel(i); + logger.info("{} \t", name); + } + logger.info(""); + while (rs.next()) { + for (int i = 0; i < cols; i++) { + String value = rs.getString(i); + logger.info("{} \t", value); + } + logger.info(""); + } + } + } + } catch (SQLException e) { + logger.error("SQLException", e); + throw e; + } + command = null; - try { - statement.close(); - } catch (Exception e) { - // Ignore to workaround a bug in Jakarta DBCP - } Thread.yield(); } else { command.append(line); @@ -207,11 +197,11 @@ public class ScriptRunner { } } catch (SQLException e) { - logger.error("Error executing: " + command.toString()); + logger.error("Error executing: {}", command); throw e; } catch (IOException e) { e.fillInStackTrace(); - logger.error("Error executing: " + command.toString()); + logger.error("Error executing: {}", command); throw e; } } @@ -243,46 +233,35 @@ public class ScriptRunner { || fullLineDelimiter && trimmedLine.equals(getDelimiter())) { command.append(line.substring(0, line.lastIndexOf(getDelimiter()))); command.append(" "); - Statement statement = conn.createStatement(); - sql = command.toString().replaceAll("\\{\\{APPDB\\}\\}", dbName); - boolean hasResults = false; - logger.info("sql : " + sql); - if (stopOnError) { - hasResults = statement.execute(sql); - } else { - try { - statement.execute(sql); - } catch (SQLException e) { - logger.error(e.getMessage(),e); - throw e; - } - } - - ResultSet rs = statement.getResultSet(); - if (hasResults && rs != null) { - ResultSetMetaData md = rs.getMetaData(); - int cols = md.getColumnCount(); - for (int i = 0; i < cols; i++) { - String name = md.getColumnLabel(i); - logger.info(name + "\t"); - } - logger.info(""); - while (rs.next()) { - for (int i = 0; i < cols; i++) { - String value = rs.getString(i); - logger.info(value + "\t"); - } - logger.info(""); - } - } + logger.info("sql : {}", sql); + + try (Statement statement = conn.createStatement()) { + statement.execute(sql); + try (ResultSet rs = statement.getResultSet()) { + if (stopOnError && rs != null) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + for (int i = 0; i < cols; i++) { + String name = md.getColumnLabel(i); + logger.info("{} \t", name); + } + logger.info(""); + while (rs.next()) { + for (int i = 0; i < cols; i++) { + String value = rs.getString(i); + logger.info("{} \t", value); + } + logger.info(""); + } + } + } + } catch (SQLException e) { + logger.error("SQLException", e); + throw e; + } command = null; - try { - statement.close(); - } catch (Exception e) { - // Ignore to workaround a bug in Jakarta DBCP - } Thread.yield(); } else { command.append(line); @@ -291,11 +270,10 @@ public class ScriptRunner { } } catch (SQLException e) { - logger.error("Error executing: " + sql); throw e; } catch (IOException e) { e.fillInStackTrace(); - logger.error("Error executing: " + sql); + logger.error("Error executing: {}", sql); throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java similarity index 93% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java index 948e92cb24..eab6c4f124 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtil.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.Constants; /** * sensitive log Util */ -public class SensitiveLogUtil { +public class SensitiveLogUtils { /** * @param dataSourcePwd data source password diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 28e2593359..a2ae6a68e3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; @@ -68,6 +69,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, FlinkParameters.class); case HTTP: return JSONUtils.parseObject(parameter, HttpParameters.class); + case DATAX: + return JSONUtils.parseObject(parameter, DataxParameters.class); default: return null; } diff --git a/dolphinscheduler-common/src/main/resources/logback.xml b/dolphinscheduler-common/src/main/resources/logback.xml new file mode 100644 index 0000000000..7f634da975 --- /dev/null +++ b/dolphinscheduler-common/src/main/resources/logback.xml @@ -0,0 +1,169 @@ + + + + + + + + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + ${log.base}/dolphinscheduler-master.log + + + ${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + + + INFO + + + + taskAppId + ${log.base} + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + + UTF-8 + + true + + + + + ${log.base}/dolphinscheduler-worker.log + + INFO + + + + ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + + UTF-8 + + + + + + + + ${log.base}/dolphinscheduler-alert.log + + ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log + 20 + 64MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + ${log.base}/dolphinscheduler-api-server.log + + INFO + + + ${log.base}/dolphinscheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log + 168 + 64MB + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/resources/quartz.properties b/dolphinscheduler-common/src/main/resources/quartz.properties index ceec9e5d64..2e3a2a0dc1 100644 --- a/dolphinscheduler-common/src/main/resources/quartz.properties +++ b/dolphinscheduler-common/src/main/resources/quartz.properties @@ -53,11 +53,12 @@ org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.misfireThreshold = 60000 org.quartz.jobStore.clusterCheckinInterval = 5000 +org.quartz.jobStore.acquireTriggersWithinLock=true org.quartz.jobStore.dataSource = myDs #============================================================================ # Configure Datasources #============================================================================ -org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.server.quartz.DruidConnectionProvider +org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.dao.quartz.DruidConnectionProvider org.quartz.dataSource.myDs.maxConnections = 10 org.quartz.dataSource.myDs.validationQuery = select 1 \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java new file mode 100644 index 0000000000..8cf6cfc2df --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/MasterLogFilterTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; +import java.util.Map; + +public class MasterLogFilterTest { + + @Test + public void decide() { + MasterLogFilter masterLogFilter = new MasterLogFilter(); + + + FilterReply filterReply = masterLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return Constants.THREAD_NAME_MASTER_SERVER; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "master insert into queue success, task : shell2"; +// return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "master insert into queue success, task : shell2"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java new file mode 100644 index 0000000000..727ab41002 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/SensitiveDataConverterTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.SensitiveLogUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SensitiveDataConverterTest { + + private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class); + + /** + * password pattern + */ + private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); + + private final String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"view1\"}"; + + private final String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + + "\"database\":\"carbond\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + + "\"user\":\"view\"," + + "\"password\":\"******\"}"; + @Test + public void convert() { + SensitiveDataConverter sensitiveDataConverter = new SensitiveDataConverter(); + String result = sensitiveDataConverter.convert(new ILoggingEvent() { + @Override + public String getThreadName() { + return null; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return null; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return logMsg; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); + + } + + /** + * mask sensitive logMsg - sql task datasource password + */ + @Test + public void testPwdLogMsgConverter() { + logger.info("parameter : {}", logMsg); + logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg)); + + Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg)); + Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); + + } + + /** + * password regex test + * + * @param logMsg original log + */ + private static String passwordHandler(Pattern pattern, String logMsg) { + + Matcher matcher = pattern.matcher(logMsg); + + StringBuffer sb = new StringBuffer(logMsg.length()); + + while (matcher.find()) { + + String password = matcher.group(); + + String maskPassword = SensitiveLogUtils.maskDataSourcePwd(password); + + matcher.appendReplacement(sb, maskPassword); + } + matcher.appendTail(sb); + + return sb.toString(); + } + + + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java new file mode 100644 index 0000000000..8745a4f6b4 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogDiscriminatorTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + +import static org.junit.Assert.*; + +public class TaskLogDiscriminatorTest { + + /** + * log base + */ + private String logBase = "logs"; + + TaskLogDiscriminator taskLogDiscriminator; + + @Before + public void before(){ + taskLogDiscriminator = new TaskLogDiscriminator(); + taskLogDiscriminator.setLogBase("logs"); + taskLogDiscriminator.setKey("123"); + } + + @Test + public void getDiscriminatingValue() { + String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() { + @Override + public String getThreadName() { + return null; + } + + @Override + public Level getLevel() { + return null; + } + + @Override + public String getMessage() { + return null; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return null; + } + + @Override + public String getLoggerName() { + return "[taskAppId=TASK-1-1-1"; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + Assert.assertEquals("1/1/", result); + } + + @Test + public void start() { + taskLogDiscriminator.start(); + Assert.assertEquals(true, taskLogDiscriminator.isStarted()); + } + + @Test + public void getKey() { + Assert.assertEquals("123", taskLogDiscriminator.getKey()); + } + + @Test + public void setKey() { + + taskLogDiscriminator.setKey("123"); + } + + @Test + public void getLogBase() { + Assert.assertEquals("logs", taskLogDiscriminator.getLogBase()); + } + + @Test + public void setLogBase() { + taskLogDiscriminator.setLogBase("logs"); + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java new file mode 100644 index 0000000000..52767074da --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/TaskLogFilterTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + + +public class TaskLogFilterTest { + + @Test + public void decide() { + TaskLogFilter taskLogFilter = new TaskLogFilter(); + + + FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return LoggerUtils.TASK_LOGGER_THREAD_NAME; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "raw script : echo 222"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "raw script : echo 222"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java new file mode 100644 index 0000000000..90b154407f --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/WorkerLogFilterTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.log; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.LoggerContextVO; +import ch.qos.logback.core.spi.FilterReply; +import org.apache.dolphinscheduler.common.Constants; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Marker; + +import java.util.Map; + + +public class WorkerLogFilterTest { + + @Test + public void decide() { + WorkerLogFilter workerLogFilter = new WorkerLogFilter(); + + + FilterReply filterReply = workerLogFilter.decide(new ILoggingEvent() { + @Override + public String getThreadName() { + return Constants.THREAD_NAME_WORKER_SERVER; + } + + @Override + public Level getLevel() { + return Level.INFO; + } + + @Override + public String getMessage() { + return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public Object[] getArgumentArray() { + return new Object[0]; + } + + @Override + public String getFormattedMessage() { + return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed"; + } + + @Override + public String getLoggerName() { + return null; + } + + @Override + public LoggerContextVO getLoggerContextVO() { + return null; + } + + @Override + public IThrowableProxy getThrowableProxy() { + return null; + } + + @Override + public StackTraceElement[] getCallerData() { + return new StackTraceElement[0]; + } + + @Override + public boolean hasCallerData() { + return false; + } + + @Override + public Marker getMarker() { + return null; + } + + @Override + public Map getMDCPropertyMap() { + return null; + } + + @Override + public Map getMdc() { + return null; + } + + @Override + public long getTimeStamp() { + return 0; + } + + @Override + public void prepareForDeferredProcessing() { + + } + }); + + Assert.assertEquals(FilterReply.ACCEPT, filterReply); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java new file mode 100644 index 0000000000..5a80e388ba --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class LoggerUtilsTest { + private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class); + + @Test + public void buildTaskId() { + + String taskId = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210); + + Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId); + } + + @Test + public void getAppIds() { + List appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger); + Assert.assertEquals("application_1_1", appIdList.get(0)); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java similarity index 83% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java index 2e5bfcf3e5..03880b69cc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SensitiveLogUtilTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SensitiveLogUtilsTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; @@ -22,7 +22,7 @@ import org.junit.Assert; import org.junit.Test; -public class SensitiveLogUtilTest { +public class SensitiveLogUtilsTest { @Test public void testMaskDataSourcePwd() { @@ -30,8 +30,8 @@ public class SensitiveLogUtilTest { String password = "123456"; String emptyPassword = ""; - Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtil.maskDataSourcePwd(password)); - Assert.assertEquals("", SensitiveLogUtil.maskDataSourcePwd(emptyPassword)); + Assert.assertEquals(Constants.PASSWORD_DEFAULT, SensitiveLogUtils.maskDataSourcePwd(password)); + Assert.assertEquals("", SensitiveLogUtils.maskDataSourcePwd(emptyPassword)); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java similarity index 99% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java rename to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java index 05100ac374..8a4ceba927 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/DruidConnectionProvider.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.quartz; +package org.apache.dolphinscheduler.dao.quartz; import com.alibaba.druid.pool.DruidDataSource; import org.quartz.SchedulerException; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java rename to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java index 9a19233cba..ac461296a9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.quartz; +package org.apache.dolphinscheduler.dao.quartz; import org.apache.dolphinscheduler.common.Constants; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java similarity index 99% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java rename to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java index d4ad08faec..054d7903fc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/quartz/QuartzExecutors.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.quartz; +package org.apache.dolphinscheduler.dao.quartz; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.JSONUtils; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 26d0f1e8e2..ac38ddd2e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -96,7 +96,13 @@ public class DagHelper { for (String startNodeName : startNodeList) { TaskNode startNode = findNodeByName(taskNodeList, startNodeName); List childNodeList = new ArrayList<>(); - if (TaskDependType.TASK_POST == taskDependType) { + if (startNode == null) { + logger.error("start node name [{}] is not in task node list [{}] ", + startNodeName, + taskNodeList + ); + continue; + } else if (TaskDependType.TASK_POST == taskDependType) { childNodeList = getFlowNodeListPost(startNode, taskNodeList); } else if (TaskDependType.TASK_PRE == taskDependType) { childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList); @@ -129,7 +135,6 @@ public class DagHelper { if (null != depList && null != startNode && depList.contains(startNode.getName())) { resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList)); } - } resultList.add(startNode); return resultList; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java index 8649462110..8a9087a33c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java @@ -156,6 +156,23 @@ public class CronUtils { return dateList; } + /** + * gets all scheduled times for a period of time based on self dependency + * @param startTime startTime + * @param endTime endTime + * @param cron cron + * @return date list + */ + public static List getSelfFireDateList(Date startTime, Date endTime, String cron) { + CronExpression cronExpression = null; + try { + cronExpression = parse2CronExpression(cron); + }catch (ParseException e){ + logger.error(e.getMessage(), e); + return Collections.EMPTY_LIST; + } + return getSelfFireDateList(startTime, endTime, cronExpression); + } /** * get expiration time diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java index c375143d7b..1135cf20f5 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.cron; import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; import com.cronutils.builder.CronBuilder; import com.cronutils.model.Cron; @@ -31,10 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.ParseException; +import java.util.Date; import static com.cronutils.model.field.expression.FieldExpressionFactory.*; /** + * CronUtilsTest */ public class CronUtilsTest { @@ -55,8 +58,9 @@ public class CronUtilsTest { .withSecond(on(0)) .instance(); // Obtain the string expression - String cronAsString = cron.asString(); // 0 */5 * * * ? * Every five minutes(once every 5 minutes) + String cronAsString = cron.asString(); + // 0 */5 * * * ? * Every five minutes(once every 5 minutes) Assert.assertEquals(cronAsString, "0 */5 * * * ? *"); } @@ -68,9 +72,6 @@ public class CronUtilsTest { @Test public void testCronParse() throws ParseException { String strCrontab = "0 1 2 3 * ? *"; - strCrontab = "0/50 0/59 * * * ? *"; - strCrontab = "3/5 * 0/5 * * ? *"; - strCrontab = "1/5 3/5 1/5 3/30 * ? *"; Cron depCron = CronUtils.parse2Cron(strCrontab); Assert.assertEquals(depCron.retrieve(CronFieldName.SECOND).getExpression().asString(), "0"); @@ -87,12 +88,14 @@ public class CronUtilsTest { */ @Test public void testScheduleType() throws ParseException { - - CycleEnum cycleEnum = CronUtils.getMaxCycle("0 */1 * * * ? *"); + CycleEnum cycleEnum = CronUtils.getMaxCycle(CronUtils.parse2Cron("0 */1 * * * ? *")); Assert.assertEquals(cycleEnum.name(), "MINUTE"); CycleEnum cycleEnum2 = CronUtils.getMaxCycle("0 * * * * ? *"); Assert.assertEquals(cycleEnum2.name(), "MINUTE"); + + CycleEnum cycleEnum3 = CronUtils.getMiniCycle(CronUtils.parse2Cron("0 * * * * ? *")); + Assert.assertEquals(cycleEnum3.name(), "MINUTE"); } /** @@ -109,26 +112,9 @@ public class CronUtilsTest { .withMinute(every(5)) .withSecond(on(0)) .instance(); - - String cronAsString = cron1.asString(); // 0 */5 * * * ? * once every 5 minutes - //logger.info(cronAsString); - // Obtain the string expression - //String minCrontab = "0 0 * * * ? *"; - //String minCrontab = "0 0 10,14,16 * * ?"; - //String minCrontab = "0 0-5 14 * * ? *"; - //String minCrontab = "0 0 2 ? * SUN *"; - //String minCrontab = "* 0,3 2 SUN * 1#1 *"; - //String minCrontab = "* 0,3 * 1W * ? *"; - //cron = CronUtils.parse2Cron("0 * * * * ? *"); - // month cycle - /*String[] cronArayy = new String[]{"* 0,3 * 1W * ? *","* 0 0 1W * ? *", - "0 0 0 L 3/5 ? *","0 0 0 ? 3/5 2/2 *"};*/ // minute cycle String[] cronArayy = new String[]{"* * * * * ? *","* 0 * * * ? *", "* 5 * * 3/5 ? *","0 0 * * * ? *"}; - // week cycle - /*String[] cronArayy = new String[]{"* * * ? * 2/1 *","0 *//*5 * ? * 2/1 *", - "* * *//*5 ? * 2/1 *"};*/ for(String minCrontab:cronArayy){ if (!org.quartz.CronExpression.isValidExpression(minCrontab)) { throw new RuntimeException(minCrontab+" verify failure, cron expression not valid"); @@ -171,7 +157,6 @@ public class CronUtilsTest { logger.info("dayOfWeekField instanceof And:"+(dayOfWeekField.getExpression() instanceof And)); logger.info("dayOfWeekField instanceof QuestionMark:"+(dayOfWeekField.getExpression() instanceof QuestionMark)); - CycleEnum cycleEnum = CronUtils.getMaxCycle(minCrontab); if(cycleEnum !=null){ logger.info(cycleEnum.name()); @@ -180,4 +165,34 @@ public class CronUtilsTest { } } } -} + + @Test + public void getSelfFireDateList() throws ParseException{ + Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date to = DateUtils.stringToDate("2020-01-31 00:00:00"); + // test date + Assert.assertEquals(0, CronUtils.getSelfFireDateList(to, from, "0 0 0 * * ? ").size()); + // test error cron + Assert.assertEquals(0, CronUtils.getSelfFireDateList(from, to, "0 0 0 * *").size()); + // test cron + Assert.assertEquals(29, CronUtils.getSelfFireDateList(from, to, "0 0 0 * * ? ").size()); + // test other + Assert.assertEquals(30, CronUtils.getFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? ")).size()); + Assert.assertEquals(5, CronUtils.getSelfFireDateList(from, to, CronUtils.parse2CronExpression("0 0 0 * * ? "), 5).size()); + } + + @Test + public void getExpirationTime(){ + Date startTime = DateUtils.stringToDate("2020-02-07 18:30:00"); + Date expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.HOUR); + Assert.assertEquals("2020-02-07 19:30:00", DateUtils.dateToString(expirationTime)); + expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.DAY); + Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime)); + expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.WEEK); + Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime)); + expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.MONTH); + Assert.assertEquals("2020-02-07 23:59:59", DateUtils.dateToString(expirationTime)); + expirationTime = CronUtils.getExpirationTime(startTime, CycleEnum.YEAR); + Assert.assertEquals("2020-02-07 18:30:00", DateUtils.dateToString(expirationTime)); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 65c5607af7..0647b9450b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; -import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob; -import org.apache.dolphinscheduler.server.quartz.QuartzExecutors; +import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob; +import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.quartz.SchedulerException; import org.slf4j.Logger; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 84b1114b84..2b1ff4d23f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.dao.utils.cron.CronUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; -import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,7 +187,7 @@ public class MasterExecThread implements Runnable { /** * execute process - * @throws Exception excpetion + * @throws Exception exception */ private void executeProcess() throws Exception { prepareProcess(); @@ -197,7 +197,7 @@ public class MasterExecThread implements Runnable { /** * execute complement process - * @throws Exception excpetion + * @throws Exception exception */ private void executeComplementProcess() throws Exception { @@ -213,8 +213,7 @@ public class MasterExecThread implements Runnable { List listDate = Lists.newLinkedList(); if(!CollectionUtils.isEmpty(schedules)){ for (Schedule schedule : schedules) { - List list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate); - listDate.addAll(list); + listDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedule.getCrontab())); } } // get first fire date @@ -247,7 +246,7 @@ public class MasterExecThread implements Runnable { // execute process ,waiting for end runProcess(); - // process instace failure ,no more complements + // process instance failure ,no more complements if(!processInstance.getState().typeIsSuccess()){ logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); @@ -304,7 +303,7 @@ public class MasterExecThread implements Runnable { /** * prepare process parameter - * @throws Exception excpetion + * @throws Exception exception */ private void prepareProcess() throws Exception { // init task queue @@ -332,7 +331,7 @@ public class MasterExecThread implements Runnable { /** * generate process dag - * @throws Exception excpetion + * @throws Exception exception */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); @@ -609,7 +608,7 @@ public class MasterExecThread implements Runnable { /** * query task instance by complete state * @param state state - * @return task isntance list + * @return task instance list */ private List getCompleteTaskByState(ExecutionStatus state){ List resultList = new ArrayList<>(); @@ -804,7 +803,7 @@ public class MasterExecThread implements Runnable { } /** - * add task to standy list + * add task to standby list * @param taskInstance task instance */ private void addTaskToStandByList(TaskInstance taskInstance){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 1861e8a908..f2ee66b64a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -122,7 +122,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { break; } if(checkTimeout){ - long remainTime = getRemaintime(taskTimeoutParameter.getInterval()*60); + long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L); if (remainTime < 0) { logger.warn("task id: {} execution time out",taskInstance.getId()); // process define diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java new file mode 100755 index 0000000000..930098919b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + + +import org.apache.dolphinscheduler.common.enums.DbType; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import com.alibaba.druid.sql.parser.SQLStatementParser; + + +public class DataxUtils { + + public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader"; + + public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader"; + + public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader"; + + public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader"; + + public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter"; + + public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter"; + + public static String getReaderPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_READER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_READER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_READER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_READER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static String getWriterPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_WRITER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_WRITER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_WRITER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_WRITER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) { + switch (dbType) { + case MYSQL: + return new MySqlStatementParser(sql); + case POSTGRESQL: + return new PGSQLStatementParser(sql); + case ORACLE: + return new OracleStatementParser(sql); + case SQLSERVER: + return new SQLServerStatementParser(sql); + default: + return null; + } + } + + public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { + if (columns == null) { + return null; + } + + String[] toColumns = new String[columns.length]; + for (int i = 0; i < columns.length; i++ ) { + toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); + } + + return toColumns; + } + + public static String doConvertKeywordsColumn(DbType dbType, String column) { + if (column == null) { + return column; + } + + column = column.trim(); + column = column.replace("`", ""); + column = column.replace("\"", ""); + column = column.replace("'", ""); + + switch (dbType) { + case MYSQL: + return String.format("`%s`", column); + case POSTGRESQL: + return String.format("\"%s\"", column); + case ORACLE: + return String.format("\"%s\"", column); + case SQLSERVER: + return String.format("`%s`", column); + default: + return column; + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 0b621a9bb0..fd0a08cd8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java deleted file mode 100644 index 11730b9545..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.utils; - -import org.quartz.impl.triggers.CronTriggerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.text.ParseException; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; - -/** - * ScheduleUtils - */ -public class ScheduleUtils { - - private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class); - - /** - * Get the execution time of the time interval - * @param cron - * @param from - * @param to - * @return - */ - public static List getRecentTriggerTime(String cron, Date from, Date to) { - return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to); - } - - /** - * Get the execution time of the time interval - * @param cron - * @param size - * @param from - * @param to - * @return - */ - public static List getRecentTriggerTime(String cron, int size, Date from, Date to) { - List list = new LinkedList(); - if(to.before(from)){ - logger.error("schedule date from:{} must before date to:{}!", from, to); - return list; - } - try { - CronTriggerImpl trigger = new CronTriggerImpl(); - trigger.setCronExpression(cron); - trigger.setStartTime(from); - trigger.setEndTime(to); - trigger.computeFirstFireTime(null); - for (int i = 0; i < size; i++) { - Date schedule = trigger.getNextFireTime(); - if(null == schedule){ - break; - } - list.add(schedule); - trigger.triggered(null); - } - } catch (ParseException e) { - logger.error("cron:{} error:{}", cron, e.getMessage()); - } - return java.util.Collections.unmodifiableList(list); - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 5f66c3477d..f179d6344a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.permission.PermissionCheck; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; -import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 11934dea4c..04098215dd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -23,17 +23,14 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.slf4j.Logger; import java.io.*; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -150,9 +147,6 @@ public abstract class AbstractCommandExecutor { // get process id int pid = getProcessId(process); - // task instance id - int taskInstId = Integer.parseInt(taskAppId.split("_")[2]); - processDao.updatePidByTaskInstId(taskInstId, pid, ""); logger.info("process start, process id is: {}", pid); @@ -207,7 +201,14 @@ public abstract class AbstractCommandExecutor { // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands - processBuilder.command("sudo", "-u", tenantCode, commandInterpreter(), commandFile); + List command = new LinkedList<>(); + command.add("sudo"); + command.add("-u"); + command.add(tenantCode); + command.add(commandInterpreter()); + command.addAll(commandOptions()); + command.add(commandFile); + processBuilder.command(command); process = processBuilder.start(); @@ -559,7 +560,9 @@ public abstract class AbstractCommandExecutor { } } - + protected List commandOptions() { + return Collections.emptyList(); + } protected abstract String buildCommandFilePath(); protected abstract String commandInterpreter(); protected abstract boolean checkFindApp(String line); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 999863f76e..f2772d0747 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; @@ -194,6 +195,9 @@ public abstract class AbstractTask { case PYTHON: paramsClass = PythonParameters.class; break; + case DATAX: + paramsClass = DataxParameters.class; + break; default: logger.error("not support this task type: {}", taskType); throw new IllegalArgumentException("not support this task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index c943e5d9d7..a673134488 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -26,6 +26,7 @@ import java.io.*; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.function.Consumer; @@ -108,6 +109,16 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { } } + /** + * get command options + * @return command options list + */ + @Override + protected List commandOptions() { + // unbuffered binary stdout and stderr + return Collections.singletonList("-u"); + } + /** * get python home * @return python home diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 310d9cad9a..67deb7a3fa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; +import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask; @@ -65,6 +66,8 @@ public class TaskManager { return new DependentTask(props, logger); case HTTP: return new HttpTask(props, logger); + case DATAX: + return new DataxTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java new file mode 100755 index 0000000000..0de2bbc7c6 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.utils.ParamUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.slf4j.Logger; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.fastjson.JSONObject; + + +/** + * DataX task + */ +public class DataxTask extends AbstractTask { + + /** + * python process(datax only supports version 2.7 by default) + */ + private static final String DATAX_PYTHON = "python2.7"; + + /** + * datax home path + */ + private static final String DATAX_HOME_EVN = "${DATAX_HOME}"; + + /** + * datax channel count + */ + private static final int DATAX_CHANNEL_COUNT = 1; + + /** + * datax parameters + */ + private DataxParameters dataXParameters; + + /** + * task dir + */ + private String taskDir; + + /** + * shell command executor + */ + private ShellCommandExecutor shellCommandExecutor; + + /** + * process database access + */ + private ProcessDao processDao; + + /** + * constructor + * + * @param props + * props + * @param logger + * logger + */ + public DataxTask(TaskProps props, Logger logger) { + super(props, logger); + + this.taskDir = props.getTaskDir(); + logger.info("task dir : {}", taskDir); + + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), + props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), + props.getTaskTimeout(), logger); + + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); + } + + /** + * init DataX config + */ + @Override + public void init() { + logger.info("datax task params {}", taskProps.getTaskParams()); + dataXParameters = JSONUtils.parseObject(taskProps.getTaskParams(), DataxParameters.class); + + if (!dataXParameters.checkParameters()) { + throw new RuntimeException("datax task params is not valid"); + } + } + + /** + * run DataX process + * + * @throws Exception + */ + @Override + public void handle() + throws Exception { + try { + // set the name of the current thread + String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + // run datax process + String jsonFilePath = buildDataxJsonFile(); + String shellCommandFilePath = buildShellCommandFile(jsonFilePath); + exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao); + } + catch (Exception e) { + exitStatusCode = -1; + throw e; + } + } + + /** + * cancel DataX process + * + * @param cancelApplication + * @throws Exception + */ + @Override + public void cancelApplication(boolean cancelApplication) + throws Exception { + // cancel process + shellCommandExecutor.cancelApplication(); + } + + /** + * build datax configuration file + * + * @return + * @throws Exception + */ + private String buildDataxJsonFile() + throws Exception { + // generate json + String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId()); + + Path path = new File(fileName).toPath(); + if (Files.exists(path)) { + return fileName; + } + + JSONObject job = new JSONObject(); + job.put("content", buildDataxJobContentJson()); + job.put("setting", buildDataxJobSettingJson()); + + JSONObject root = new JSONObject(); + root.put("job", job); + root.put("core", buildDataxCoreJson()); + + logger.debug("datax job json : {}", root.toString()); + + // create datax json file + FileUtils.writeStringToFile(new File(fileName), root.toString(), Charset.forName("UTF-8")); + return fileName; + } + + /** + * build datax job config + * + * @return + * @throws SQLException + */ + private List buildDataxJobContentJson() + throws SQLException { + DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource()); + BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + + DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget()); + BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(), + dataTarget.getConnectionParams()); + + List readerConnArr = new ArrayList<>(); + JSONObject readerConn = new JSONObject(); + readerConn.put("querySql", new String[] {dataXParameters.getSql()}); + readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()}); + readerConnArr.add(readerConn); + + JSONObject readerParam = new JSONObject(); + readerParam.put("username", dataSourceCfg.getUser()); + readerParam.put("password", dataSourceCfg.getPassword()); + readerParam.put("connection", readerConnArr); + + JSONObject reader = new JSONObject(); + reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType())); + reader.put("parameter", readerParam); + + List writerConnArr = new ArrayList<>(); + JSONObject writerConn = new JSONObject(); + writerConn.put("table", new String[] {dataXParameters.getTargetTable()}); + writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl()); + writerConnArr.add(writerConn); + + JSONObject writerParam = new JSONObject(); + writerParam.put("username", dataTargetCfg.getUser()); + writerParam.put("password", dataTargetCfg.getPassword()); + writerParam.put("column", + parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql())); + writerParam.put("connection", writerConnArr); + + if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { + writerParam.put("preSql", dataXParameters.getPreStatements()); + } + + if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) { + writerParam.put("postSql", dataXParameters.getPostStatements()); + } + + JSONObject writer = new JSONObject(); + writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType())); + writer.put("parameter", writerParam); + + List contentList = new ArrayList<>(); + JSONObject content = new JSONObject(); + content.put("reader", reader); + content.put("writer", writer); + contentList.add(content); + + return contentList; + } + + /** + * build datax setting config + * + * @return + */ + private JSONObject buildDataxJobSettingJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject errorLimit = new JSONObject(); + errorLimit.put("record", 0); + errorLimit.put("percentage", 0); + + JSONObject setting = new JSONObject(); + setting.put("speed", speed); + setting.put("errorLimit", errorLimit); + + return setting; + } + + private JSONObject buildDataxCoreJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject channel = new JSONObject(); + channel.put("speed", speed); + + JSONObject transport = new JSONObject(); + transport.put("channel", channel); + + JSONObject core = new JSONObject(); + core.put("transport", transport); + + return core; + } + + /** + * create command + * + * @return + * @throws Exception + */ + private String buildShellCommandFile(String jobConfigFilePath) + throws Exception { + // generate scripts + String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId()); + Path path = new File(fileName).toPath(); + + if (Files.exists(path)) { + return fileName; + } + + // datax python command + StringBuilder sbr = new StringBuilder(); + sbr.append(DATAX_PYTHON); + sbr.append(" "); + sbr.append(DATAX_HOME_EVN); + sbr.append(" "); + sbr.append(jobConfigFilePath); + String dataxCommand = sbr.toString(); + + // find process instance by task id + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), dataXParameters.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); + if (paramsMap != null) { + dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); + } + + logger.debug("raw script : {}", dataxCommand); + + // create shell command file + Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); + FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); + Files.createFile(path, attr); + Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND); + + return fileName; + } + + /** + * parsing synchronized column names in SQL statements + * + * @param dsType + * the database type of the data source + * @param dtType + * the database type of the data target + * @param dataSourceCfg + * the database connection parameters of the data source + * @param sql + * sql for data synchronization + * @return + */ + private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) { + String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql); + + if (columnNames == null || columnNames.length == 0) { + logger.info("try to execute sql analysis query column name"); + columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); + } + + notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); + + return DataxUtils.convertKeywordsColumns(dtType, columnNames); + } + + /** + * try grammatical parsing column + * + * @param dbType + * database type + * @param sql + * sql for data synchronization + * @return column name array + * @throws RuntimeException + */ + private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) { + String[] columnNames; + + try { + SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); + notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); + + SQLStatement sqlStatement = parser.parseStatement(); + SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; + SQLSelect sqlSelect = sqlSelectStatement.getSelect(); + + List selectItemList = null; + if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) { + SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery(); + selectItemList = block.getSelectList(); + } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) { + SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight(); + selectItemList = block.getSelectList(); + } + + notNull(selectItemList, + String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); + + columnNames = new String[selectItemList.size()]; + for (int i = 0; i < selectItemList.size(); i++ ) { + SQLSelectItem item = selectItemList.get(i); + + String columnName = null; + + if (item.getAlias() != null) { + columnName = item.getAlias(); + } else if (item.getExpr() != null) { + if (item.getExpr() instanceof SQLPropertyExpr) { + SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr(); + columnName = expr.getName(); + } else if (item.getExpr() instanceof SQLIdentifierExpr) { + SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr(); + columnName = expr.getName(); + } + } else { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + if (columnName == null) { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + columnNames[i] = columnName; + } + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + /** + * try to execute sql to resolve column names + * + * @param baseDataSource + * the database connection parameters + * @param sql + * sql for data synchronization + * @return column name array + */ + public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) { + String[] columnNames; + sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql); + sql = sql.replace(";", ""); + + try ( + Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(), + baseDataSource.getPassword()); + PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet resultSet = stmt.executeQuery()) { + + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + columnNames = new String[num]; + for (int i = 1; i <= num; i++ ) { + columnNames[i - 1] = md.getColumnName(i); + } + } + catch (SQLException e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + @Override + public AbstractParameters getParameters() { + return dataXParameters; + } + + private void notNull(Object obj, String message) { + if (obj == null) { + throw new RuntimeException(message); + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index e8a97fecc5..eba05a0d21 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -105,7 +105,7 @@ public class SqlTask extends AbstractTask { // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); - logger.info(sqlParameters.toString()); + logger.info("Full sql parameters: {}", sqlParameters); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", sqlParameters.getType(), sqlParameters.getDatasource(), @@ -123,19 +123,21 @@ public class SqlTask extends AbstractTask { } dataSource= processDao.findDataSourceById(sqlParameters.getDatasource()); - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(), - dataSource.getType(), - dataSource.getNote(), - dataSource.getUserId(), - dataSource.getConnectionParams()); + // data source is null if (dataSource == null){ logger.error("datasource not exists"); exitStatusCode = -1; return; } + logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", + dataSource.getName(), + dataSource.getType(), + dataSource.getNote(), + dataSource.getUserId(), + dataSource.getConnectionParams()); + Connection con = null; List createFuncs = null; try { @@ -289,12 +291,12 @@ public class SqlTask extends AbstractTask { } } - try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds)) { + try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds); + ResultSet resultSet = stmt.executeQuery()) { // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send JSONArray resultJSONArray = new JSONArray(); - ResultSet resultSet = stmt.executeQuery(); ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); @@ -305,11 +307,10 @@ public class SqlTask extends AbstractTask { } resultJSONArray.add(mapOfColValues); } - resultSet.close(); logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); // if there is a result set - if (resultJSONArray.size() > 0) { + if ( !resultJSONArray.isEmpty() ) { if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); @@ -337,6 +338,12 @@ public class SqlTask extends AbstractTask { } catch (Exception e) { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); + } finally { + try { + connection.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } } return connection; } @@ -349,22 +356,23 @@ public class SqlTask extends AbstractTask { * @throws Exception */ private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { - PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); // is the timeout set boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; - if(timeoutFlag){ - stmt.setQueryTimeout(taskProps.getTaskTimeout()); - } - Map params = sqlBinds.getParamsMap(); - if(params != null) { - for (Map.Entry entry : params.entrySet()) { - Property prop = entry.getValue(); - ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { + if(timeoutFlag){ + stmt.setQueryTimeout(taskProps.getTaskTimeout()); + } + Map params = sqlBinds.getParamsMap(); + if(params != null) { + for (Map.Entry entry : params.entrySet()) { + Property prop = entry.getValue(); + ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + } } + logger.info("prepare statement replace sql : {} ", stmt); + return stmt; } - logger.info("prepare statement replace sql : {} ",stmt.toString()); - return stmt; } /** @@ -452,7 +460,7 @@ public class SqlTask extends AbstractTask { for(int i=1;i<=sqlParamsMap.size();i++){ logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")"); } - logger.info(logPrint.toString()); + logger.info("Sql Params are {}", logPrint); } /** diff --git a/dolphinscheduler-server/src/main/resources/master_logback.xml b/dolphinscheduler-server/src/main/resources/master_logback.xml deleted file mode 100644 index 54c3cf5781..0000000000 --- a/dolphinscheduler-server/src/main/resources/master_logback.xml +++ /dev/null @@ -1,52 +0,0 @@ - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - ${log.base}/dolphinscheduler-master.log - - INFO - - - ${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log - 168 - 200MB - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/resources/worker_logback.xml b/dolphinscheduler-server/src/main/resources/worker_logback.xml deleted file mode 100644 index 7ba0c9b8ab..0000000000 --- a/dolphinscheduler-server/src/main/resources/worker_logback.xml +++ /dev/null @@ -1,81 +0,0 @@ - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - INFO - - - - taskAppId - ${log.base} - - - - ${log.base}/${taskAppId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true - - - - - - ${log.base}/dolphinscheduler-worker.log - - INFO - - - - ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log - 168 - 200MB - -       - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - -    - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 6f31e66213..d7c3de13a5 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -134,7 +134,7 @@ public class MasterExecThreadTest { method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save - verify(processDao, times(16)).saveProcessInstance(processInstance); + verify(processDao, times(15)).saveProcessInstance(processInstance); }catch (Exception e){ Assert.assertTrue(false); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java new file mode 100644 index 0000000000..2720bb8a28 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.junit.Assert; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * DataxUtils Tester. + */ +public class DataxUtilsTest { + + /** + * + * Method: getReaderPluginName(DbType dbType) + * + */ + @Test + public void testGetReaderPluginName() { + assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getWriterPluginName(DbType dbType) + * + */ + @Test + public void testGetWriterPluginName() { + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getSqlStatementParser(DbType dbType, String sql) + * + */ + @Test + public void testGetSqlStatementParser() throws Exception { + assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null); + } + + /** + * + * Method: convertKeywordsColumns(DbType dbType, String[] columns) + * + */ + @Test + public void testConvertKeywordsColumns() throws Exception { + String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; + String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; + + String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); + + assertTrue(fromColumns.length == toColumns.length); + + for (int i = 0; i < toColumns.length; i++) { + assertEquals(targetColumns[i], toColumns[i]); + } + } + + /** + * + * Method: doConvertKeywordsColumn(DbType dbType, String column) + * + */ + @Test + public void testDoConvertKeywordsColumn() throws Exception { + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); + assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java deleted file mode 100644 index 4fbbdab70f..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.junit.Test; -import java.util.Date; -import static org.junit.Assert.assertEquals; - -/** - * Test ScheduleUtils - */ -public class ScheduleUtilsTest { - - /** - * Test the getRecentTriggerTime method - */ - @Test - public void testGetRecentTriggerTime() { - Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); - Date to = DateUtils.stringToDate("2020-01-31 01:00:00"); - // test date - assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size()); - // test error cron - assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size()); - // test cron - assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size()); - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java deleted file mode 100644 index fb564a22fb..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverterTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.worker.log; - - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class SensitiveDataConverterTest { - - private final Logger logger = LoggerFactory.getLogger(SensitiveDataConverterTest.class); - - /** - * password pattern - */ - private final Pattern pwdPattern = Pattern.compile(Constants.DATASOURCE_PASSWORD_REGEX); - - - /** - * mask sensitive logMsg - sql task datasource password - */ - @Test - public void testPwdLogMsgConverter() { - - String logMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + - "\"database\":\"carbond\"," + - "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + - "\"user\":\"view\"," + - "\"password\":\"view1\"}"; - - String maskLogMsg = "{\"address\":\"jdbc:mysql://192.168.xx.xx:3306\"," + - "\"database\":\"carbond\"," + - "\"jdbcUrl\":\"jdbc:mysql://192.168.xx.xx:3306/ods\"," + - "\"user\":\"view\"," + - "\"password\":\"******\"}"; - - - logger.info("parameter : {}", logMsg); - logger.info("parameter : {}", passwordHandler(pwdPattern, logMsg)); - - Assert.assertNotEquals(logMsg, passwordHandler(pwdPattern, logMsg)); - Assert.assertEquals(maskLogMsg, passwordHandler(pwdPattern, logMsg)); - - } - - /** - * password regex test - * - * @param logMsg original log - */ - private static String passwordHandler(Pattern pattern, String logMsg) { - - Matcher matcher = pattern.matcher(logMsg); - - StringBuffer sb = new StringBuffer(logMsg.length()); - - while (matcher.find()) { - - String password = matcher.group(); - - String maskPassword = SensitiveLogUtil.maskDataSourcePwd(password); - - matcher.appendReplacement(sb, maskPassword); - } - matcher.appendTail(sb); - - return sb.toString(); - } - - -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index 1117fe0015..04c844827f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index 7cf4b874d1..7da3f710b6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java new file mode 100644 index 0000000000..7a6073e05d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +/** + * DataxTask Tester. + */ +public class DataxTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class); + + private DataxTask dataxTask; + + private ProcessDao processDao; + + private ShellCommandExecutor shellCommandExecutor; + + private ApplicationContext applicationContext; + + @Before + public void before() + throws Exception { + processDao = Mockito.mock(ProcessDao.class); + shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class); + + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao); + + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + dataxTask = PowerMockito.spy(new DataxTask(props, logger)); + dataxTask.init(); + + Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource()); + Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource()); + Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); + + String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId()); + Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0); + } + + private DataSource getDataSource() { + DataSource dataSource = new DataSource(); + dataSource.setType(DbType.MYSQL); + dataSource.setConnectionParams( + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); + dataSource.setUserId(1); + return dataSource; + } + + private ProcessInstance getProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setScheduleTime(new Date()); + return processInstance; + } + + @After + public void after() + throws Exception {} + + /** + * Method: DataxTask() + */ + @Test + public void testDataxTask() + throws Exception { + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + Assert.assertNotNull(new DataxTask(props, logger)); + } + + /** + * Method: init + */ + @Test + public void testInit() + throws Exception { + try { + dataxTask.init(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: handle() + */ + @Test + public void testHandle() + throws Exception { + try { + dataxTask.handle(); + } catch (RuntimeException e) { + if (e.getMessage().indexOf("process error . exitCode is : -1") < 0) { + Assert.fail(); + } + } + } + + /** + * Method: cancelApplication() + */ + @Test + public void testCancelApplication() + throws Exception { + try { + dataxTask.cancelApplication(true); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource + * dataSourceCfg, String sql) + */ + @Test + public void testParsingSqlColumnNames() + throws Exception { + try { + BaseDataSource dataSource = DataSourceFactory.getDatasource(getDataSource().getType(), + getDataSource().getConnectionParams()); + + Method method = DataxTask.class.getDeclaredMethod("parsingSqlColumnNames", DbType.class, DbType.class, BaseDataSource.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, DbType.MYSQL, dataSource, "select 1 as a, 2 as `table` from dual"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[`a`, `table`]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryGrammaticalParsingSqlColumnNames(DbType dbType, String sql) + */ + @Test + public void testTryGrammaticalAnalysisSqlColumnNames() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("tryGrammaticalAnalysisSqlColumnNames", DbType.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, "select t1.a, t1.b from test t1 union all select a, t2.b from (select a, b from test) t2"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[a, b]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, + * String sql) + */ + @Test + public void testTryExecuteSqlResolveColumnNames() + throws Exception { + // TODO: Test goes here... + } + + /** + * Method: buildDataxJsonFile() + */ + @Test + public void testBuildDataxJsonFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); + method.setAccessible(true); + String filePath = (String) method.invoke(dataxTask, null); + Assert.assertNotNull(filePath); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobContentJson() + */ + @Test + public void testBuildDataxJobContentJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson"); + method.setAccessible(true); + List contentList = (List) method.invoke(dataxTask, null); + Assert.assertNotNull(contentList); + + JSONObject content = contentList.get(0); + JSONObject reader = (JSONObject) content.get("reader"); + Assert.assertNotNull(reader); + + String readerPluginName = (String) reader.get("name"); + Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName); + + JSONObject writer = (JSONObject) content.get("writer"); + Assert.assertNotNull(writer); + + String writerPluginName = (String) writer.get("name"); + Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobSettingJson() + */ + @Test + public void testBuildDataxJobSettingJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobSettingJson"); + method.setAccessible(true); + JSONObject setting = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(setting); + Assert.assertNotNull(setting.get("speed")); + Assert.assertNotNull(setting.get("errorLimit")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxCoreJson() + */ + @Test + public void testBuildDataxCoreJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxCoreJson"); + method.setAccessible(true); + JSONObject coreConfig = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(coreConfig); + Assert.assertNotNull(coreConfig.get("transport")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildShellCommandFile(String jobConfigFilePath) + */ + @Test + public void testBuildShellCommandFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); + method.setAccessible(true); + Assert.assertNotNull(method.invoke(dataxTask, "test.json")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: getParameters + */ + @Test + public void testGetParameters() + throws Exception { + Assert.assertTrue(dataxTask.getParameters() != null); + } + + /** + * Method: notNull(Object obj, String message) + */ + @Test + public void testNotNull() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("notNull", Object.class, String.class); + method.setAccessible(true); + method.invoke(dataxTask, "abc", "test throw RuntimeException"); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 62ca8103e5..e8187043bf 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -279,6 +279,10 @@ let tasksType = { 'HTTP': { desc: 'HTTP', color: '#E46F13' + }, + 'DATAX': { + desc: 'DataX', + color: '#1fc747' } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index fbb4f418d0..420bae8c89 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -101,6 +101,9 @@ .icos-HTTP { background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; } + .icos-DATAX { + background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 3f009eb75c..682dd5b51a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -191,7 +191,13 @@ ref="HTTP" :backfill-item="backfillItem"> - + +
@@ -216,6 +222,7 @@ import mProcedure from './tasks/procedure' import mDependent from './tasks/dependent' import mHttp from './tasks/http' + import mDatax from './tasks/datax' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' @@ -565,6 +572,7 @@ mPython, mDependent, mHttp, + mDatax, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue new file mode 100755 index 0000000000..ce918f49cf --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -0,0 +1,292 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png new file mode 100644 index 0000000000..b59c759208 Binary files /dev/null and b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png differ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png new file mode 100755 index 0000000000..913af420c6 Binary files /dev/null and b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png differ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue index c50d392bdb..783eeadab5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue @@ -46,11 +46,11 @@ {{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}} - + {{item.alias}} - {{item.fileName}} + {{item.fileName}} {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue index 13a337d557..01d8d22650 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/createUdf.vue @@ -76,7 +76,8 @@ filterable v-model="resourceId" :disabled="isUpdate" - style="width: 200px"> + :add-title="true" + style="width: 261px"> {{$index + 1}} - + {{item.funcName}} @@ -142,7 +142,7 @@ v-ps