Browse Source

Merge branch 'dev' into fixbug-#2439

pull/2/head
qiaozhanwei 5 years ago committed by GitHub
parent
commit
aec883ce74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      docker/postgres/docker-entrypoint-initdb/init.sql
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 21
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
  5. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
  6. 29
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java
  7. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  8. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  9. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  10. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  11. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  12. 98
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  13. 107
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  14. 118
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
  15. 120
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
  16. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
  17. 107
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
  18. 4
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  19. 6
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue
  20. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  21. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  22. 85
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  23. 90
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

6
docker/postgres/docker-entrypoint-initdb/init.sql

@ -234,7 +234,7 @@ CREATE TABLE t_ds_command (
dependence varchar(255) DEFAULT NULL , dependence varchar(255) DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
process_instance_priority int DEFAULT NULL , process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' , worker_group varchar(64),
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
@ -275,7 +275,7 @@ CREATE TABLE t_ds_error_command (
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
dependence text , dependence text ,
process_instance_priority int DEFAULT NULL , process_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' , worker_group varchar(64),
message text , message text ,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
@ -748,7 +748,7 @@ CREATE SEQUENCE t_ds_worker_server_id_sequence;
ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence'); ALTER TABLE t_ds_worker_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_server_id_sequence');
-- Records of t_ds_useruser : admin , password : dolphinscheduler123 -- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22'); INSERT INTO t_ds_user(user_name,user_password,user_type,email,phone,tenant_id,create_time,update_time) VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup,dolphinscheduler warning group -- Records of t_ds_alertgroup,dolphinscheduler warning group

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -173,8 +173,10 @@ public class ProcessDefinitionService extends BaseDAGService {
for(TaskNode taskNode : tasks){ for(TaskNode taskNode : tasks){
String taskParameter = taskNode.getParams(); String taskParameter = taskNode.getParams();
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter);
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds.addAll(tempSet); Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
resourceIds.addAll(tempSet);
}
} }
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();

