Browse Source

Fix [Bug] process definition json worker group convert #2794 (#2795)

* add LoggerServerTest UT

* add LoggerServerTest UT

* add LoggerServerTest UT
add RemoveTaskLogRequestCommandTest UT
add RemoveTaskLogResponseCommandTest

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* add not worker log and remove worker invalid property

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

* process definition json worker group convert #2794

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/3/MERGE
qiaozhanwei 5 years ago committed by GitHub
parent
commit
0130da2bdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  2. 49
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java
  3. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java
  4. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
  5. 92
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
  6. 48
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  7. 65
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
  8. 40
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
  9. 65
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java
  10. 36
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java
  11. 51
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java
  12. 4
      pom.xml

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -124,6 +124,11 @@ public class TaskNode {
*/
private String workerGroup;
/**
* worker group id
*/
private Integer workerGroupId;
/**
* task time out
@ -341,4 +346,12 @@ public class TaskNode {
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}
public Integer getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
}

49
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java

@ -33,47 +33,12 @@ public class ShellExecutorTest {
@Test
public void execCommand() throws InterruptedException {
ThreadPoolExecutors executors = ThreadPoolExecutors.getInstance();
CountDownLatch latch = new CountDownLatch(200);
executors.execute(new Runnable() {
@Override
public void run() {
try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("groups");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0,5));
Thread.sleep(100l);
latch.countDown();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
executors.execute(new Runnable() {
@Override
public void run() {
try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("whoami");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result2:" + res);
Thread.sleep(100l);
latch.countDown();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
latch.await();
try {
String res = ShellExecutor.execCommand("groups");
logger.info("thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0, 5));
} catch (Exception e) {
e.printStackTrace();
}
}
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java

@ -102,4 +102,14 @@ public class ProcessData {
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
@Override
public String toString() {
return "ProcessData{" +
"tasks=" + tasks +
", globalParams=" + globalParams +
", timeout=" + timeout +
", tenantId=" + tenantId +
'}';
}
}

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java

@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date;
import java.util.List;

92
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
public class ProcessDefinitionDao {
public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class);
/**
* queryAllProcessDefinition
* @param conn jdbc connection
* @return ProcessDefinition Json List
*/
public Map<Integer,String> queryAllProcessDefinition(Connection conn){
Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()){
Integer id = rs.getInt(1);
String processDefinitionJson = rs.getString(2);
processDefinitionJsonMap.put(id,processDefinitionJson);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitionJsonMap;
}
/**
* updateProcessDefinitionJson
* @param conn jdbc connection
* @param processDefinitionJsonMap processDefinitionJsonMap
*/
public void updateProcessDefinitionJson(Connection conn,Map<Integer,String> processDefinitionJsonMap){
String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?";
try {
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()){
try(PreparedStatement pstmt= conn.prepareStatement(sql)) {
pstmt.setString(1,entry.getValue());
pstmt.setInt(2,entry.getKey());
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}

48
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -16,14 +16,12 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,6 +32,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.*;
public abstract class UpgradeDao extends AbstractBaseDao {
@ -44,6 +43,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
protected static final DataSource dataSource = getDataSource();
private static final DbType dbType = getCurrentDbType();
@Override
protected void init() {
@ -119,6 +119,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
// Execute the dolphinscheduler DML, it can be rolled back
runInitDML(initSqlPath);
}
/**
@ -256,6 +257,43 @@ public abstract class UpgradeDao extends AbstractBaseDao {
upgradeDolphinSchedulerDML(schemaDir);
updateProcessDefinitionJsonWorkerGroup();
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
protected void updateProcessDefinitionJsonWorkerGroup(){
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class);
List<TaskNode> tasks = processData.getTasks();
for (TaskNode taskNode : tasks){
Integer workerGroupId = taskNode.getWorkerGroupId();
if (workerGroupId == -1){
taskNode.setWorkerGroup("default");
}else {
taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId));
}
}
replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData));
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json workergroup error",e);
}
}
/**

65
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class WorkerGroupDao {
public static final Logger logger = LoggerFactory.getLogger(WorkerGroupDao.class);
/**
* query all old worker group
* @param conn jdbc connection
* @return old worker group Map
*/
public Map<Integer,String> queryAllOldWorkerGroup(Connection conn){
Map<Integer,String> workerGroupMap = new HashMap<>();
String sql = String.format("select id,name from t_ds_worker_group");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()){
int id = rs.getInt(1);
String name = rs.getString(2);
workerGroupMap.put(id,name);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return workerGroupMap;
}
}

40
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml

@ -1,40 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper">
<select id="queryAllWorkerGroup" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
order by update_time desc
</select>
<select id="queryWorkerGroupByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
where name = #{name}
</select>
<select id="queryListPaging" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
select *
from t_ds_worker_group
where 1 = 1
<if test="searchVal != null and searchVal != ''">
and name like concat('%', #{searchVal}, '%')
</if>
order by update_time desc
</select>
</mapper>

65
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class ProcessDefinitionDaoTest {
final DataSource dataSource = getDataSource();
final ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
@Test
public void testQueryAllProcessDefinition() throws Exception{
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
assertThat(processDefinitionJsonMap.size(),greaterThanOrEqualTo(0));
}
@Test
public void testUpdateProcessDefinitionJson() throws Exception{
Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
processDefinitionJsonMap.put(1,"test");
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),processDefinitionJsonMap);
}
@Test(expected = Exception.class)
public void testQueryAllProcessDefinitionException() throws Exception{
processDefinitionDao.queryAllProcessDefinition(null);
}
@Test(expected = Exception.class)
public void testUpdateProcessDefinitionJsonException() throws Exception{
processDefinitionDao.updateProcessDefinitionJson(null,null);
}
}

36
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class UpgradeDaoTest {
PostgresqlUpgradeDao postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance();
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{
postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup();
}
}

51
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.junit.Test;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
public class WokrerGrouopDaoTest {
protected final DataSource dataSource = getDataSource();
@Test
public void testQueryQueryAllOldWorkerGroup() throws Exception{
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
Map<Integer, String> workerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
assertThat(workerGroupMap.size(),greaterThanOrEqualTo(0));
}
@Test(expected = Exception.class)
public void testQueryQueryAllOldWorkerGroupException() throws Exception{
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
workerGroupDao.queryAllOldWorkerGroup(null);
}
}

4
pom.xml

@ -829,7 +829,6 @@
<include>**/dao/mapper/ProjectMapperTest.java</include>
<include>**/dao/mapper/ProjectUserMapperTest.java</include>
<include>**/dao/mapper/QueueMapperTest.java</include>
<!--<include>**/dao/mapper/ResourceMapperTest.java</include>-->
<include>**/dao/mapper/ResourceUserMapperTest.java</include>
<include>**/dao/mapper/ScheduleMapperTest.java</include>
<include>**/dao/mapper/SessionMapperTest.java</include>
@ -841,6 +840,9 @@
<include>**/dao/mapper/UserMapperTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
<include>**/dao/upgrade/ProcessDefinitionDaoTest.java</include>
<include>**/dao/upgrade/WokrerGrouopDaoTest.java</include>
<include>**/dao/upgrade/UpgradeDaoTest.java</include>
<include>**/plugin/model/AlertDataTest.java</include>
<include>**/plugin/model/AlertInfoTest.java</include>
<include>**/plugin/utils/PropertyUtilsTest.java</include>

Loading…
Cancel
Save