Browse Source

Merge remote-tracking branch 'upstream/dev-1.1.0' into dev-1.1.0

pull/2/head
huyuanming 6 years ago
parent
commit
5d19384f61
  1. 2
      escheduler-alert/pom.xml
  2. 5
      escheduler-api/pom.xml
  3. 10
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
  4. 7
      escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java
  5. 2
      escheduler-common/pom.xml
  6. 34
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  7. 2
      escheduler-dao/pom.xml
  8. 3
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  9. 18
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java
  10. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java
  11. 2
      escheduler-rpc/pom.xml
  12. 2
      escheduler-server/pom.xml
  13. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  14. 9
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
  15. 29
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
  16. 4
      escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
  17. 2
      pom.xml
  18. 2
      sql/soft_version
  19. 1
      sql/upgrade/1.1.0_schema/mysql/escheduler_dml.sql

2
escheduler-alert/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-alert</artifactId> <artifactId>escheduler-alert</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>

5
escheduler-api/pom.xml

@ -1,10 +1,9 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-api</artifactId> <artifactId>escheduler-api</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>

10
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@ -115,12 +115,18 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult; return checkResult;
} }
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId);
String workerGroupName = "";
if(processInstance.getWorkerGroupId() == -1){ if(processInstance.getWorkerGroupId() == -1){
processInstance.setWorkerGroupName(DEFAULT); workerGroupName = DEFAULT;
}else{ }else{
WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId());
processInstance.setWorkerGroupName(workerGroup.getName()); if(workerGroup != null){
workerGroupName = DEFAULT;
}else{
workerGroupName = workerGroup.getName();
}
} }
processInstance.setWorkerGroupName(workerGroupName);
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceivers(processDefinition.getReceivers());
processInstance.setReceiversCc(processDefinition.getReceiversCc()); processInstance.setReceiversCc(processDefinition.getReceiversCc());

7
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java

@ -369,7 +369,12 @@ public class ResourcesService extends BaseService {
public Map<String, Object> queryResourceList(User loginUser, ResourceType type) { public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); List<Resource> resourceList;
if(isAdmin(loginUser)){
resourceList = resourcesMapper.listAllResourceByType(type.ordinal());
}else{
resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal());
}
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
putMsg(result,Status.SUCCESS); putMsg(result,Status.SUCCESS);

2
escheduler-common/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-common</artifactId> <artifactId>escheduler-common</artifactId>
<name>escheduler-common</name> <name>escheduler-common</name>

34
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskDetail = list.get(i); String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
//向前版本兼容 //forward compatibility 向前版本兼容
if(taskDetailArrs.length >= 4){ if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
@ -209,17 +210,36 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
while(iterator.hasNext()){ while(iterator.hasNext()){
if(j++ < tasksNum){ if(j++ < tasksNum){
String task = iterator.next(); String task = iterator.next();
String[] taskArray = task.split(Constants.UNDERLINE);
int processInstanceId = Integer.parseInt(taskArray[1]); taskslist.add(getOriginTaskFormat(task));
int taskId = Integer.parseInt(taskArray[3]);
String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE
+ taskArray[2] + Constants.UNDERLINE + taskId;
taskslist.add(destTask);
} }
} }
return taskslist; return taskslist;
} }
/**
* format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* processInstanceId and task id need to be convert to int.
* @param formatTask
* @return
*/
private String getOriginTaskFormat(String formatTask){
String[] taskArray = formatTask.split(Constants.UNDERLINE);
int processInstanceId = Integer.parseInt(taskArray[1]);
int taskId = Integer.parseInt(taskArray[3]);
StringBuilder sb = new StringBuilder(50);
String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId);
sb.append(destTask);
if(taskArray.length > 4){
for(int index = 4; index < taskArray.length; index++){
sb.append(Constants.UNDERLINE).append(taskArray[index]);
}
}
return sb.toString();
}
@Override @Override
public void removeNode(String key, String nodeValue){ public void removeNode(String key, String nodeValue){

2
escheduler-dao/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-dao</artifactId> <artifactId>escheduler-dao</artifactId>
<name>escheduler-dao</name> <name>escheduler-dao</name>

3
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -493,7 +493,8 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority // set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId()); int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
processInstance.setWorkerGroupId(workerGroupId);
processInstance.setTimeout(processDefinition.getTimeout()); processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantId(processDefinition.getTenantId()); processInstance.setTenantId(processDefinition.getTenantId());
return processInstance; return processInstance;

18
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java

@ -274,5 +274,21 @@ public interface ResourceMapper {
@SelectProvider(type = ResourceMapperProvider.class, method = "queryTenantCodeByResourceName") @SelectProvider(type = ResourceMapperProvider.class, method = "queryTenantCodeByResourceName")
String queryTenantCodeByResourceName(@Param("resName") String resName); String queryTenantCodeByResourceName(@Param("resName") String resName);
/**
* query resource list that the appointed user has permission
* @param type
* @return
*/
@Results(value = {@Result(property = "id", column = "id", id = true, javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "alias", column = "alias", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "fileName", column = "file_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "desc", column = "desc", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "type", column = "type", typeHandler = EnumOrdinalTypeHandler.class, javaType = ResourceType.class, jdbcType = JdbcType.TINYINT),
@Result(property = "size", column = "size", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
})
@SelectProvider(type = ResourceMapperProvider.class, method = "listAllResourceByType")
List<Resource> listAllResourceByType(@Param("type") Integer type);
} }

15
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java

@ -295,4 +295,19 @@ public class ResourceMapperProvider {
WHERE("type = #{type} and user_id = #{userId}"); WHERE("type = #{type} and user_id = #{userId}");
}}.toString(); }}.toString();
} }
/**
* list all resource by type
*
* @param parameter
* @return
*/
public String listAllResourceByType(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("type = #{type}");
}}.toString();
}
} }