21
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -423,6 +423,7 @@ public class UsersService extends BaseService {
* @param projectIds project id array * @param projectIds project id array
* @return grant result code * @return grant result code
*/ */
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) { public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
@ -472,6 +473,7 @@ public class UsersService extends BaseService {
* @param resourceIds resource id array * @param resourceIds resource id array
* @return grant result code * @return grant result code
*/ */
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) { public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
//only admin can operate //only admin can operate
@ -484,17 +486,20 @@ public class UsersService extends BaseService {
return result; return result;
} }
String[] resourceFullIdArr = resourceIds.split(",");
// need authorize resource id set
Set<Integer> needAuthorizeResIds = new HashSet(); Set<Integer> needAuthorizeResIds = new HashSet();
for (String resourceFullId : resourceFullIdArr) { if (StringUtils.isNotBlank(resourceIds)) {
String[] resourceIdArr = resourceFullId.split("-"); String[] resourceFullIdArr = resourceIds.split(",");
for (int i=0;i<=resourceIdArr.length-1;i++) { // need authorize resource id set
int resourceIdValue = Integer.parseInt(resourceIdArr[i]); for (String resourceFullId : resourceFullIdArr) {
needAuthorizeResIds.add(resourceIdValue); String[] resourceIdArr = resourceFullId.split("-");
for (int i=0;i<=resourceIdArr.length-1;i++) {
int resourceIdValue = Integer.parseInt(resourceIdArr[i]);
needAuthorizeResIds.add(resourceIdValue);
}
} }
} }
//get the authorized resource id list by user id //get the authorized resource id list by user id
List<Resource> oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId); List<Resource> oldAuthorizedRes = resourceMapper.queryAuthorizedResourceList(userId);
//if resource type is UDF,need check whether it is bound by UDF functon //if resource type is UDF,need check whether it is bound by UDF functon
@ -565,6 +570,7 @@ public class UsersService extends BaseService {
* @param udfIds udf id array * @param udfIds udf id array
* @return grant result code * @return grant result code
*/ */
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) { public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
@ -611,6 +617,7 @@ public class UsersService extends BaseService {
* @param datasourceIds data source id array * @param datasourceIds data source id array
* @return grant result code * @return grant result code
*/ */
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) { public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java

@ -64,7 +64,7 @@ public class SpringConnectionFactory {
* @return druid dataSource * @return druid dataSource
*/ */
@Bean(destroyMethod="") @Bean(destroyMethod="")
public static DruidDataSource dataSource() { public DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource(); DruidDataSource druidDataSource = new DruidDataSource();

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java

@ -101,9 +101,9 @@ public class ErrorCommand {
private String message; private String message;
/** /**
* worker group id * worker group
*/ */
private int workerGroupId; private String workerGroup;
public ErrorCommand(){} public ErrorCommand(){}
@ -257,17 +257,25 @@ public class ErrorCommand {
this.updateTime = updateTime; this.updateTime = updateTime;
} }
public int getWorkerGroupId() { public String getWorkerGroup() {
return workerGroupId; return workerGroup;
} }
public void setWorkerGroupId(int workerGroupId) { public void setWorkerGroup(String workerGroup) {
this.workerGroupId = workerGroupId; this.workerGroup = workerGroup;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
} }
@Override @Override
public String toString() { public String toString() {
return "Command{" + return "ErrorCommand{" +
"id=" + id + "id=" + id +
", commandType=" + commandType + ", commandType=" + commandType +
", processDefinitionId=" + processDefinitionId + ", processDefinitionId=" + processDefinitionId +
@ -281,17 +289,8 @@ public class ErrorCommand {
", startTime=" + startTime + ", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime + ", updateTime=" + updateTime +
", message=" + message + ", message='" + message + '\'' +
", workerGroup='" + workerGroup + '\'' +
'}'; '}';
} }
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
} }

29
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java

@ -16,11 +16,11 @@
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -179,6 +179,23 @@ public class UserMapperTest {
return tenant; return tenant;
} }
/**
* insert one Tenant
* @return Tenant
*/
private Tenant insertOneTenant(Queue queue){
Tenant tenant = new Tenant();
tenant.setTenantCode("dolphin");
tenant.setTenantName("dolphin test");
tenant.setDescription("dolphin user use");
tenant.setQueueId(queue.getId());
tenant.setQueue(queue.getQueue());
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
tenantMapper.insert(tenant);
return tenant;
}
/** /**
* insert one Queue * insert one Queue
* @return Queue * @return Queue
@ -291,11 +308,13 @@ public class UserMapperTest {
*/ */
@Test @Test
public void testQueryDetailsById() { public void testQueryDetailsById() {
//insertOne //insertOneQueue and insertOneTenant
User user = insertOne(); Queue queue = insertOneQueue();
Tenant tenant = insertOneTenant(queue);
User user = insertOne(queue,tenant);
//queryDetailsById //queryDetailsById
User queryUser = userMapper.queryDetailsById(user.getId()); User queryUser = userMapper.queryDetailsById(user.getId());
Assert.assertEquals(queryUser.getUserName(), queryUser.getUserName()); Assert.assertEquals(user.getUserName(), queryUser.getUserName());
} }
/** /**

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -53,6 +53,8 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue * taskUpdateQueue
*/ */
@Autowired @Autowired
private TaskPriorityQueue taskUpdateQueue; private TaskPriorityQueue taskPriorityQueue;
/** /**
* processService * processService
@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
// if not task , blocking here // if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take(); String taskPriorityInfo = taskPriorityQueue.take();
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{
private Boolean dispatch(int taskInstanceId){ private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try { Boolean result = false;
return dispatcher.dispatch(executionContext); while (Stopper.isRunning()){
} catch (ExecuteException e) { try {
logger.error("execute exception", e); result = dispatcher.dispatch(executionContext);
return false; } catch (ExecuteException e) {
} logger.error("dispatch error",e);
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e1) {}
}
if (result){
break;
}
}
return result;
} }
/** /**

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -31,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* task ack processor * task ack processor
*/ */
@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskAckProcessor(){ public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
/** /**
@ -71,8 +83,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
String workerAddress = ChannelUtils.toAddress(channel).getAddress(); String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
// TaskResponseEvent // TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(), taskAckCommand.getStartTime(),
workerAddress, workerAddress,
taskAckCommand.getExecutePath(), taskAckCommand.getExecutePath(),
@ -81,6 +95,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}
} }
} }

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -30,9 +32,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* task response processor * task response processor
*/ */
@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/ */
private final TaskInstanceCacheManager taskInstanceCacheManager; private final TaskInstanceCacheManager taskInstanceCacheManager;
/**
* processService
*/
private ProcessService processService;
public TaskResponseProcessor(){ public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
/** /**
@ -71,6 +82,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand);
ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
// TaskResponseEvent // TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(), responseCommand.getEndTime(),
@ -79,6 +92,18 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getTaskInstanceId()); responseCommand.getTaskInstanceId());
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}
} }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -93,8 +95,17 @@ public class TaskCallbackService {
} }
logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if(CollectionUtils.isEmpty(masterNodes)){ while (Stopper.isRunning()) {
throw new IllegalStateException("no available master node exception"); if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node");
try {
Thread.sleep(1000);
}catch (Exception e){
}
}else {
break;
}
} }
for(String masterNode : masterNodes){ for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -214,11 +214,8 @@ public class SqlTask extends AbstractTask {
try { try {
// if upload resource is HDFS and kerberos startup // if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf(); CommonUtils.loadKerberosConf();
// create connection // create connection
connection = createConnection(); connection = createConnection();
// create temp function // create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) { if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs); createTempFunction(connection,createFuncs);
@ -226,13 +223,12 @@ public class SqlTask extends AbstractTask {
// pre sql // pre sql
preSql(connection,preStatementsBinds); preSql(connection,preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds); stmt = prepareStatementAndBind(connection, mainSqlBinds);
resultSet = stmt.executeQuery();
// decide whether to executeQuery or executeUpdate based on sqlType // decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send // query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
resultProcess(resultSet); resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {

98
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test; import org.junit.Test;
@ -47,9 +52,10 @@ import java.util.Date;
* test task call back service * test task call back service
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, @ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
public class TaskCallbackServiceTest { public class TaskCallbackServiceTest {
@Autowired @Autowired
@ -58,12 +64,22 @@ public class TaskCallbackServiceTest {
@Autowired @Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
/**
* send ack test
* @throws Exception
*/
@Test @Test
public void testSendAck(){ public void testSendAck() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -75,22 +91,64 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close(); nettyRemotingServer.close();
nettyRemotingClient.close(); nettyRemotingClient.close();
} }
/**
* send result test
* @throws Exception
*/
@Test
public void testSendResult() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
responseCommand.setTaskInstanceId(1);
responseCommand.setEndTime(new Date());
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSendAckWithIllegalArgumentException(){ public void testSendAckWithIllegalArgumentException(){
TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
Stopper.stop();
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testSendAckWithIllegalStateException1(){ public void testSendAckWithIllegalStateException1(){
masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -103,7 +161,21 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
nettyRemotingServer.close(); nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -112,7 +184,7 @@ public class TaskCallbackServiceTest {
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
nettyRemotingServer.start(); nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
@ -125,6 +197,20 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
nettyRemotingServer.close(); nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }

107
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -211,7 +211,9 @@
return { return {
label: node.name label: node.name
} }
} },
allNoResources: [],
noRes: [],
} }
}, },
props: { props: {
@ -300,6 +302,12 @@
return false return false
} }
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// localParams Subcomponent verification // localParams Subcomponent verification
if (!this.$refs.refLocalParams._verifProp()) { if (!this.$refs.refLocalParams._verifProp()) {
return false return false
@ -339,6 +347,67 @@
} }
delete item.children delete item.children
}, },
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
}, },
watch: { watch: {
// Listening type // Listening type
@ -354,15 +423,37 @@
}, },
computed: { computed: {
cacheParams () { cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return { return {
mainClass: this.mainClass, mainClass: this.mainClass,
mainJar: { mainJar: {
id: this.mainJar id: this.mainJar
}, },
deployMode: this.deployMode, deployMode: this.deployMode,
resourceList: _.map(this.resourceList, v => { resourceList: resourceIdArr,
return {id: v}
}),
localParams: this.localParams, localParams: this.localParams,
slot: this.slot, slot: this.slot,
taskManager: this.taskManager, taskManager: this.taskManager,
@ -404,20 +495,24 @@
this.programType = o.params.programType || 'SCALA' this.programType = o.params.programType || 'SCALA'
// backfill resourceList // backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || [] let resourceList = o.params.resourceList || []
if (resourceList.length) { if (resourceList.length) {
_.map(resourceList, v => { _.map(resourceList, v => {
if(v.res) { if(!v.id) {
this.store.dispatch('dag/getResourceId',{ this.store.dispatch('dag/getResourceId',{
type: 'FILE', type: 'FILE',
fullName: '/'+v.res fullName: '/'+v.res
}).then(res => { }).then(res => {
this.resourceList.push(res.id) this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => { }).catch(e => {
this.$message.error(e.msg || '') this.resourceList.push(v.res)
this.dataProcess(backResource)
}) })
} else { } else {
this.resourceList.push(v.id) this.resourceList.push(v.id)
this.dataProcess(backResource)
} }
}) })
this.cacheResourceList = resourceList this.cacheResourceList = resourceList

118
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@ -44,7 +44,7 @@
<m-list-box> <m-list-box>
<div slot="text">{{$t('Main jar package')}}</div> <div slot="text">{{$t('Main jar package')}}</div>
<div slot="content"> <div slot="content">
<treeselect v-model="mainJar" :options="mainJarLists" :disable-branch-nodes="true" :normalizer="normalizer" :placeholder="$t('Please enter main jar package')"> <treeselect v-model="mainJar" :options="mainJarLists" :disable-branch-nodes="true" :normalizer="normalizer" :value-consists-of="valueConsistsOf" :placeholder="$t('Please enter main jar package')">
<div slot="value-label" slot-scope="{ node }">{{ node.raw.fullName }}</div> <div slot="value-label" slot-scope="{ node }">{{ node.raw.fullName }}</div>
</treeselect> </treeselect>
</div> </div>
@ -109,6 +109,7 @@
name: 'mr', name: 'mr',
data () { data () {
return { return {
valueConsistsOf: 'LEAF_PRIORITY',
// Main function class // Main function class
mainClass: '', mainClass: '',
// Master jar package // Master jar package
@ -134,7 +135,9 @@
return { return {
label: node.name label: node.name
} }
} },
allNoResources: [],
noRes: []
} }
}, },
props: { props: {
@ -176,9 +179,76 @@
diGuiTree(item) { // Recursive convenience tree structure diGuiTree(item) { // Recursive convenience tree structure
item.forEach(item => { item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0?         item.children === '' || item.children === undefined || item.children === null || item.children.length === 0?        
delete item.children : this.diGuiTree(item.children); this.operationTree(item) : this.diGuiTree(item.children);
}) })
}, },
operationTree(item) {
if(item.dirctory) {
item.isDisabled =true
}
delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
/** /**
* verification * verification
*/ */
@ -193,6 +263,12 @@
return false return false
} }
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// localParams Subcomponent verification // localParams Subcomponent verification
if (!this.$refs.refLocalParams._verifProp()) { if (!this.$refs.refLocalParams._verifProp()) {
return false return false
@ -231,14 +307,36 @@
}, },
computed: { computed: {
cacheParams () { cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return { return {
mainClass: this.mainClass, mainClass: this.mainClass,
mainJar: { mainJar: {
id: this.mainJar id: this.mainJar
}, },
resourceList: _.map(this.resourceList, v => { resourceList: resourceIdArr,
return {id: v}
}),
localParams: this.localParams, localParams: this.localParams,
mainArgs: this.mainArgs, mainArgs: this.mainArgs,
others: this.others, others: this.others,
@ -273,23 +371,27 @@
let resourceList = o.params.resourceList || [] let resourceList = o.params.resourceList || []
if (resourceList.length) { if (resourceList.length) {
_.map(resourceList, v => { _.map(resourceList, v => {
if(v.res) { if(!v.id) {
this.store.dispatch('dag/getResourceId',{ this.store.dispatch('dag/getResourceId',{
type: 'FILE', type: 'FILE',
fullName: '/'+v.res fullName: '/'+v.res
}).then(res => { }).then(res => {
this.resourceList.push(res.id) this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => { }).catch(e => {
this.$message.error(e.msg || '') this.resourceList.push(v.res)
this.dataProcess(backResource)
}) })
} else { } else {
this.resourceList.push(v.id) this.resourceList.push(v.id)
this.dataProcess(backResource)
} }
}) })
this.cacheResourceList = resourceList this.cacheResourceList = resourceList
} }
// backfill localParams // backfill localParams
let backResource = o.params.resourceList || []
let localParams = o.params.localParams || [] let localParams = o.params.localParams || []
if (localParams.length) { if (localParams.length) {
this.localParams = localParams this.localParams = localParams

120
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue

@ -80,6 +80,13 @@
// Cache ResourceList // Cache ResourceList
cacheResourceList: [], cacheResourceList: [],
resourceOptions: [], resourceOptions: [],
normalizer(node) {
return {
label: node.name
}
},
allNoResources: [],
noRes: []
} }
}, },
mixins: [disabledState], mixins: [disabledState],
@ -96,9 +103,9 @@
/** /**
* return resourceList * return resourceList
*/ */
_onResourcesData (a) { // _onResourcesData (a) {
this.resourceList = a // this.resourceList = a
}, // },
/** /**
* cache resourceList * cache resourceList
*/ */
@ -120,6 +127,12 @@
return false return false
} }
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
// storage // storage
this.$emit('on-params', { this.$emit('on-params', {
resourceList: _.map(this.resourceList, v => { resourceList: _.map(this.resourceList, v => {
@ -166,6 +179,67 @@
item.isDisabled =true item.isDisabled =true
} }
delete item.children delete item.children
},
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.resourceOptions.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.resourceOptions = this.resourceOptions.concat(noResources)
}
}
} }
}, },
watch: { watch: {
@ -176,10 +250,32 @@
}, },
computed: { computed: {
cacheParams () { cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.resourceOptions.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return { return {
resourceList: _.map(this.resourceList, v => { resourceList: resourceIdArr,
return {id: v}
}),
localParams: this.localParams localParams: this.localParams
} }
} }
@ -187,7 +283,7 @@
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
this.diGuiTree(item) this.diGuiTree(item)
this.options = item this.resourceOptions = item
let o = this.backfillItem let o = this.backfillItem
// Non-null objects represent backfill // Non-null objects represent backfill
@ -195,20 +291,24 @@
this.rawScript = o.params.rawScript || '' this.rawScript = o.params.rawScript || ''
// backfill resourceList // backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || [] let resourceList = o.params.resourceList || []
if (resourceList.length) { if (resourceList.length) {
_.map(resourceList, v => { _.map(resourceList, v => {
if(v.res) { if(!v.id) {
this.store.dispatch('dag/getResourceId',{ this.store.dispatch('dag/getResourceId',{
type: 'FILE', type: 'FILE',
fullName: '/'+v.res fullName: '/'+v.res
}).then(res => { }).then(res => {
this.resourceList.push(res.id) this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => { }).catch(e => {
this.$message.error(e.msg || '') this.resourceList.push(v.res)
this.dataProcess(backResource)
}) })
} else { } else {
this.resourceList.push(v.id) this.resourceList.push(v.id)
this.dataProcess(backResource)
} }
}) })
this.cacheResourceList = resourceList this.cacheResourceList = resourceList
@ -230,6 +330,6 @@
editor.toTextArea() // Uninstall editor.toTextArea() // Uninstall
editor.off($('.code-python-mirror'), 'keypress', this.keypress) editor.off($('.code-python-mirror'), 'keypress', this.keypress)
}, },
components: { mLocalParams, mListBox, mResources } components: { mLocalParams, mListBox, mResources,Treeselect }
} }
</script> </script>

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue

@ -266,8 +266,8 @@
} }
let noResources = [{ let noResources = [{
id: -1, id: -1,
name: $t('No resources'), name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('No resources'), fullName: '/'+$t('Unauthorized or deleted resources'),
children: [] children: []
}] }]
if(optionsCmp.length>0) { if(optionsCmp.length>0) {

107
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue

@ -254,7 +254,9 @@
return { return {
label: node.name label: node.name
} }
} },
allNoResources: [],
noRes: []
} }
}, },
props: { props: {
@ -305,6 +307,67 @@
} }
delete item.children delete item.children
}, },
searchTree(element, id) {
// id
if (element.id == id) {
return element;
} else if (element.children != null) {
var i;
var result = null;
for (i = 0; result == null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id);
}
return result;
}
return null;
},
dataProcess(backResource) {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return item.id
})
Array.prototype.diff = function(a) {
return this.filter(function(i) {return a.indexOf(i) < 0;});
};
let diffSet = this.resourceList.diff(resourceIdArr);
let optionsCmp = []
if(diffSet.length>0) {
diffSet.forEach(item=>{
backResource.forEach(item1=>{
if(item==item1.id || item==item1.res) {
optionsCmp.push(item1)
}
})
})
}
let noResources = [{
id: -1,
name: $t('Unauthorized or deleted resources'),
fullName: '/'+$t('Unauthorized or deleted resources'),
children: []
}]
if(optionsCmp.length>0) {
this.allNoResources = optionsCmp
optionsCmp = optionsCmp.map(item=>{
return {id: item.id,name: item.name,fullName: item.res}
})
optionsCmp.forEach(item=>{
item.isNew = true
})
noResources[0].children = optionsCmp
this.mainJarList = this.mainJarList.concat(noResources)
}
}
},
/** /**
* verification * verification
*/ */
@ -324,6 +387,12 @@
return false return false
} }
// noRes
if (this.noRes.length>0) {
this.$message.warning(`${i18n.$t('Please delete all non-existent resources')}`)
return false
}
if (!Number.isInteger(parseInt(this.numExecutors))) { if (!Number.isInteger(parseInt(this.numExecutors))) {
this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`) this.$message.warning(`${i18n.$t('The number of Executors should be a positive integer')}`)
return false return false
@ -400,15 +469,37 @@
}, },
computed: { computed: {
cacheParams () { cacheParams () {
let isResourceId = []
let resourceIdArr = []
if(this.resourceList.length>0) {
this.resourceList.forEach(v=>{
this.mainJarList.forEach(v1=>{
if(this.searchTree(v1,v)) {
isResourceId.push(this.searchTree(v1,v))
}
})
})
resourceIdArr = isResourceId.map(item=>{
return {id: item.id,name: item.name,res: item.fullName}
})
}
let result = []
resourceIdArr.forEach(item=>{
this.allNoResources.forEach(item1=>{
if(item.id==item1.id) {
// resultBool = true
result.push(item1)
}
})
})
this.noRes = result
return { return {
mainClass: this.mainClass, mainClass: this.mainClass,
mainJar: { mainJar: {
id: this.mainJar id: this.mainJar
}, },
deployMode: this.deployMode, deployMode: this.deployMode,
resourceList: _.map(this.resourceList, v => { resourceList: resourceIdArr,
return {id: v}
}),
localParams: this.localParams, localParams: this.localParams,
driverCores: this.driverCores, driverCores: this.driverCores,
driverMemory: this.driverMemory, driverMemory: this.driverMemory,
@ -453,20 +544,24 @@
this.sparkVersion = o.params.sparkVersion || 'SPARK2' this.sparkVersion = o.params.sparkVersion || 'SPARK2'
// backfill resourceList // backfill resourceList
let backResource = o.params.resourceList || []
let resourceList = o.params.resourceList || [] let resourceList = o.params.resourceList || []
if (resourceList.length) { if (resourceList.length) {
_.map(resourceList, v => { _.map(resourceList, v => {
if(v.res) { if(!v.id) {
this.store.dispatch('dag/getResourceId',{ this.store.dispatch('dag/getResourceId',{
type: 'FILE', type: 'FILE',
fullName: '/'+v.res fullName: '/'+v.res
}).then(res => { }).then(res => {
this.resourceList.push(res.id) this.resourceList.push(res.id)
this.dataProcess(backResource)
}).catch(e => { }).catch(e => {
this.$message.error(e.msg || '') this.resourceList.push(v.res)
this.dataProcess(backResource)
}) })
} else { } else {
this.resourceList.push(v.id) this.resourceList.push(v.id)
this.dataProcess(backResource)
} }
}) })
this.cacheResourceList = resourceList this.cacheResourceList = resourceList

4
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -49,10 +49,10 @@
<th scope="col" width="60"> <th scope="col" width="60">
<span>{{$t('Run Times')}}</span> <span>{{$t('Run Times')}}</span>
</th> </th>
<th scope="col" width="100"> <th scope="col" width="125">
<span>{{$t('host')}}</span> <span>{{$t('host')}}</span>
</th> </th>
<th scope="col" width="60"> <th scope="col" width="55">
<span>{{$t('fault-tolerant sign')}}</span> <span>{{$t('fault-tolerant sign')}}</span>
</th> </th>
<th scope="col" width="30"> <th scope="col" width="30">

6
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -43,13 +43,13 @@
<th scope="col" width="140"> <th scope="col" width="140">
<span>{{$t('Start Time')}}</span> <span>{{$t('Start Time')}}</span>
</th> </th>
<th scope="col" width="140"> <th scope="col" width="125">
<span>{{$t('End Time')}}</span> <span>{{$t('End Time')}}</span>
</th> </th>
<th scope="col" width="110"> <th scope="col" width="130">
<span>{{$t('host')}}</span> <span>{{$t('host')}}</span>
</th> </th>
<th scope="col" width="74"> <th scope="col" width="70">
<span>{{$t('Duration')}}(s)</span> <span>{{$t('Duration')}}(s)</span>
</th> </th>
<th scope="col" width="84"> <th scope="col" width="84">

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -588,6 +588,6 @@ export default {
'Branch flow': 'Branch flow', 'Branch flow': 'Branch flow',
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow', 'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required', 'Successful branch flow and failed branch flow are required': 'Successful branch flow and failed branch flow are required',
'No resources': 'No resources', 'Unauthorized or deleted resources': 'Unauthorized or deleted resources',
'Please delete all non-existent resources': 'Please delete all non-existent resources', 'Please delete all non-existent resources': 'Please delete all non-existent resources',
} }

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -588,6 +588,6 @@ export default {
'Branch flow': '分支流转', 'Branch flow': '分支流转',
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点', 'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填', 'Successful branch flow and failed branch flow are required': '成功分支流转和失败分支流转必填',
'No resources': '未授权或已删除资源', 'Unauthorized or deleted resources': '未授权或已删除资源',
'Please delete all non-existent resources': '请删除所有未授权或已删除资源', 'Please delete all non-existent resources': '请删除所有未授权或已删除资源',
} }

85
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -167,7 +167,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_process_instance_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE()) AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; ALTER TABLE t_ds_process_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF; END IF;
END; END;
@ -207,7 +207,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_task_instance_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE()) AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; ALTER TABLE t_ds_task_instance ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF; END IF;
END; END;
@ -247,7 +247,7 @@ CREATE PROCEDURE ac_dolphin_T_t_ds_schedules_A_worker_group()
AND TABLE_SCHEMA=(SELECT DATABASE()) AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_schedules ADD `worker_group` varchar(255) DEFAULT '' COMMENT 'worker group'; ALTER TABLE t_ds_schedules ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF; END IF;
END; END;
@ -277,4 +277,83 @@ delimiter ;
CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id; CALL dc_dolphin_T_t_ds_schedules_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id; DROP PROCEDURE dc_dolphin_T_t_ds_schedules_D_worker_group_id;
-- ac_dolphin_T_t_ds_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_command_A_worker_group;
-- dc_dolphin_T_t_ds_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_command_D_worker_group_id;
-- ac_dolphin_T_t_ds_error_command_A_worker_group
drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group;
delimiter d//
CREATE PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD `worker_group` varchar(64) DEFAULT '' COMMENT 'worker group';
END IF;
END;
d//
delimiter ;
CALL ac_dolphin_T_t_ds_error_command_A_worker_group;
DROP PROCEDURE ac_dolphin_T_t_ds_error_command_A_worker_group;
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
drop PROCEDURE if EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id;
delimiter d//
CREATE PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
d//
delimiter ;
CALL dc_dolphin_T_t_ds_error_command_D_worker_group_id;
DROP PROCEDURE dc_dolphin_T_t_ds_error_command_D_worker_group_id;

90
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -164,7 +164,7 @@ BEGIN
AND TABLE_NAME='t_ds_process_instance' AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null; ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF; END IF;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -207,7 +207,7 @@ BEGIN
AND TABLE_NAME='t_ds_task_instance' AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null; ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF; END IF;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -249,7 +249,7 @@ BEGIN
AND TABLE_NAME='t_ds_schedules' AND TABLE_NAME='t_ds_schedules'
AND COLUMN_NAME ='worker_group') AND COLUMN_NAME ='worker_group')
THEN THEN
ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null; ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF; END IF;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -279,4 +279,88 @@ delimiter ;
select dc_dolphin_T_t_ds_schedules_D_worker_group_id(); select dc_dolphin_T_t_ds_schedules_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id(); DROP FUNCTION dc_dolphin_T_t_ds_schedules_D_worker_group_id();
-- ac_dolphin_T_t_ds_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_command ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_command_A_worker_group();
-- dc_dolphin_T_t_ds_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_command_D_worker_group_id();
-- ac_dolphin_T_t_ds_error_command_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_error_command_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_error_command ADD COLUMN worker_group varchar(64) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_error_command_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_error_command_A_worker_group();
-- dc_dolphin_T_t_ds_error_command_D_worker_group_id
delimiter ;
DROP FUNCTION IF EXISTS dc_dolphin_T_t_ds_error_command_D_worker_group_id();
delimiter d//
CREATE FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='worker_group_id')
THEN
ALTER TABLE t_ds_error_command DROP COLUMN worker_group_id;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select dc_dolphin_T_t_ds_error_command_D_worker_group_id();
DROP FUNCTION dc_dolphin_T_t_ds_error_command_D_worker_group_id();

Loading…
Cancel
Save