Browse Source

Merge branch 'dev' of https://github.com/apache/incubator-dolphinscheduler into apache-dev

# Conflicts:
#	pom.xml
pull/2/head
lilin 5 years ago
parent
commit
25c63336bf
  1. 53
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  2. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java
  3. 229
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  4. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
  5. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java
  6. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java
  7. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java
  8. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java
  9. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java
  10. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java
  11. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java
  12. 64
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  13. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
  14. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
  15. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
  16. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
  17. 157
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java
  18. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
  19. 5
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
  20. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  21. 77
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java
  22. 97
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
  23. 50
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java
  24. 21
      dolphinscheduler-server/pom.xml
  25. 50
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  26. 79
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java
  27. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  28. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  29. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
  30. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
  31. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  32. 154
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
  33. 44
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java
  34. 9
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  35. 22
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  36. 2
      dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue
  37. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
  38. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue
  39. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue
  40. 2
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue
  41. 2
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue
  42. 2
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/_source/list.vue
  43. 2
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue
  44. 2
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue
  45. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  46. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  47. 3
      pom.xml

53
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -499,22 +501,47 @@ public class ExecutorService extends BaseService{
if(commandType == CommandType.COMPLEMENT_DATA){
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
if(runMode == RunMode.RUN_MODE_SERIAL){
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJson(cmdParam));
return processDao.createCommand(command);
}else if (runMode == RunMode.RUN_MODE_PARALLEL){
int runCunt = 0;
while(!start.after(end)){
runCunt += 1;
if(null != start && null != end && start.before(end)){
if(runMode == RunMode.RUN_MODE_SERIAL){
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
return processDao.createCommand(command);
}else if (runMode == RunMode.RUN_MODE_PARALLEL){
List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule item : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end);
listDate.addAll(list);
}
}
if(!CollectionUtils.isEmpty(listDate)){
// loop by schedule date
for (Date date : listDate) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
}
return listDate.size();
}else{
// loop by day
int runCunt = 0;
while(!start.after(end)) {
runCunt += 1;
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
}
return runCunt;
}
}
return runCunt;
}else{
logger.error("there is not vaild schedule date for the process definition: id:{},date:{}",
processDefineId, schedule);
}
}else{
command.setCommandParam(JSONUtils.toJson(cmdParam));

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java

@ -139,12 +139,16 @@ public class SessionService extends BaseService{
* @param loginUser login user
*/
public void signOut(String ip, User loginUser) {
/**
* query session by user id and ip
*/
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip);
try {
/**
* query session by user id and ip
*/
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip);
//delete session
sessionMapper.deleteById(session.getId());
//delete session
sessionMapper.deleteById(session.getId());
}catch (Exception e){
logger.warn("userId : {} , ip : {} , find more one session",loginUser.getId(),ip);
}
}
}