2
escheduler-rpc/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

2
escheduler-server/pom.xml

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-server</artifactId> <artifactId>escheduler-server</artifactId>
<name>escheduler-server</name> <name>escheduler-server</name>

2
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -153,7 +153,7 @@ public class FetchTaskThread implements Runnable{
} }
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; String taskInstIdStr = taskStringArray[3];
Date now = new Date(); Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr); Integer taskId = Integer.parseInt(taskInstIdStr);

9
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@ -18,6 +18,7 @@ package cn.escheduler.server.worker.task;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.FileUtils; import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.PropertyUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -71,11 +72,11 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
if (!Files.exists(Paths.get(commandFile))) { if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile); logger.info("generate command file:{}", commandFile);
StringBuilder sb = new StringBuilder(200); StringBuilder sb = new StringBuilder();
sb.append("#-*- encoding=utf8 -*-\n"); sb.append("#-*- encoding=utf8 -*-\n");
sb.append("\n\n"); sb.append("\n\n");
sb.append(String.format("import py_%s_node\n",taskAppId)); sb.append(execCommand);
logger.info(sb.toString()); logger.info(sb.toString());
// write data to file // write data to file
@ -86,8 +87,8 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
@Override @Override
protected String commandType() { protected String commandType() {
String envPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + "conf"+ String envPath = PropertyUtils.getString(Constants.ESCHEDULER_ENV_PATH);
Constants.SINGLE_SLASH +"env" + Constants.SINGLE_SLASH + Constants.ESCHEDULER_ENV_SH;
String pythonHome = getPythonHome(envPath); String pythonHome = getPythonHome(envPath);
if (StringUtils.isEmpty(pythonHome)){ if (StringUtils.isEmpty(pythonHome)){
return PYTHON; return PYTHON;

29
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java

@ -112,14 +112,14 @@ public class PythonTask extends AbstractTask {
*/ */
private String buildCommand() throws Exception { private String buildCommand() throws Exception {
// generate scripts // generate scripts
String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId()); // String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId());
Path path = new File(fileName).toPath(); // Path path = new File(fileName).toPath();
if (Files.exists(path)) { // if (Files.exists(path)) {
return fileName; // return fileName;
} // }
String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
@ -140,19 +140,20 @@ public class PythonTask extends AbstractTask {
} }
pythonParameters.setRawScript(rawScript); // pythonParameters.setRawScript(rawScript);
logger.info("raw script : {}", pythonParameters.getRawScript()); logger.info("raw script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir); logger.info("task dir : {}", taskDir);
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x"); // Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms); // FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
//
Files.createFile(path, attr); // Files.createFile(path, attr);
//
Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); // Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
//
return fileName; // return fileName;
return rawScript;
} }
@Override @Override

4
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue

@ -21,7 +21,7 @@
</div> </div>
</div> </div>
<div class="clearfix list"> <div class="clearfix list">
<x-button type="info" style="margin-left:20px" shape="circle" :loading="spinnerLoading" @click="preview()" v-ps="['GENERAL_USER']">执行时间</x-button> <x-button type="info" style="margin-left:20px" shape="circle" :loading="spinnerLoading" @click="preview()">执行时间</x-button>
<div class="text"> <div class="text">
{{$t('Timing')}} {{$t('Timing')}}
</div> </div>
@ -136,7 +136,7 @@
</div> </div>
<div class="submit"> <div class="submit">
<x-button type="text" @click="close()"> {{$t('Cancel')}} </x-button> <x-button type="text" @click="close()"> {{$t('Cancel')}} </x-button>
<x-button type="primary" shape="circle" :loading="spinnerLoading" @click="ok()" v-ps="['GENERAL_USER']">{{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}} </x-button> <x-button type="primary" shape="circle" :loading="spinnerLoading" @click="ok()">{{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}} </x-button>
</div> </div>
</div> </div>
</template> </template>

2
pom.xml

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.4-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>escheduler</name> <name>escheduler</name>
<url>http://maven.apache.org</url> <url>http://maven.apache.org</url>

2
sql/soft_version

@ -1 +1 @@
1.0.4 1.1.0

1
sql/upgrade/1.1.0_schema/mysql/escheduler_dml.sql

@ -1 +0,0 @@
INSERT INTO `t_escheduler_version` (`version`) VALUES ('1.1.0');
Loading…
Cancel
Save