Browse Source

Merge pull request #13 from apache/dev

update code from ds
pull/3/MERGE
BoYiZhang 4 years ago committed by GitHub
parent
commit
dffcf1e5bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .github/workflows/ci_e2e.yml
  2. 2
      docker/build/conf/zookeeper/zoo.cfg
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 21
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  5. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  6. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  8. 34
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  9. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  10. 12
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  11. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  12. 36
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
  13. 23
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
  14. 9
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  15. 20
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java
  16. 12
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java
  17. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  18. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  19. 62
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  20. 76
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
  21. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  22. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  23. 66
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
  24. 71
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  25. 108
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/pre_tasks.vue
  26. 24
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue
  27. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  28. 4
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  29. 1
      pom.xml
  30. 3
      sql/dolphinscheduler-postgre.sql
  31. 3
      sql/dolphinscheduler_mysql.sql
  32. 26
      sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
  33. 28
      sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

2
.github/workflows/ci_e2e.yml

@ -59,7 +59,7 @@ jobs:
sudo dpkg -i google-chrome*.deb sudo dpkg -i google-chrome*.deb
sudo apt-get install -f -y sudo apt-get install -f -y
google-chrome -version google-chrome -version
googleVersion=`google-chrome -version | awk '{print $3}'` googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE)
wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip
unzip chromedriver_linux64.zip unzip chromedriver_linux64.zip
sudo mv -f chromedriver /usr/local/share/chromedriver sudo mv -f chromedriver /usr/local/share/chromedriver

2
docker/build/conf/zookeeper/zoo.cfg

@ -43,3 +43,5 @@ clientPort=2181
# Purge task interval in hours # Purge task interval in hours
# Set to "0" to disable auto purge feature # Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1 #autopurge.purgeInterval=1
#Four Letter Words commands:stat,ruok,conf,isro
4lw.commands.whitelist=*

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -193,7 +193,7 @@ public enum Status {
BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"), BATCH_COPY_PROCESS_DEFINITION_ERROR(10159, "batch copy process definition error", "复制工作流错误"),
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"), BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"), QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10162,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
@ -294,4 +294,4 @@ public enum Status {
return this.enMsg; return this.enMsg;
} }
} }
} }