229
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.text.ParseException;
import java.util.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
/**
* test for ExecutorService
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorService2Test {
@InjectMocks
private ExecutorService executorService;
@Mock
private ProcessDao processDao;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
private int processDefinitionId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private User loginUser = new User();
private String projectName = "projectName";
private Project project = new Project();
private String cronTime;
@Before
public void init(){
// user
loginUser.setId(userId);
// processDefinition
processDefinition.setId(processDefinitionId);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
// project
project.setName(projectName);
// cronRangeTime
cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
// mock
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1);
}
/**
* not complement
* @throws ParseException
*/
@Test
public void testNoComplement() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.START_PROCESS,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* date error
* @throws ParseException
*/
@Test
public void testDateError() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processDao, times(0)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* serial
* @throws ParseException
*/
@Test
public void testSerial() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* without schedule
* @throws ParseException
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(31)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* with schedule
* @throws ParseException
*/
@Test
public void testParallelWithSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(16)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
private List<Schedule> zeroSchedulerList(){
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList(){
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
private Map<String, Object> checkProjectAndAuth(){
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
}

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* Authorization type
*/
public enum AuthorizationType {
/**
* 0 RESOURCE_FILE;
* 1 DATASOURCE;
* 2 UDF;
*/
RESOURCE_FILE(0, "resource file"),
DATASOURCE(1, "data source"),
UDF(2, "udf function");
AuthorizationType(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +59,7 @@ public class ClickHouseDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,7 +62,7 @@ public class HiveDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +57,7 @@ public class MySQLDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("com.mysql.jdbc.Driver");
Class.forName(Constants.COM_MYSQL_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +59,7 @@ public class OracleDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
Class.forName(Constants.COM_ORACLE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,7 +61,7 @@ public class PostgreDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.postgresql.Driver");
Class.forName(Constants.ORG_POSTGRESQL_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +55,7 @@ public class SQLServerDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +64,7 @@ public class SparkDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {

64
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang.ArrayUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
@ -25,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.ArrayUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
@ -462,12 +463,9 @@ public class ProcessDao {
return null;
}
if(null == tenant){
if(tenant == null){
User user = userMapper.selectById(userId);
if (null != user) {
tenant = tenantMapper.queryById(user.getTenantId());
}
tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
@ -974,6 +972,9 @@ public class ProcessDao {
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
@ -1460,6 +1461,15 @@ public class ProcessDao {
return scheduleMapper.selectById(id);
}
/**
* query Schedule by processDefinitionId
* @param processDefinitionId processDefinitionId
* @see Schedule
*/
public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
}
/**
* query need failover process instance
* @param host host
@ -1770,5 +1780,47 @@ public class ProcessDao {
return projectIdList;
}
/**
* list unauthorized udf function
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){
List<T> resultList = new ArrayList<T>();
if (!ArrayUtils.isEmpty(needChecks)) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){
case RESOURCE_FILE:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
}
resultList.addAll(originResSet);
}
return resultList;
}
/**
* get user by user id
* @param userId user id
* @return User
*/
public User getUserById(int userId){
return userMapper.queryDetailsById(userId);
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java

@ -77,4 +77,13 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
List<DataSource> listAllDataSourceByType(@Param("type") Integer type);
/**
* list authorized UDF function
* @param userId userId
* @param dataSourceIds data source id array
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java

@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
/**
* list authorized resource
* @param userId userId
* @param resNames resource names
* @return resource list
*/
<T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java

@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
*/
List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
/**
* query schedule list by process definition id
* @param processDefinitionId
* @return
*/
List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java

@ -78,5 +78,12 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
List<UdfFunc> queryAuthedUdfFunc(@Param("userId") int userId);
/**
* list authorized UDF function
* @param userId userId
* @param udfIds UDF function id array
* @return UDF function list
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
}

157
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
import java.util.List;
public class PermissionCheck<T> {
/**
* logger
*/
private Logger logger;
/**
* Authorization Type
*/
private AuthorizationType authorizationType;
/**
* Authorization Type
*/
private ProcessDao processDao;
/**
* need check array
*/
private T[] needChecks;
/**
* user id
*/
private int userId;
/**
* permission check
* @param authorizationType authorization type
* @param processDao process dao
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) {
this.authorizationType = authorizationType;
this.processDao = processDao;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
* @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
this.logger = logger;
}
public AuthorizationType getAuthorizationType() {
return authorizationType;
}
public void setAuthorizationType(AuthorizationType authorizationType) {
this.authorizationType = authorizationType;
}
public ProcessDao getProcessDao() {
return processDao;
}
public void setProcessDao(ProcessDao processDao) {
this.processDao = processDao;
}
public T[] getNeedChecks() {
return needChecks;
}
public void setNeedChecks(T[] needChecks) {
this.needChecks = needChecks;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
/**
* has permission
* @return true if has permission
*/
public boolean hasPermission(){
try {
checkPermission();
return true;
} catch (Exception e) {
return false;
}
}
/**
* check permission
* @throws Exception exception
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
User user = processDao.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){
List<T> unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString());
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}
}
}

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml

@ -74,6 +74,19 @@
from t_ds_datasource
where type = #{type}
</select>
<select id="listAuthorizedDataSource" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select *
from t_ds_datasource
where
id in (select datasource_id from t_ds_relation_datasource_user where user_id=#{userId}
union select id as datasource_id from t_ds_datasource where user_id=#{userId})
<if test="dataSourceIds != null and dataSourceIds != ''">
and id in
<foreach collection="dataSourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

5
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml

@ -55,4 +55,9 @@
from t_ds_schedules
where process_definition_id =#{processDefinitionId}
</select>
<select id="queryReleaseSchedulerListByProcessDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select *
from t_ds_schedules
where process_definition_id =#{processDefinitionId} and release_state = 1
</select>
</mapper>

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -74,4 +74,17 @@
WHERE u.id = rel.udf_id
AND rel.user_id = #{userId}
</select>
<select id="listAuthorizedUdfFunc" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where
id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="udfIds != null and udfIds != ''">
and id in
<foreach collection="udfIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

77
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DatasourceUser;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -34,7 +36,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import static org.hamcrest.Matchers.*;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
/**
@ -58,6 +62,9 @@ public class DataSourceMapperTest {
@Autowired
DataSourceUserMapper dataSourceUserMapper;
@Autowired
private UserMapper userMapper;
/**
* test insert
*/
@ -244,6 +251,33 @@ public class DataSourceMapperTest {
}
}
@Test
public void testListAuthorizedDataSource(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create data source
DataSource dataSource = createDataSource(generalUser1.getId(), "ds-1");
DataSource unauthorizdDataSource = createDataSource(generalUser2.getId(), "ds-2");
//data source ids
Integer[] dataSourceIds = new Integer[]{dataSource.getId(),unauthorizdDataSource.getId()};
List<DataSource> authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertEquals(generalUser1.getId(),dataSource.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdDataSource.getUserId());
Assert.assertFalse(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
//authorize object unauthorizdDataSource to generalUser1
createUserDataSource(generalUser1, unauthorizdDataSource);
authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertTrue(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
}
/**
* create datasource relation
* @param userId
@ -289,7 +323,6 @@ public class DataSourceMapperTest {
return dataSourceMap;
}
/**
* create datasource
* @return datasource
@ -330,5 +363,41 @@ public class DataSourceMapperTest {
return dataSource;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create the relation of user and data source
* @param user user
* @param dataSource data source
* @return DatasourceUser
*/
private DatasourceUser createUserDataSource(User user,DataSource dataSource){
DatasourceUser datasourceUser = new DatasourceUser();
datasourceUser.setDatasourceId(dataSource.getId());
datasourceUser.setUserId(user.getId());
datasourceUser.setPerm(7);
datasourceUser.setCreateTime(DateUtils.getCurrentDate());
datasourceUser.setUpdateTime(DateUtils.getCurrentDate());
dataSourceUserMapper.insert(datasourceUser);
return datasourceUser;
}
}

97
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java

@ -17,22 +17,36 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class ResourceMapperTest {
@Autowired
@ -61,6 +75,59 @@ public class ResourceMapperTest {
return resource;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("ut resource %s",user.getUserName()));
resource.setType(ResourceType.FILE);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create resource user
* @return ResourcesUser
*/
private ResourcesUser createResourcesUser(Resource resource,User user){
//insertOne
ResourcesUser resourcesUser = new ResourcesUser();
resourcesUser.setCreateTime(new Date());
resourcesUser.setUpdateTime(new Date());
resourcesUser.setUserId(user.getId());
resourcesUser.setResourcesId(resource.getId());
resourceUserMapper.insert(resourcesUser);
return resourcesUser;
}
@Test
public void testInsert(){
Resource resource = insertOne();
assertNotNull(resource.getId());
assertThat(resource.getId(),greaterThan(0));
}
/**
* test update
*/
@ -230,4 +297,30 @@ public class ResourceMapperTest {
resourceMapper.deleteById(resource.getId());
}
@Test
public void testListAuthorizedResource(){
// create a general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
// create one resource
Resource resource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser2);
// need download resources
String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertEquals(generalUser2.getId(),resource.getUserId());
Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
// authorize object unauthorizedResource to generalUser
createResourcesUser(unauthorizedResource,generalUser2);
List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
}
}

50
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java

@ -29,13 +29,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class UdfFuncMapperTest {
@Autowired
@ -133,6 +140,23 @@ public class UdfFuncMapperTest {
return udfUser;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* test update
*/
@ -268,4 +292,30 @@ public class UdfFuncMapperTest {
udfUserMapper.deleteById(udfUser.getId());
Assert.assertNotEquals(udfFuncList.size(), 0);
}
@Test
public void testListAuthorizedUdfFunc(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create udf function
UdfFunc udfFunc = insertOne(generalUser1);
UdfFunc unauthorizdUdfFunc = insertOne(generalUser2);
//udf function ids
Integer[] udfFuncIds = new Integer[]{udfFunc.getId(),unauthorizdUdfFunc.getId()};
List<UdfFunc> authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertEquals(generalUser1.getId(),udfFunc.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdUdfFunc.getUserId());
Assert.assertFalse(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
//authorize object unauthorizdUdfFunc to generalUser1
insertOneUDFUser(generalUser1,unauthorizdUdfFunc);
authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
}
}

21
dolphinscheduler-server/pom.xml

@ -111,6 +111,27 @@
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

50
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@ -29,10 +30,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -203,10 +206,30 @@ public class MasterExecThread implements Runnable {
Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
processDao.saveProcessInstance(processInstance);
Date scheduleDate = processInstance.getScheduleTime();
if(scheduleDate == null){
scheduleDate = startDate;
// get schedules
int processDefinitionId = processInstance.getProcessDefinitionId();
List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
List<Date> listDate = Lists.newLinkedList();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule schedule : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate);
listDate.addAll(list);
}
}
// get first fire date
Iterator<Date> iterator = null;
Date scheduleDate = null;
if(!CollectionUtils.isEmpty(listDate)) {
iterator = listDate.iterator();
scheduleDate = iterator.next();
processInstance.setScheduleTime(scheduleDate);
processDao.updateProcessInstance(processInstance);
}else{
scheduleDate = processInstance.getScheduleTime();
if(scheduleDate == null){
scheduleDate = startDate;
}
}
while(Stopper.isRunning()){
@ -232,11 +255,22 @@ public class MasterExecThread implements Runnable {
}
// current process instance sucess ,next execute
scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
if(scheduleDate.after(endDate)){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
if(null == iterator){
// loop by day
scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
if(scheduleDate.after(endDate)){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
}
}else{
// loop by schedule date
if(!iterator.hasNext()){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
}
scheduleDate = iterator.next();
}
logger.info("process {} start to complement {} data",

79
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* ScheduleUtils
*/
public class ScheduleUtils {
private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class);
/**
* Get the execution time of the time interval
* @param cron
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, Date from, Date to) {
return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to);
}
/**
* Get the execution time of the time interval
* @param cron
* @param size
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, int size, Date from, Date to) {
List list = new LinkedList<Date>();
if(to.before(from)){
logger.error("schedule date from:{} must before date to:{}!", from, to);
return list;
}
try {
CronTriggerImpl trigger = new CronTriggerImpl();
trigger.setCronExpression(cron);
trigger.setStartTime(from);
trigger.setEndTime(to);
trigger.computeFirstFireTime(null);
for (int i = 0; i < size; i++) {
Date schedule = trigger.getNextFireTime();
if(null == schedule){
break;
}
list.add(schedule);
trigger.triggered(null);
}
} catch (ParseException e) {
logger.error("cron:{} error:{}", cron, e.getMessage());
}
return java.util.Collections.unmodifiableList(list);
}
}

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -21,6 +21,7 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -42,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
@ -94,12 +95,15 @@ public class TaskScheduleThread implements Runnable {
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
copyHdfsToLocal(processDao,
downloadResource(
taskInstance.getExecutePath(),
createProjectResFiles(taskNode),
resourceFiles,
logger);
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
@ -204,8 +208,8 @@ public class TaskScheduleThread implements Runnable {
}
/**
* get task log path
* @return
* get task log path
* @return log path
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
@ -294,14 +298,14 @@ public class TaskScheduleThread implements Runnable {
}
/**
* copy hdfs file to local
* download resource file
*
* @param processDao
* @param execLocalPath
* @param projectRes
* @param logger
*/
private void copyHdfsToLocal(ProcessDao processDao, String execLocalPath, List<String> projectRes, Logger logger) throws IOException {
private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
checkDownloadPermission(projectRes);
for (String res : projectRes) {
File resFile = new File(execLocalPath, res);
if (!resFile.exists()) {
@ -321,4 +325,16 @@ public class TaskScheduleThread implements Runnable {
}
}
}
/**
* check download resource permission
* @param projectRes resource name list
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger);
permissionCheck.checkPermission();
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -207,7 +207,7 @@ public abstract class AbstractCommandExecutor {
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile);
processBuilder.command("sudo", "-u", tenantCode, commandInterpreter(), commandFile);
process = processBuilder.start();
@ -561,7 +561,7 @@ public abstract class AbstractCommandExecutor {
protected abstract String buildCommandFilePath();
protected abstract String commandType();
protected abstract String commandInterpreter();
protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java

@ -113,7 +113,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
* @return python home
*/
@Override
protected String commandType() {
protected String commandInterpreter() {
String pythonHome = getPythonHome(envFile);
if (StringUtils.isEmpty(pythonHome)){
return PYTHON;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java

@ -74,7 +74,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
* @return command type
*/
@Override
protected String commandType() {
protected String commandInterpreter() {
return SH;
}

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.UdfType;
@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -119,13 +123,6 @@ public class SqlTask extends AbstractTask {
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
if (null == dataSource){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -133,6 +130,12 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection con = null;
List<String> createFuncs = null;
try {
@ -164,6 +167,8 @@ public class SqlTask extends AbstractTask {
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
// check udf permission
checkUdfPermission(ArrayUtils.toObject(idsArray));
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
@ -449,4 +454,33 @@ public class SqlTask extends AbstractTask {
}
logger.info(logPrint.toString());
}
/**
* check udf function permission
* @param udfFunIds udf functions
* @return if has download permission return true else false
*/
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF,processDao,udfFunIds,userId,logger);
permissionCheckUdf.checkPermission();
}
/**
* check data source permission
* @param dataSourceId data source id
* @return if has download permission return true else false
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger);
permissionCheckDataSource.checkPermission();
}
}

154
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mock;
/**
* test for MasterExecThread
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({MasterExecThread.class})
public class MasterExecThreadTest {
private MasterExecThread masterExecThread;
private ProcessInstance processInstance;
private ProcessDao processDao;
private int processDefinitionId = 1;
private MasterConfig config;
private ApplicationContext applicationContext;
@Before
public void init() throws Exception{
processDao = mock(ProcessDao.class);
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
config.setMasterExecTaskNum(1);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO);
Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00"));
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00");
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00");
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONObject.toJSONString(cmdParam));
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setGlobalParamMap(Collections.EMPTY_MAP);
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(masterExecThread, new DAG());
PowerMockito.doNothing().when(masterExecThread, "executeProcess");
PowerMockito.doNothing().when(masterExecThread, "postHandle");
PowerMockito.doNothing().when(masterExecThread, "prepareProcess");
PowerMockito.doNothing().when(masterExecThread, "runProcess");
PowerMockito.doNothing().when(masterExecThread, "endProcess");
}
/**
* without schedule
* @throws ParseException
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 1-30 for next save, and last day 31 no save
verify(processDao, times(31)).saveProcessInstance(processInstance);
}catch (Exception e){
e.printStackTrace();
Assert.assertTrue(false);
}
}
/**
* with schedule
* @throws ParseException
*/
@Test
public void testParallelWithSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save
verify(processDao, times(16)).saveProcessInstance(processInstance);
}catch (Exception e){
Assert.assertTrue(false);
}
}
private List<Schedule> zeroSchedulerList(){
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList(){
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
}

44
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.junit.Test;
import java.util.Date;
import static org.junit.Assert.assertEquals;
/**
* Test ScheduleUtils
*/
public class ScheduleUtilsTest {
/**
* Test the getRecentTriggerTime method
*/
@Test
public void testGetRecentTriggerTime() {
Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
Date to = DateUtils.stringToDate("2020-01-31 01:00:00");
// test date
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size());
// test error cron
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size());
// test cron
assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size());
}
}

9
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -159,6 +159,9 @@
border-radius: 0 3px 0 0;
.ans-btn-text {
color: #337ab7;
.ans-icon {
font-size: 16px;
}
}
.assist-btn {
position: absolute;
@ -206,7 +209,7 @@
color: #333;
}
&.active {
background: #e1e2e3;
// background: #e1e2e3;
i {
color: #2d8cf0;
}
@ -234,7 +237,9 @@
border-radius: 3px 3px 0px 0px;
}
}
#screen {
margin-right: 5px;
}
.v-modal-custom-log {
z-index: 101;
}

22
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -61,20 +61,28 @@
<span v-if="name" class="copy-name" @click="_copyName" :data-clipboard-text="name"><em class="ans-icon-copy" data-container="body" data-toggle="tooltip" :title="$t('Copy name')" ></em></span>
</div>
<div class="save-btn">
<div class="operation" style="vertical-align: middle;">
<div class="operation" style="vertical-align: middle;">
<a href="javascript:"
v-for="(item,$index) in toolOperList"
:class="_operationClass(item)"
:id="item.code"
:key="$index"
@click="_ckOperation(item,$event)">
<em :class="item.icon" data-toggle="tooltip" :title="item.description" ></em>
<x-button type="text" data-container="body" :icon="item.icon" v-tooltip.light="item.desc"></x-button>
</a>
</div>
<x-button type="text" icon="ans-icon-triangle-solid-right" @click="dagAutomaticLayout"></x-button>
<x-button
type="primary"
v-tooltip.light="$t('Format DAG')"
icon="ans-icon-triangle-solid-right"
size="xsmall"
data-container="body"
v-if="type === 'instance'"
style="vertical-align: middle;"
@click="dagAutomaticLayout">
</x-button>
<x-button
data-toggle="tooltip"
:title="$t('Refresh DAG status')"
v-tooltip.light="$t('Refresh DAG status')"
data-container="body"
style="vertical-align: middle;"
icon="ans-icon-refresh"
@ -189,10 +197,6 @@
Dag.backfill(true)
if (this.type === 'instance') {
this._getTaskState(false).then(res => {})
// Round robin acquisition status
this.setIntervalP = setInterval(() => {
this._getTaskState(true).then(res => {})
}, 90000)
}
} else {
Dag.create()

2
dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue

@ -64,7 +64,7 @@
</m-tooltips-JSON>
</td>
<td>
<span v-if="item.note" class="ellipsis" v-tooltip.large.top.start="{text: item.note, maxWidth: '500px'}">{{item.note}}</span>
<span v-if="item.note" class="ellipsis" v-tooltip.large.top.start.light="{text: item.note, maxWidth: '500px'}">{{item.note}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue

@ -69,7 +69,7 @@
<span v-else>-</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue

@ -141,7 +141,7 @@
</div>
<div class="clearfix list">
<div class="text">
{{$t('Date')}}
{{$t('Schedule date')}}
</div>
<div class="cont">
<x-datepicker

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue

@ -66,7 +66,7 @@
<span>{{item.instRunningCount}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue

@ -52,7 +52,7 @@
</td>
<td><span class="ellipsis">{{item.fileName}}</span></td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue

@ -67,7 +67,7 @@ v-ps<template>
<span>{{item.type}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/_source/list.vue

@ -58,7 +58,7 @@
<span>{{_rtSize(item.size)}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue

@ -59,7 +59,7 @@
</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue

@ -52,7 +52,7 @@
</td>
<td><span>{{item.groupType === 'EMAIL' ? `${$t('Email')}` : `${$t('SMS')}`}}</span></td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -361,6 +361,7 @@ export default {
'Recipient': 'Recipient',
'Cc': 'Cc',
'Whether it is a complement process?': 'Whether it is a complement process?',
'Schedule date': 'Schedule date',
'Mode of execution': 'Mode of execution',
'Serial execution': 'Serial execution',
'Parallel execution': 'Parallel execution',
@ -374,6 +375,7 @@ export default {
'All_1': 'All',
'Toolbar': 'Toolbar',
'View variables': 'View variables',
'Format DAG': 'Format DAG',
'Refresh DAG status': 'Refresh DAG status',
'Return_1': 'Return',
'Please enter format': 'Please enter format',

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -356,6 +356,7 @@ export default {
'Recipient': '收件人',
'Cc': '抄送人',
'Whether it is a complement process?': '是否补数',
'Schedule date': '调度日期',
'Mode of execution': '执行方式',
'Serial execution': '串行执行',
'Parallel execution': '并行执行',
@ -369,6 +370,7 @@ export default {
'All_1': '成功或失败都发',
'Toolbar': '工具栏',
'View variables': '查看变量',
'Format DAG': '格式化DAG',
'Refresh DAG status': '刷新DAG状态',
'Return_1': '返回上一节点',
'Please enter format': '请输入格式为',

3
pom.xml

@ -694,6 +694,7 @@
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/api/service/ResourcesServiceTest.java</include>
<include>**/api/service/ExecutorService2Test.java</include>
<include>**/api/service/BaseServiceTest.java</include>
<include>**/api/service/BaseDAGServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
@ -703,6 +704,8 @@
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ScheduleUtilsTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/AlertMapperTest.java</include>

Loading…
Cancel
Save