oceanos 5 years ago
parent
commit
f01940a113
  1. 6
      docker/postgres/docker-entrypoint-initdb/init.sql
  2. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
  3. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  4. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  5. 21
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  6. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
  7. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  8. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
  9. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  10. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
  11. 13
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  12. 29
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java
  13. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  14. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  15. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  16. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  17. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  18. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  19. 98
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  20. 107
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  21. 118
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
  22. 120
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
  23. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
  24. 107
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
  25. 4
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  26. 6
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue
  27. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  28. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  29. 6
      sql/dolphinscheduler-postgre.sql
  30. 4
      sql/dolphinscheduler_mysql.sql
  31. 85
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  32. 90
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

6
docker/postgres/docker-entrypoint-initdb/init.sql

@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
PRIMARY KEY (id)
) ;
@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
update_time timestamp DEFAULT NULL ,
dependence text ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
message text ,
PRIMARY KEY (id)
);
@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_useruser : admin , password : dolphinscheduler123
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@ -106,14 +106,12 @@ public class DataAnalysisService extends BaseService{
List<ExecuteStatusCount> taskInstanceStateCounts =
taskInstanceMapper.countTaskInstanceStateByUser(start, end, projectIds);
if (taskInstanceStateCounts != null && !taskInstanceStateCounts.isEmpty()) {
if (taskInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(taskInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.TASK_INSTANCE_STATE_COUNT_ERROR);
}
return result;
return result;
}
private void putErrorRequestParamsMsg(Map<String, Object> result) {
@ -153,14 +151,12 @@ public class DataAnalysisService extends BaseService{
processInstanceMapper.countInstanceStateByUser(start, end,
projectIdArray);
if (processInstanceStateCounts != null && !processInstanceStateCounts.isEmpty()) {
if (processInstanceStateCounts != null) {
TaskCountDto taskCountResult = new TaskCountDto(processInstanceStateCounts);
result.put(Constants.DATA_LIST, taskCountResult);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.COUNT_PROCESS_INSTANCE_STATE_ERROR);
}
return result;
return result;
}
@ -234,7 +230,7 @@ public class DataAnalysisService extends BaseService{
// count error command state
List<CommandCount> errorCommandStateCounts =
errorCommandMapper.countCommandState(
start, end, projectIdArray);
start, end, projectIdArray);
//
Map<CommandType,Map<String,Integer>> dataMap = new HashMap<>();

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -225,20 +225,14 @@ public class ExecutorService extends BaseService{
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
processInstance.setCommandType(CommandType.STOP);
processInstance.addHistoryCmd(CommandType.STOP);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP);
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE:
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
processInstance.setCommandType(CommandType.PAUSE);
processInstance.addHistoryCmd(CommandType.PAUSE);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE);
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
default:
@ -308,22 +302,27 @@ public class ExecutorService extends BaseService{
}
/**
* update process instance state
* prepare to update process instance command type and status
*
* @param processInstanceId process instance id
* @param processInstance process instance
* @param commandType command type
* @param executionStatus execute status
* @return update result
*/
private Map<String, Object> updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>(5);
int update = processService.updateProcessInstanceState(processInstanceId, executionStatus);
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -173,8 +173,10 @@ public class ProcessDefinitionService extends BaseDAGService {
for(TaskNode taskNode : tasks){
String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter);
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
}
StringBuilder sb = new StringBuilder();

21
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -423,6 +423,7 @@ public class UsersService extends BaseService {
* @param projectIds project id array
* @return grant result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);
@ -472,6 +473,7 @@ public class UsersService extends BaseService {
* @param resourceIds resource id array
* @return grant result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
Map<String, Object> result = new HashMap<>(5);
//only admin can operate
@ -484,17 +486,20 @@ public class UsersService extends BaseService {
return result;
}
String[] resourceFullIdArr = resourceIds.split(",");
// need authorize resource id set
Set<Integer> needAuthorizeResIds = new HashSet();
for (String resourceFullId : resourceFullIdArr) {
String[] resourceIdArr = resourceFullId.split("-");
for (int i=0;i<=resourceIdArr.length-1;i++) {
int resourceIdValue = Integer.parseInt(resourceIdArr[i]);
needAuthorizeResIds.add(resourceIdValue);
if (StringUtils.isNotBlank(resourceIds)) {
String[] resourceFullIdArr = resourceIds.split(",");
// need authorize resource id set
for (String resourceFullId : resourceFullIdArr) {
String[] resourceIdArr = resourceFullId.split("-");
for (int i=0;i<=resourceIdArr.length-1;i++) {
int resourceIdValue = Integer.parseInt(resourceIdArr[i]);
needAuthorizeResIds.add(resourceIdValue);
}
}
}
//get the authorized resource id list by user id
List<Resource> oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId);
//if resource type is UDF,need check whether it is bound by UDF functon
@ -565,6 +570,7 @@ public class UsersService extends BaseService {
* @param udfIds udf id array
* @return grant result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
Map<String, Object> result = new HashMap<>(5);
@ -611,6 +617,7 @@ public class UsersService extends BaseService {
* @param datasourceIds data source id array
* @return grant result code
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false);

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java

@ -114,9 +114,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countTaskStateByProject(user, 2, startDate, endDate);
Assert.assertTrue(result.isEmpty());
// task instance state count error
result = dataAnalysisService.countTaskStateByProject(user, 1, startDate, endDate);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_COUNT_ERROR,result.get(Constants.STATUS));
//SUCCESS
Mockito.when(taskInstanceMapper.countTaskInstanceStateByUser(DateUtils.getScheduleDate(startDate),
@ -137,10 +134,6 @@ public class DataAnalysisServiceTest {
Map<String, Object> result = dataAnalysisService.countProcessInstanceStateByProject(user,2,startDate,endDate);
Assert.assertTrue(result.isEmpty());
//COUNT_PROCESS_INSTANCE_STATE_ERROR
result = dataAnalysisService.countProcessInstanceStateByProject(user,1,startDate,endDate);
Assert.assertEquals(Status.COUNT_PROCESS_INSTANCE_STATE_ERROR,result.get(Constants.STATUS));
//SUCCESS
Mockito.when(processInstanceMapper.countInstanceStateByUser(DateUtils.getScheduleDate(startDate),
DateUtils.getScheduleDate(endDate), new Integer[]{1})).thenReturn(getTaskInstanceStateCounts());

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -223,4 +223,14 @@ public class ThreadUtils {
}
return id + " (" + name + ")";
}
/**
* sleep
* @param millis millis
*/
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {}
}
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java

@ -64,7 +64,7 @@ public class SpringConnectionFactory {
* @return druid dataSource
*/
@Bean(destroyMethod="")
public static DruidDataSource dataSource() {
public DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -111,7 +111,7 @@ public class Command {
/**
* worker group
*/
@TableField(exist = false)
@TableField("worker_group")
private String workerGroup;
public Command() {

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java

@ -101,9 +101,9 @@ public class ErrorCommand {
private String message;
/**
* worker group id
* worker group
*/
private int workerGroupId;
private String workerGroup;
public ErrorCommand(){}
@ -257,17 +257,25 @@ public class ErrorCommand {
this.updateTime = updateTime;
}
public int getWorkerGroupId() {
return workerGroupId;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "Command{" +
return "ErrorCommand{" +
"id=" + id +
", commandType=" + commandType +
", processDefinitionId=" + processDefinitionId +
@ -281,17 +289,8 @@ public class ErrorCommand {
", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime +
", message=" + message +
", message='" + message + '\'' +
", workerGroup='" + workerGroup + '\'' +
'}';
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

13
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -76,7 +76,8 @@ public class CommandMapperTest {
//query
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand, actualCommand);
assertNotNull(actualCommand);
assertEquals(expectedCommand.getProcessDefinitionId(), actualCommand.getProcessDefinitionId());
}
/**
@ -94,7 +95,8 @@ public class CommandMapperTest {
Command actualCommand = commandMapper.selectById(expectedCommand.getId());
assertEquals(expectedCommand,actualCommand);
assertNotNull(actualCommand);
assertEquals(expectedCommand.getUpdateTime(),actualCommand.getUpdateTime());
}
@ -127,13 +129,6 @@ public class CommandMapperTest {
List<Command> actualCommands = commandMapper.selectList(null);
assertThat(actualCommands.size(), greaterThanOrEqualTo(count));
for (Command actualCommand : actualCommands){
Command expectedCommand = commandMap.get(actualCommand.getId());
if (expectedCommand != null){
assertEquals(expectedCommand,actualCommand);
}
}
}
/**

29
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java

@ -16,11 +16,11 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*;
import org.junit.Assert;
import org.junit.Test;
@ -179,6 +179,23 @@ public class UserMapperTest {
return tenant;
}
/**
* insert one Tenant
* @return Tenant
*/
private Tenant insertOneTenant(Queue queue){
Tenant tenant = new Tenant();
tenant.setTenantCode("dolphin");
tenant.setTenantName("dolphin test");
tenant.setDescription("dolphin user use");
tenant.setQueueId(queue.getId());
tenant.setQueue(queue.getQueue());
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
tenantMapper.insert(tenant);
return tenant;
}
/**
* insert one Queue
* @return Queue
@ -291,11 +308,13 @@ public class UserMapperTest {
*/
@Test
public void testQueryDetailsById() {
//insertOne
User user = insertOne();
//insertOneQueue and insertOneTenant
Queue queue = insertOneQueue();
Tenant tenant = insertOneTenant(queue);
User user = insertOne(queue,tenant);
//queryDetailsById
User queryUser = userMapper.queryDetailsById(user.getId());
Assert.assertEquals(queryUser.getUserName(), queryUser.getUserName());
Assert.assertEquals(user.getUserName(), queryUser.getUserName());
}
/**

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -53,6 +54,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* TaskUpdateQueue consumer
*/
@ -68,7 +71,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue
*/
@Autowired
private TaskPriorityQueue taskUpdateQueue;
private TaskPriorityQueue taskPriorityQueue;
/**
* processService
@ -93,7 +96,7 @@ public class TaskPriorityQueueConsumer extends Thread{
while (Stopper.isRunning()){
try {
// if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take();
String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
@ -114,13 +117,20 @@ public class TaskPriorityQueueConsumer extends Thread{
private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
return false;
}
Boolean result = false;
while (Stopper.isRunning()){
try {
result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("dispatch error",e);
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
if (result){
break;
}
}
return result;
}
/**

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -19,7 +19,10 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -31,9 +34,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task ack processor
*/
@ -51,9 +57,16 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
@ -71,8 +84,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
@ -81,6 +96,15 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}
}

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -19,7 +19,10 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -30,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* task response processor
*/
@ -50,9 +56,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
@ -71,6 +83,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
@ -79,6 +93,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getTaskInstanceId());
taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -914,7 +914,7 @@ public class MasterExecThread implements Runnable {
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
processInstance.setState(state);
ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
instance.setState(state);
instance.setProcessDefinition(processInstance.getProcessDefinition());

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -21,6 +21,8 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
@ -35,6 +37,8 @@ import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* taks callback service
*/
@ -93,8 +97,13 @@ public class TaskCallbackService {
}
logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if(CollectionUtils.isEmpty(masterNodes)){
throw new IllegalStateException("no available master node exception");
while (Stopper.isRunning()) {
if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node");
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}else {
break;
}
}
for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -214,11 +214,8 @@ public class SqlTask extends AbstractTask {
try {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
// create connection
connection = createConnection();
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs);
@ -226,13 +223,12 @@ public class SqlTask extends AbstractTask {
// pre sql
preSql(connection,preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
resultSet = stmt.executeQuery();
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {

98
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
@ -47,9 +52,10 @@ import java.util.Date;
* test task call back service
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class,
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class})
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
public class TaskCallbackServiceTest {
@Autowired
@ -58,12 +64,22 @@ public class TaskCallbackServiceTest {
@Autowired
private MasterRegistry masterRegistry;
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
/**
* send ack test
* @throws Exception
*/
@Test
public void testSendAck(){
public void testSendAck() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@ -75,22 +91,64 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
/**
* send result test
* @throws Exception
*/
@Test
public void testSendResult() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
responseCommand.setTaskInstanceId(1);
responseCommand.setEndTime(new Date());
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
@Test(expected = IllegalArgumentException.class)
public void testSendAckWithIllegalArgumentException(){
TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
taskCallbackService.sendAck(1, ackCommand.convert2Command());
Stopper.stop();
}
@Test(expected = IllegalStateException.class)
public void testSendAckWithIllegalStateException1(){
masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@ -103,7 +161,21 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test(expected = IllegalStateException.class)
@ -112,7 +184,7 @@ public class TaskCallbackServiceTest {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class));
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
@ -125,6 +197,20 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date());
nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

107
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -211,7 +211,9 @@
return {
label: node.name
}
}
},
allNoResources: [],
noRes: [],
}
},
props: {
@ -300,6 +302,12 @@
return false
}
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// localParams Subcomponent verification
if (!this.$refs.refLocalParams._verifProp()) {
return false
@ -339,6 +347,67 @@
}
delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
},
watch: {
// Listening type
@ -354,15 +423,37 @@
},
computed: {
cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return {
mainClass: this.mainClass,
mainJar: {
id: this.mainJar
},
deployMode: this.deployMode,
resourceList: _.map(this.resourceList, v => {
return {id: v}
}),
resourceList: resourceIdArr,
localParams: this.localParams,
slot: this.slot,
taskManager: this.taskManager,
@ -404,20 +495,24 @@
this.programType = o.params.programType || 'SCALA'
// backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || []
if (resourceList.length) {
_.map(resourceList, v => {
if(v.res) {
if(!v.id) {
this.store.dispatch('dag/getResourceId',{
type: 'FILE',
fullName: '/'+v.res
}).then(res => {
this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => {
this.$message.error(e.msg || '')
this.resourceList.push(v.res)
this.dataProcess(backResource)
})
} else {
this.resourceList.push(v.id)
this.dataProcess(backResource)
}
})
this.cacheResourceList = resourceList

118
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@ -44,7 +44,7 @@
<m-list-box>
<div slot="text">{{$t('Main jar package')}}</div>
<div slot="content">
<treeselect v-model="mainJar" :options="mainJarLists" :disable-branch-nodes="true" :normalizer="normalizer" :placeholder="$t('Please enter main jar package')">
<treeselect v-model="mainJar" :options="mainJarLists" :disable-branch-nodes="true" :normalizer="normalizer" :value-consists-of="valueConsistsOf" :placeholder="$t('Please enter main jar package')">
<div slot="value-label" slot-scope="{ node }">{{ node.raw.fullName }}</div>
</treeselect>
</div>
@ -109,6 +109,7 @@
name: 'mr',
data () {
return {
valueConsistsOf: 'LEAF_PRIORITY',
// Main function class
mainClass: '',
// Master jar package
@ -134,7 +135,9 @@
return {
label: node.name
}
}
},
allNoResources: [],
noRes: []
}
},
props: {
@ -176,9 +179,76 @@
diGuiTree(item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0?        
delete item.children : this.diGuiTree(item.children);
this.operationTree(item) : this.diGuiTree(item.children);
})
},
operationTree(item) {
if(item.dirctory) {
item.isDisabled =true
}
delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
/**
* verification
*/
@ -193,6 +263,12 @@
return false
}
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// localParams Subcomponent verification
if (!this.$refs.refLocalParams._verifProp()) {
return false
@ -231,14 +307,36 @@
},
computed: {
cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return {
mainClass: this.mainClass,
mainJar: {
id: this.mainJar
},
resourceList: _.map(this.resourceList, v => {
return {id: v}
}),
resourceList: resourceIdArr,
localParams: this.localParams,
mainArgs: this.mainArgs,
others: this.others,
@ -273,23 +371,27 @@
let resourceList = o.params.resourceList || []
if (resourceList.length) {
_.map(resourceList, v => {
if(v.res) {
if(!v.id) {
this.store.dispatch('dag/getResourceId',{
type: 'FILE',
fullName: '/'+v.res
}).then(res => {
this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => {
this.$message.error(e.msg || '')
this.resourceList.push(v.res)
this.dataProcess(backResource)
})
} else {
this.resourceList.push(v.id)
this.dataProcess(backResource)
}
})
this.cacheResourceList = resourceList
}
// backfill localParams
let backResource = o.params.resourceList || []
let localParams = o.params.localParams || []
if (localParams.length) {
this.localParams = localParams

120
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue

@ -80,6 +80,13 @@
// Cache ResourceList
cacheResourceList: [],
resourceOptions: [],
normalizer(node) {
return {
label: node.name
}
},
allNoResources: [],
noRes: []
}
},
mixins: [disabledState],
@ -96,9 +103,9 @@
/**
* return resourceList
*/
_onResourcesData (a) {
this.resourceList = a
},
// _onResourcesData (a) {
// this.resourceList = a
// },
/**
* cache resourceList
*/
@ -120,6 +127,12 @@
return false
}
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// storage
this.$emit('on-params', {
resourceList: _.map(this.resourceList, v => {
@ -166,6 +179,67 @@
item.isDisabled =true
}
delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.resourceOptions.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.resourceOptions = this.resourceOptions.concat(noResources)
}
}
}
},
watch: {
@ -176,10 +250,32 @@
},
computed: {
cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.resourceOptions.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return {
resourceList: _.map(this.resourceList, v => {
return {id: v}
}),
resourceList: resourceIdArr,
localParams: this.localParams
}
}
@ -187,7 +283,7 @@
created () {
let item = this.store.state.dag.resourcesListS
this.diGuiTree(item)
this.options = item
this.resourceOptions = item
let o = this.backfillItem
// Non-null objects represent backfill
@ -195,20 +291,24 @@
this.rawScript = o.params.rawScript || ''
// backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || []
if (resourceList.length) {
_.map(resourceList, v => {
if(v.res) {
if(!v.id) {
this.store.dispatch('dag/getResourceId',{
type: 'FILE',
fullName: '/'+v.res
}).then(res => {
this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => {
this.$message.error(e.msg || '')
this.resourceList.push(v.res)
this.dataProcess(backResource)
})
} else {
this.resourceList.push(v.id)
this.dataProcess(backResource)
}
})
this.cacheResourceList = resourceList
@ -230,6 +330,6 @@
editor.toTextArea() // Uninstall
editor.off($('.code-python-mirror'), 'keypress', this.keypress)
},
components: { mLocalParams, mListBox, mResources }
components: { mLocalParams, mListBox, mResources,Treeselect }
}
</script>

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue

@ -266,8 +266,8 @@
}
let noResources = [{
id: -1,
name: $t('No resources'),
fullName: '/'+$t('No resources'),
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {

107
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue

@ -254,7 +254,9 @@
return {
label: node.name
}
}
},
allNoResources: [],
noRes: []
}
},
props: {
@ -305,6 +307,67 @@
}
delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
/**
* verification
*/
@ -324,6 +387,12 @@
return false
}
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
if (!Number.isInteger(parseInt(this.numExecutors))) {
this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`)
return false
@ -400,15 +469,37 @@
},
computed: {
cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return {
mainClass: this.mainClass,
mainJar: {
id: this.mainJar
},
deployMode: this.deployMode,
resourceList: _.map(this.resourceList, v => {
return {id: v}
}),
resourceList: resourceIdArr,
localParams: this.localParams,
driverCores: this.driverCores,
driverMemory: this.driverMemory,
@ -453,20 +544,24 @@
this.sparkVersion = o.params.sparkVersion || 'SPARK2'
// backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || []
if (resourceList.length) {
_.map(resourceList, v => {
if(v.res) {
if(!v.id) {
this.store.dispatch('dag/getResourceId',{
type: 'FILE',
fullName: '/'+v.res
}).then(res => {
this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => {
this.$message.error(e.msg || '')
this.resourceList.push(v.res)
this.dataProcess(backResource)
})
} else {
this.resourceList.push(v.id)
this.dataProcess(backResource)
}
})
this.cacheResourceList = resourceList

4
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -49,10 +49,10 @@
<th scope="col" width="60">
<span>{{$t('Run Times')}}</span>
</th>
<th scope="col" width="100">
<th scope="col" width="125">
<span>{{$t('host')}}</span>
</th>
<th scope="col" width="60">
<th scope="col" width="55">
<span>{{$t('fault-tolerant sign')}}</span>
</th>
<th scope="col" width="30">

6
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -43,13 +43,13 @@
<th scope="col" width="140">
<span>{{$t('Start Time')}}</span>
</th>
<th scope="col" width="140">
<th scope="col" width="125">
<span>{{$t('End Time')}}</span>
</th>
<th scope="col" width="110">
<th scope="col" width="130">
<span>{{$t('host')}}</span>
</th>
<th scope="col" width="74">
<th scope="col" width="70">
<span>{{$t('Duration')}}(s)</span>
</th>
<th scope="col" width="84">

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -588,6 +588,6 @@ export default {
'Branch flow': 'Branch flow',
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required',
'No resources': 'No resources',
'Unauthorized or deleted resources': 'Unauthorized or deleted resources',
'Please delete all non-existent resources': 'Please delete all non-existent resources',
}

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -588,6 +588,6 @@ export default {
'Branch flow': '分支流转',
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填',
'No resources': '未授权或已删除资源',
'Unauthorized or deleted resources': '未授权或已删除资源',
'Please delete all non-existent resources': '请删除所有未授权或已删除资源',
}

6
sql/dolphinscheduler-postgre.sql

@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
PRIMARY KEY (id)
) ;
@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
update_time timestamp DEFAULT NULL ,
dependence text ,
process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
worker_group varchar(64),
message text ,
PRIMARY KEY (id)
);
@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_useruser : admin , password : dolphinscheduler123
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group

4
sql/dolphinscheduler_mysql.sql

@ -333,7 +333,7 @@ CREATE TABLE `t_ds_command` (
`dependence` varchar(255) DEFAULT NULL COMMENT 'dependence',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
`worker_group` varchar(64) COMMENT 'worker group',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -380,7 +380,7 @@ CREATE TABLE `t_ds_error_command` (
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`dependence` text COMMENT 'dependence',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority, 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
`worker_group` varchar(64) COMMENT 'worker group',
`message` text COMMENT 'message',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

85
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -167,7 +167,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_process_instance_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group';
ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
@ -207,7 +207,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_task_instance_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group';
ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
@ -247,7 +247,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_schedules_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_schedules ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group';
ALTER TABLE t_ds_schedules ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
@ -277,4 +277,83 @@ delimiter ;
CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id;
-- ac_dolphin_T_t_ds_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group;
-- dc_dolphin_T_t_ds_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id;
-- ac_dolphin_T_t_ds_error_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_error_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group;
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id;

90
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -164,7 +164,7 @@ BEGIN
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null;
ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
@ -207,7 +207,7 @@ BEGIN
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null;
ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
@ -249,7 +249,7 @@ BEGIN
AND TABLE_NAME='t_ds_schedules'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null;
ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
@ -279,4 +279,88 @@ delimiter ;
select dc_dolphin_T_t_ds_schedules_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id();
-- ac_dolphin_T_t_ds_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_command_A_worker_group();
-- dc_dolphin_T_t_ds_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id();
-- ac_dolphin_T_t_ds_error_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_error_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group();
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_error_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();

Loading…
Cancel
Save