21
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -24,7 +24,6 @@ import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -711,4 +710,24 @@ public class ProcessInstanceService extends BaseService {
return DagHelper.buildDagGraph(processDag); return DagHelper.buildDagGraph(processDag);
} }
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* @param states states array
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
}
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* @param size size
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size) {
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
}
} }

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
@ -139,6 +140,10 @@ public class ResourcesService extends BaseService {
} }
} }
result.setData(resultMap); result.setData(resultMap);
} catch (DuplicateKeyException e) {
logger.error("resource directory {} has exist, can't recreate", fullName);
putMsg(result, Status.RESOURCE_EXIST);
return result;
} catch (Exception e) { } catch (Exception e) {
logger.error("resource already exists, can't recreate ", e); logger.error("resource already exists, can't recreate ", e);
throw new RuntimeException("resource already exists, can't recreate"); throw new RuntimeException("resource already exists, can't recreate");

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.CheckUtils;
@ -64,7 +65,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -134,7 +134,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefineMapper;
@Autowired @Autowired
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceService processInstanceService;
@Autowired @Autowired
private TaskInstanceMapper taskInstanceMapper; private TaskInstanceMapper taskInstanceMapper;
@ -488,6 +488,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId);
return result; return result;
} }
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL,processInstances.size());
return result;
}
// get the timing according to the process definition // get the timing according to the process definition
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
@ -1247,7 +1253,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
/** /**
* List of process instances * List of process instances
*/ */
List<ProcessInstance> processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit);
for (ProcessInstance processInstance : processInstanceList) { for (ProcessInstance processInstance : processInstanceList) {
processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime()));

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -41,7 +41,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -94,7 +93,7 @@ public class ProcessDefinitionServiceTest {
private ProcessService processService; private ProcessService processService;
@Mock @Mock
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceService processInstanceService;
@Mock @Mock
private TaskInstanceMapper taskInstanceMapper; private TaskInstanceMapper taskInstanceMapper;
@ -734,7 +733,7 @@ public class ProcessDefinitionServiceTest {
//task instance not exist //task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceMapper.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null); Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10); Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));

34
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -195,7 +195,7 @@ public class HadoopUtils implements Closeable {
*/ */
String appUrl = ""; String appUrl = "";
if (StringUtils.isEmpty(rmHaIds)){ if (StringUtils.isEmpty(rmHaIds)) {
//single resourcemanager enabled //single resourcemanager enabled
appUrl = appAddress; appUrl = appAddress;
yarnEnabled = true; yarnEnabled = true;
@ -206,7 +206,7 @@ public class HadoopUtils implements Closeable {
logger.info("application url : {}", appUrl); logger.info("application url : {}", appUrl);
} }
if(StringUtils.isBlank(appUrl)){ if (StringUtils.isBlank(appUrl)) {
throw new Exception("application url is blank"); throw new Exception("application url is blank");
} }
return String.format(appUrl, applicationId); return String.format(appUrl, applicationId);
@ -417,25 +417,33 @@ public class HadoopUtils implements Closeable {
String applicationUrl = getApplicationUrl(applicationId); String applicationUrl = getApplicationUrl(applicationId);
logger.info("applicationUrl={}", applicationUrl); logger.info("applicationUrl={}", applicationUrl);
String responseContent ; String responseContent;
if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) { if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
responseContent = KerberosHttpClient.get(applicationUrl); responseContent = KerberosHttpClient.get(applicationUrl);
} else { } else {
responseContent = HttpUtils.get(applicationUrl); responseContent = HttpUtils.get(applicationUrl);
} }
if (responseContent != null) { if (responseContent != null) {
ObjectNode jsonObject = JSONUtils.parseObject(responseContent); ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("app")) {
return ExecutionStatus.FAILURE;
}
result = jsonObject.path("app").path("finalStatus").asText(); result = jsonObject.path("app").path("finalStatus").asText();
} else { } else {
//may be in job history //may be in job history
String jobHistoryUrl = getJobHistoryUrl(applicationId); String jobHistoryUrl = getJobHistoryUrl(applicationId);
logger.info("jobHistoryUrl={}", jobHistoryUrl); logger.info("jobHistoryUrl={}", jobHistoryUrl);
responseContent = HttpUtils.get(jobHistoryUrl); responseContent = HttpUtils.get(jobHistoryUrl);
ObjectNode jsonObject = JSONUtils.parseObject(responseContent); if (null != responseContent) {
if (!jsonObject.has("job")){ ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("job")) {
return ExecutionStatus.FAILURE;
}
result = jsonObject.path("job").path("state").asText();
} else {
return ExecutionStatus.FAILURE; return ExecutionStatus.FAILURE;
} }
result = jsonObject.path("job").path("state").asText();
} }
switch (result) { switch (result) {
@ -474,7 +482,7 @@ public class HadoopUtils implements Closeable {
/** /**
* hdfs resource dir * hdfs resource dir
* *
* @param tenantCode tenant code * @param tenantCode tenant code
* @param resourceType resource type * @param resourceType resource type
* @return hdfs resource dir * @return hdfs resource dir
*/ */
@ -679,7 +687,7 @@ public class HadoopUtils implements Closeable {
ObjectNode jsonObject = JSONUtils.parseObject(retStr); ObjectNode jsonObject = JSONUtils.parseObject(retStr);
//get ResourceManager state //get ResourceManager state
if (!jsonObject.has("clusterInfo")){ if (!jsonObject.has("clusterInfo")) {
return null; return null;
} }
return jsonObject.get("clusterInfo").path("haState").asText(); return jsonObject.get("clusterInfo").path("haState").asText();

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/** /**
* process instance mapper interface * process instance mapper interface
*/ */
@ -201,9 +204,20 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @param endTime * @param endTime
* @return ProcessInstance list * @return ProcessInstance list
*/ */
List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size, List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size,
@Param("startTime") Date startTime, @Param("startTime") Date startTime,
@Param("endTime") Date endTime, @Param("endTime") Date endTime,
@Param("status")ExecutionStatus status); @Param("status")ExecutionStatus status);
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineIdAndStatus(
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
} }

12
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -191,6 +191,16 @@
</if> </if>
order by end_time desc limit 1 order by end_time desc limit 1
</select> </select>
<select id="queryByProcessDefineIdAndStatus"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select *
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
order by id asc
</select>
</mapper> </mapper>

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -27,7 +27,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
@ -155,11 +154,12 @@ public class NettyRemotingClient {
this.bootstrap this.bootstrap
.group(this.workerGroup) .group(this.workerGroup)
.channel(NioSocketChannel.class) .channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {

36
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -17,17 +17,6 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -37,15 +26,25 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
/** /**
* remoting netty server * remoting netty server
*/ */
@ -146,17 +145,17 @@ public class NettyRemotingServer {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap this.serverBootstrap
.group(this.bossGroup, this.workGroup) .group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class) .channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(NioSocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
initNettyChannel(ch); initNettyChannel(ch);
} }
}); });
@ -182,9 +181,8 @@ public class NettyRemotingServer {
* init netty channel * init netty channel
* *
* @param ch socket channel * @param ch socket channel
* @throws Exception
*/ */
private void initNettyChannel(NioSocketChannel ch) throws Exception { private void initNettyChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder); pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder()); pipeline.addLast("decoder", new NettyDecoder());

23
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java

@ -14,22 +14,23 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.config; package org.apache.dolphinscheduler.remote.config;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
/** /**
* netty client config * netty client config
*/ */
public class NettyClientConfig { public class NettyClientConfig {
/** /**
* worker threadsdefault get machine cpus * worker threadsdefault get machine cpus
*/ */
private int workerThreads = Constants.CPUS; private int workerThreads = Constants.CPUS;
/** /**
* whether tpc delay * whether tpc delay
*/ */
private boolean tcpNoDelay = true; private boolean tcpNoDelay = true;
@ -39,15 +40,20 @@ public class NettyClientConfig {
private boolean soKeepalive = true; private boolean soKeepalive = true;
/** /**
* send buffer size * send buffer size
*/ */
private int sendBufferSize = 65535; private int sendBufferSize = 65535;
/** /**
* receive buffer size * receive buffer size
*/ */
private int receiveBufferSize = 65535; private int receiveBufferSize = 65535;
/**
* connect timeout millis
*/
private int connectTimeoutMillis = 3000;
public int getWorkerThreads() { public int getWorkerThreads() {
return workerThreads; return workerThreads;
} }
@ -88,4 +94,11 @@ public class NettyClientConfig {
this.receiveBufferSize = receiveBufferSize; this.receiveBufferSize = receiveBufferSize;
} }
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
} }

9
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -21,7 +22,7 @@ import java.nio.charset.StandardCharsets;
/** /**
* constant * constant
*/ */
public class Constants { public class Constants {
@ -30,12 +31,12 @@ public class Constants {
public static final String SLASH = "/"; public static final String SLASH = "/";
/** /**
* charset * charset
*/ */
public static final Charset UTF8 = StandardCharsets.UTF_8; public static final Charset UTF8 = StandardCharsets.UTF_8;
/** /**
* cpus * cpus
*/ */
public static final int CPUS = Runtime.getRuntime().availableProcessors(); public static final int CPUS = Runtime.getRuntime().availableProcessors();
@ -45,7 +46,7 @@ public class Constants {
/** /**
* netty epoll enable switch * netty epoll enable switch
*/ */
public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable"); public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable", "true");
/** /**
* OS Name * OS Name

20
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java

@ -18,6 +18,12 @@
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/** /**
* NettyUtils * NettyUtils
@ -39,4 +45,18 @@ public class NettyUtils {
return Boolean.parseBoolean(enableNettyEpoll); return Boolean.parseBoolean(enableNettyEpoll);
} }
public static Class<? extends ServerSocketChannel> getServerSocketChannelClass() {
if (useEpoll()) {
return EpollServerSocketChannel.class;
}
return NioServerSocketChannel.class;
}
public static Class<? extends SocketChannel> getSocketChannelClass() {
if (useEpoll()) {
return EpollSocketChannel.class;
}
return NioSocketChannel.class;
}
} }

12
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java

@ -17,20 +17,28 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import io.netty.channel.epoll.Epoll;
/** /**
* NettyUtilTest * NettyUtilTest
*/ */
public class NettyUtilTest { public class NettyUtilTest {
@Test @Test
public void testUserEpoll() { public void testUserEpoll() {
System.setProperty("netty.epoll.enable", "false"); if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) {
Assert.assertFalse(NettyUtils.useEpoll()); Assert.assertTrue(NettyUtils.useEpoll());
} else {
Assert.assertFalse(NettyUtils.useEpoll());
}
} }
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
@ -123,7 +125,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
} }
private void initTaskParameters() { private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance)); this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import com.fasterxml.jackson.annotation.JsonFormat; import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -24,16 +26,22 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory;
import java.util.*; import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonFormat;
public class DependentTaskExecThread extends MasterBaseTaskExecThread { public class DependentTaskExecThread extends MasterBaseTaskExecThread {
@ -172,7 +180,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
} }
private void initTaskParameters() { private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance)); taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());

62
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -36,10 +35,6 @@ import java.util.concurrent.Callable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
/** /**
* master task exec base class * master task exec base class
*/ */
@ -85,11 +80,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* taskUpdateQueue * taskUpdateQueue
*/ */
private TaskPriorityQueue taskUpdateQueue; private TaskPriorityQueue taskUpdateQueue;
/** /**
* constructor of MasterBaseTaskExecThread * constructor of MasterBaseTaskExecThread
* @param taskInstance task instance *
* @param taskInstance task instance
*/ */
public MasterBaseTaskExecThread(TaskInstance taskInstance){ public MasterBaseTaskExecThread(TaskInstance taskInstance) {
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class); this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.cancel = false; this.cancel = false;
@ -100,24 +97,26 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/** /**
* get task instance * get task instance
*
* @return TaskInstance * @return TaskInstance
*/ */
public TaskInstance getTaskInstance(){ public TaskInstance getTaskInstance() {
return this.taskInstance; return this.taskInstance;
} }
/** /**
* kill master base task exec thread * kill master base task exec thread
*/ */
public void kill(){ public void kill() {
this.cancel = true; this.cancel = true;
} }
/** /**
* submit master base task exec thread * submit master base task exec thread
*
* @return TaskInstance * @return TaskInstance
*/ */
protected TaskInstance submit(){ protected TaskInstance submit() {
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
@ -156,14 +155,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
/** /**
* dispatcht task * dispatcht task
*
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return whether submit task success * @return whether submit task success
*/ */
public Boolean dispatchTask(TaskInstance taskInstance) { public Boolean dispatchTask(TaskInstance taskInstance) {
try{ try{
if(taskInstance.isConditionsTask() if(taskInstance.isConditionsTask()
|| taskInstance.isDependTask() || taskInstance.isDependTask()
@ -202,7 +200,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/** /**
* buildTaskPriorityInfo * buildTaskPriorityInfo
* *
* @param processInstancePriority processInstancePriority * @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
@ -215,7 +213,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
int processInstanceId, int processInstanceId,
int taskInstancePriority, int taskInstancePriority,
int taskInstanceId, int taskInstanceId,
String workerGroup){ String workerGroup) {
return processInstancePriority + return processInstancePriority +
UNDERLINE + UNDERLINE +
processInstanceId + processInstanceId +
@ -229,14 +227,16 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/** /**
* submit wait complete * submit wait complete
*
* @return true * @return true
*/ */
protected Boolean submitWaitComplete(){ protected Boolean submitWaitComplete() {
return true; return true;
} }
/** /**
* call * call
*
* @return boolean * @return boolean
* @throws Exception exception * @throws Exception exception
*/ */
@ -246,34 +246,4 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return submitWaitComplete(); return submitWaitComplete();
} }
/**
* get task log path
* @return log path
*/
public String getTaskLogPath(TaskInstance task) {
String logPath;
try{
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
logPath = baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}else{
logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}
}catch (Exception e){
logger.error("logger", e);
logPath = "";
}
return logPath;
}
} }

76
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import javax.transaction.NotSupportedException;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.AppenderAttachable;
public class LogUtils {
private LogUtils() throws NotSupportedException {
throw new NotSupportedException();
}
/**
* get task log path
*/
@SuppressWarnings("unchecked")
private static String getTaskLogPath(int processDefinitionId, int processInstanceId, int taskInstanceId) {
// Optional.map will be skipped if null
return Optional.of(LoggerFactory.getILoggerFactory())
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
.map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
.map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e)
.toAbsolutePath()
.resolve(String.valueOf(processDefinitionId))
.resolve(String.valueOf(processInstanceId))
.resolve(taskInstanceId + ".log"))
.map(Path::toString)
.orElse("");
}
/**
* get task log path by TaskInstance
*/
public static String getTaskLogPath(TaskInstance taskInstance) {
return getTaskLogPath(taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId());
}
/**
* get task log path by TaskExecutionContext
*/
public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
return getTaskLogPath(taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
}
}

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -36,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -51,8 +49,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryException;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel; import io.netty.channel.Channel;
/** /**
@ -154,28 +150,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
} }
} }
/**
* get task log path
* @return log path
*/
private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/** /**
* build ack command * build ack command
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
@ -185,7 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime()); ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -17,14 +17,15 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import java.util.HashSet;
import java.util.Set;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;

66
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.sift.SiftingAppender;
@RunWith(MockitoJUnitRunner.class)
public class LogUtilsTest {
@Test
public void testGetTaskLogPath() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(100);
taskInstance.setId(1000);
Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT");
Assert.assertNotNull(rootLogger);
SiftingAppender appender = Mockito.mock(SiftingAppender.class);
// it's a trick to mock logger.getAppend("TASKLOGFILE")
Mockito.when(appender.getName()).thenReturn("TASKLOGFILE");
rootLogger.addAppender(appender);
Path logBase = Paths.get("path").resolve("to").resolve("test");
TaskLogDiscriminator taskLogDiscriminator = Mockito.mock(TaskLogDiscriminator.class);
Mockito.when(taskLogDiscriminator.getLogBase()).thenReturn(logBase.toString());
Mockito.when(appender.getDiscriminator()).thenReturn(taskLogDiscriminator);
Path logPath = Paths.get(".").toAbsolutePath().getParent()
.resolve(logBase)
.resolve("1").resolve("100").resolve("1000.log");
Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskInstance));
}
}

71
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -277,6 +277,12 @@
:backfill-item="backfillItem" :backfill-item="backfillItem"
:pre-node="preNode"> :pre-node="preNode">
</m-conditions> </m-conditions>
<!-- Pre-tasks in workflow -->
<m-pre-tasks
v-if="['SHELL', 'SUB_PROCESS'].indexOf(taskType) > -1"
@on-pre-tasks="_onPreTasks"
ref="PRE_TASK"
:backfill-item="backfillItem"></m-pre-tasks>
</div> </div>
</div> </div>
<div class="bottom-box"> <div class="bottom-box">
@ -310,6 +316,7 @@
import mSelectInput from './_source/selectInput' import mSelectInput from './_source/selectInput'
import mTimeoutAlarm from './_source/timeoutAlarm' import mTimeoutAlarm from './_source/timeoutAlarm'
import mWorkerGroups from './_source/workerGroups' import mWorkerGroups from './_source/workerGroups'
import mPreTasks from './tasks/pre_tasks'
import clickoutside from '@/module/util/clickoutside' import clickoutside from '@/module/util/clickoutside'
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import { isNameExDag, rtBantpl } from './../plugIn/util' import { isNameExDag, rtBantpl } from './../plugIn/util'
@ -369,7 +376,11 @@
value: 'failed', value: 'failed',
label: `${i18n.$t('failed')}` label: `${i18n.$t('failed')}`
} }
] ],
// preTasks
preTaskIdsInWorkflow: [],
preTasksToAdd: [], // pre-taskIds to add, used in jsplumb connects
preTasksToDelete: [], // pre-taskIds to delete, used in jsplumb connects
} }
}, },
/** /**
@ -393,6 +404,14 @@
_onDependent (o) { _onDependent (o) {
this.dependence = Object.assign(this.dependence, {}, o) this.dependence = Object.assign(this.dependence, {}, o)
}, },
/**
* Pre-tasks in workflow
*/
_onPreTasks (o) {
this.preTaskIdsInWorkflow = o.preTasks
this.preTasksToAdd = o.preTasksToAdd
this.preTasksToDelete = o.preTasksToDelete
},
/** /**
* cache dependent * cache dependent
*/ */
@ -543,6 +562,43 @@
if (!this.$refs[this.taskType]._verification()) { if (!this.$refs[this.taskType]._verification()) {
return return
} }
// Verify preTasks and update dag-things
if (this.$refs['PRE_TASK']) {
if (!this.$refs['PRE_TASK']._verification()) {
return
}
else {
// Sync data-targetarr
$(`#${this.id}`).attr(
'data-targetarr', this.preTaskIdsInWorkflow ? this.preTaskIdsInWorkflow.join(',') : '')
// Update JSP connections
let plumbIns = JSP.JspInstance
var targetId = this.id
// Update new connections
this.preTasksToAdd.map(sourceId => {
plumbIns.connect({
source: sourceId,
target: targetId,
type: 'basic',
paintStyle: { strokeWidth: 2, stroke: '#2d8cf0' },
HoverPaintStyle: {stroke: '#ccc', strokeWidth: 3}
})
})
// Update remove connections
let currentConnects = plumbIns.getAllConnections()
let len = currentConnects.length
for (let i = 0; i < len; i++) {
if (this.preTasksToDelete.indexOf(currentConnects[i].sourceId) > -1 && currentConnects[i].targetId == targetId) {
plumbIns.deleteConnection(currentConnects[i])
i -= 1
len -= 1
}
}
}
}
$(`#${this.id}`).find('span').text(this.name) $(`#${this.id}`).find('span').text(this.name)
this.conditionResult.successNode[0] = this.successBranch this.conditionResult.successNode[0] = this.successBranch
@ -684,6 +740,16 @@
} }
this.cacheBackfillItem = JSON.parse(JSON.stringify(o)) this.cacheBackfillItem = JSON.parse(JSON.stringify(o))
this.isContentBox = true this.isContentBox = true
// Init value of preTask selector
let preTaskIds = $(`#${this.id}`).attr('data-targetarr')
if (!_.isEmpty(this.backfillItem)) {
if (preTaskIds && preTaskIds.length) {
this.backfillItem.preTasks = preTaskIds.split(',')
} else {
this.backfillItem.preTasks = []
}
}
}, },
mounted () { mounted () {
let self = this let self = this
@ -745,7 +811,8 @@
mSelectInput, mSelectInput,
mTimeoutAlarm, mTimeoutAlarm,
mPriority, mPriority,
mWorkerGroups mWorkerGroups,
mPreTasks,
} }
} }
</script> </script>

108
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/pre_tasks.vue

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="pre_tasks-model">
<div class="clearfix list">
<div class="text-box">
<span>{{$t('Pre tasks')}}</span>
</div>
<div class="cont-box">
<div class="label-box">
<x-select
ref="preTasksSelector"
style="width: 100%;"
filterable
multiple
v-model="preTasks"
:disabled="isDetails"
:id="preTasksSelectorId">
<x-option
v-for="task in preTaskList"
:key="task.id"
:value="task.id"
:label="task.name">
</x-option>
</x-select>
</div>
</div>
</div>
</div>
</template>
<script>
import disabledState from '@/module/mixin/disabledState'
export default {
name: 'pre_tasks',
mixins: [disabledState],
props: {
backfillItem: Object
},
data () {
return {
preTasksSelectorId: '_preTasksSelectorId', // Refresh target vue-component by changing id
preTasks: [],
preTasksOld: [],
}
},
mounted () {
this.preTasks = this.backfillItem['preTasks'] || this.preTasks
this.preTasksOld = this.preTasks
// Refresh target vue-component by changing id
this.$nextTick(() => {
this.preTasksSelectorId = 'preTasksSelectorId'
})
},
computed: {
preTaskList: function () {
let currentTaskId = this.backfillItem['id'] || this.id
let cacheTasks = Object.assign({}, this.store.state.dag.tasks)
let keys = Object.keys(cacheTasks)
for (let i = 0; i < keys.length; i++) {
let key = keys[i]
if ((!cacheTasks[key].id || !cacheTasks[key].name) || (currentTaskId && cacheTasks[key].id === currentTaskId)) {
// Clean undefined and current task data
delete cacheTasks[key]
}
}
return cacheTasks
},
// preTaskIds used to create new connection
preTasksToAdd: function () {
let toAddTasks = this.preTasks.filter(taskId => {
return (this.preTasksOld.indexOf(taskId) === -1)
})
return toAddTasks
},
// preTaskIds used to delete connection
preTasksToDelete: function () {
return this.preTasksOld.filter(taskId => this.preTasks.indexOf(taskId) === -1)
},
},
methods: {
// Pass data to parent-level to process dag
_verification () {
this.$emit('on-pre-tasks', {
preTasks: this.preTasks,
preTasksToAdd: this.preTasksToAdd,
preTasksToDelete: this.preTasksToDelete,
})
return true
}
}
}
</script>

24
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/versions.vue

@ -25,13 +25,10 @@
<table class="fixed"> <table class="fixed">
<caption><!-- placeHolder --></caption> <caption><!-- placeHolder --></caption>
<tr> <tr>
<th scope="col"> <th scope="col" style="min-width: 40px;text-align: left">
<span>#</span> <span>{{$t('Version')}}</span>
</th> </th>
<th scope="col" style="min-width: 40px"> <th scope="col" style="min-width: 30px">
<span>Version</span>
</th>
<th scope="col" style="min-width: 200px;max-width: 300px;">
<span>{{$t('Description')}}</span> <span>{{$t('Description')}}</span>
</th> </th>
<th scope="col" style="min-width: 50px"> <th scope="col" style="min-width: 50px">
@ -42,9 +39,6 @@
</th> </th>
</tr> </tr>
<tr v-for="(item, $index) in processDefinitionVersions" :key="item.id"> <tr v-for="(item, $index) in processDefinitionVersions" :key="item.id">
<td>
<span>-</span>
</td>
<td> <td>
<span v-if="item.version"> <span v-if="item.version">
<span v-if="item.version === processDefinition.version" style="color: green"><strong>{{item.version}} {{$t('Current Version')}}</strong></span> <span v-if="item.version === processDefinition.version" style="color: green"><strong>{{item.version}} {{$t('Current Version')}}</strong></span>
@ -52,7 +46,7 @@
</span> </span>
<span v-else>-</span> <span v-else>-</span>
</td> </td>
<td> <td style="word-break:break-all;">
<span v-if="item.description">{{item.description}}</span> <span v-if="item.description">{{item.description}}</span>
<span v-else>-</span> <span v-else>-</span>
</td> </td>
@ -64,7 +58,7 @@
<x-poptip <x-poptip
:ref="'poptip-switch-version-' + $index" :ref="'poptip-switch-version-' + $index"
placement="top-end" placement="top-end"
width="90"> width="260">
<p>{{$t('Confirm Switch To This Version?')}}</p> <p>{{$t('Confirm Switch To This Version?')}}</p>
<div style="text-align: right; margin: 0;padding-top: 4px;"> <div style="text-align: right; margin: 0;padding-top: 4px;">
<x-button type="text" size="xsmall" shape="circle" @click="_closeSwitchVersion($index)"> <x-button type="text" size="xsmall" shape="circle" @click="_closeSwitchVersion($index)">
@ -195,18 +189,14 @@
* Close the switch version layer * Close the switch version layer
*/ */
_closeSwitchVersion (i) { _closeSwitchVersion (i) {
if (i > 0) { this.$refs[`poptip-switch-version-${i}`][0].doClose()
this.$refs[`poptip-switch-version-${i}`][0].doClose()
}
}, },
/** /**
* Close the delete layer * Close the delete layer
*/ */
_closeDelete (i) { _closeDelete (i) {
if (i > 0) { this.$refs[`poptip-delete-${i}`][0].doClose()
this.$refs[`poptip-delete-${i}`][0].doClose()
}
}, },
/** /**

4
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -641,5 +641,7 @@ export default {
'Batch copy': 'Batch copy', 'Batch copy': 'Batch copy',
'Related items': 'Related items', 'Related items': 'Related items',
'Project name is required': 'Project name is required', 'Project name is required': 'Project name is required',
'Batch move': 'Batch move' 'Batch move': 'Batch move',
Version: 'Version',
'Pre tasks': 'Pre tasks',
} }

4
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -641,5 +641,7 @@ export default {
'Batch copy': '批量复制', 'Batch copy': '批量复制',
'Related items': '关联项目', 'Related items': '关联项目',
'Project name is required': '项目名称必填', 'Project name is required': '项目名称必填',
'Batch move': '批量移动' 'Batch move': '批量移动',
Version: '版本',
'Pre tasks': '前置任务',
} }

1
pom.xml

@ -833,6 +833,7 @@
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>--> <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include> <include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include> <include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include> <include>**/server/utils/SparkArgsUtilsTest.java</include>

3
sql/dolphinscheduler-postgre.sql

@ -523,7 +523,8 @@ CREATE TABLE t_ds_resources (
pid int, pid int,
full_name varchar(64), full_name varchar(64),
is_directory int, is_directory int,
PRIMARY KEY (id) PRIMARY KEY (id),
CONSTRAINT t_ds_resources_un UNIQUE (full_name, type)
) ; ) ;

3
sql/dolphinscheduler_mysql.sql

@ -657,7 +657,8 @@ CREATE TABLE `t_ds_resources` (
`pid` int(11) DEFAULT NULL, `pid` int(11) DEFAULT NULL,
`full_name` varchar(64) DEFAULT NULL, `full_name` varchar(64) DEFAULT NULL,
`is_directory` tinyint(4) DEFAULT NULL, `is_directory` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_resources_un` (`full_name`,`type`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------

26
sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql

@ -87,3 +87,29 @@ delimiter ;
CALL ct_dolphin_T_t_ds_process_definition_version; CALL ct_dolphin_T_t_ds_process_definition_version;
DROP PROCEDURE ct_dolphin_T_t_ds_process_definition_version; DROP PROCEDURE ct_dolphin_T_t_ds_process_definition_version;
-- add t_ds_resources_un
DROP PROCEDURE IF EXISTS uc_dolphin_T_t_ds_resources_un;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_resources_un()
BEGIN
IF NOT EXISTS (
SELECT * FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = 't_ds_resources'
AND CONSTRAINT_NAME = 't_ds_resources_un'
)
THEN
ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE KEY (full_name,`type`);
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_resources_un();
DROP PROCEDURE IF EXISTS uc_dolphin_T_t_ds_resources_un;

28
sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

@ -79,4 +79,30 @@ d//
delimiter ; delimiter ;
SELECT ct_dolphin_T_t_ds_process_definition_version(); SELECT ct_dolphin_T_t_ds_process_definition_version();
DROP FUNCTION IF EXISTS ct_dolphin_T_t_ds_process_definition_version(); DROP FUNCTION IF EXISTS ct_dolphin_T_t_ds_process_definition_version();
-- add t_ds_resources_un
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_resources_un() RETURNS void AS $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = 't_ds_resources'
AND CONSTRAINT_NAME = 't_ds_resources_un'
)
THEN
ALTER TABLE t_ds_resources ADD CONSTRAINT t_ds_resources_un UNIQUE (full_name,"type");
END IF;
END;
$$ LANGUAGE plpgsql;
SELECT uc_dolphin_T_t_ds_resources_un();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_resources_un();

Loading…
Cancel
Save