diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index 35767a0a46..5881b026ff 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -124,6 +124,11 @@ public class TaskNode { */ private String workerGroup; + /** + * worker group id + */ + private Integer workerGroupId; + /** * task time out @@ -341,4 +346,12 @@ public class TaskNode { public void setConditionResult(String conditionResult) { this.conditionResult = conditionResult; } + + public Integer getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(Integer workerGroupId) { + this.workerGroupId = workerGroupId; + } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java index 70ca5e2f22..e21bc7765c 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java @@ -33,47 +33,12 @@ public class ShellExecutorTest { @Test public void execCommand() throws InterruptedException { - ThreadPoolExecutors executors = ThreadPoolExecutors.getInstance(); - CountDownLatch latch = new CountDownLatch(200); - - executors.execute(new Runnable() { - @Override - public void run() { - - try { - int i =0; - while(i++ <= 100){ - String res = ShellExecutor.execCommand("groups"); - logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0,5)); - Thread.sleep(100l); - latch.countDown(); - } - - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - }); - - executors.execute(new Runnable() { - @Override - public void run() { - - try { - int i =0; - while(i++ <= 100){ - String res = ShellExecutor.execCommand("whoami"); - logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result2:" + res); - Thread.sleep(100l); - latch.countDown(); - } - - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - }); - - latch.await(); + try { + String res = ShellExecutor.execCommand("groups"); + logger.info("thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0, 5)); + } catch (Exception e) { + e.printStackTrace(); + } } + } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java index b563487ac4..e9a6d994e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java @@ -102,4 +102,14 @@ public class ProcessData { public void setTenantId(int tenantId) { this.tenantId = tenantId; } + + @Override + public String toString() { + return "ProcessData{" + + "tasks=" + tasks + + ", globalParams=" + globalParams + + ", timeout=" + timeout + + ", tenantId=" + tenantId + + '}'; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index bce963686c..a2cc3fdf6c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -16,9 +16,6 @@ */ package org.apache.dolphinscheduler.dao.entity; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; import java.util.Date; import java.util.List; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java new file mode 100644 index 0000000000..768f75c82a --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.*; + +public class ProcessDefinitionDao { + + + public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class); + + /** + * queryAllProcessDefinition + * @param conn jdbc connection + * @return ProcessDefinition Json List + */ + public Map queryAllProcessDefinition(Connection conn){ + + Map processDefinitionJsonMap = new HashMap<>(); + + String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition"); + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + while (rs.next()){ + Integer id = rs.getInt(1); + String processDefinitionJson = rs.getString(2); + processDefinitionJsonMap.put(id,processDefinitionJson); + } + + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + + return processDefinitionJsonMap; + } + + + /** + * updateProcessDefinitionJson + * @param conn jdbc connection + * @param processDefinitionJsonMap processDefinitionJsonMap + */ + public void updateProcessDefinitionJson(Connection conn,Map processDefinitionJsonMap){ + String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?"; + try { + for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ + try(PreparedStatement pstmt= conn.prepareStatement(sql)) { + pstmt.setString(1,entry.getValue()); + pstmt.setInt(2,entry.getKey()); + pstmt.executeUpdate(); + } + + } + + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index e708620f8a..692351b5f0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -16,14 +16,12 @@ */ package org.apache.dolphinscheduler.dao.upgrade; -import com.alibaba.druid.pool.DruidDataSource; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.apache.dolphinscheduler.common.utils.SchemaUtils; -import org.apache.dolphinscheduler.common.utils.ScriptRunner; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; +import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.text.MessageFormat; +import java.util.*; public abstract class UpgradeDao extends AbstractBaseDao { @@ -44,6 +43,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { protected static final DataSource dataSource = getDataSource(); private static final DbType dbType = getCurrentDbType(); + @Override protected void init() { @@ -119,6 +119,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { // Execute the dolphinscheduler DML, it can be rolled back runInitDML(initSqlPath); + } /** @@ -256,6 +257,43 @@ public abstract class UpgradeDao extends AbstractBaseDao { upgradeDolphinSchedulerDML(schemaDir); + updateProcessDefinitionJsonWorkerGroup(); + + + } + + /** + * updateProcessDefinitionJsonWorkerGroup + */ + protected void updateProcessDefinitionJsonWorkerGroup(){ + WorkerGroupDao workerGroupDao = new WorkerGroupDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + Map replaceProcessDefinitionMap = new HashMap<>(); + try { + Map oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection()); + Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); + + for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ + ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class); + + List tasks = processData.getTasks(); + for (TaskNode taskNode : tasks){ + Integer workerGroupId = taskNode.getWorkerGroupId(); + if (workerGroupId == -1){ + taskNode.setWorkerGroup("default"); + }else { + taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId)); + } + } + replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData)); + } + if (replaceProcessDefinitionMap.size() > 0){ + processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap); + } + }catch (Exception e){ + logger.error("update process definition json workergroup error",e); + } + } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java new file mode 100644 index 0000000000..936b1d477e --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class WorkerGroupDao { + + public static final Logger logger = LoggerFactory.getLogger(WorkerGroupDao.class); + + /** + * query all old worker group + * @param conn jdbc connection + * @return old worker group Map + */ + public Map queryAllOldWorkerGroup(Connection conn){ + Map workerGroupMap = new HashMap<>(); + + String sql = String.format("select id,name from t_ds_worker_group"); + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + while (rs.next()){ + int id = rs.getInt(1); + String name = rs.getString(2); + workerGroupMap.put(id,name); + } + + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + + return workerGroupMap; + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml deleted file mode 100644 index 84dd4db88d..0000000000 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java new file mode 100644 index 0000000000..a7bbd5323d --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.upgrade; + + +import org.junit.Test; + +import javax.sql.DataSource; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; + +public class ProcessDefinitionDaoTest { + final DataSource dataSource = getDataSource(); + final ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + + @Test + public void testQueryAllProcessDefinition() throws Exception{ + + Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); + + assertThat(processDefinitionJsonMap.size(),greaterThanOrEqualTo(0)); + } + + @Test + public void testUpdateProcessDefinitionJson() throws Exception{ + + Map processDefinitionJsonMap = new HashMap<>(); + processDefinitionJsonMap.put(1,"test"); + + processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),processDefinitionJsonMap); + + } + + @Test(expected = Exception.class) + public void testQueryAllProcessDefinitionException() throws Exception{ + processDefinitionDao.queryAllProcessDefinition(null); + + } + + @Test(expected = Exception.class) + public void testUpdateProcessDefinitionJsonException() throws Exception{ + processDefinitionDao.updateProcessDefinitionJson(null,null); + + } + + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java new file mode 100644 index 0000000000..ed96e920f5 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.upgrade; + +import org.junit.Test; + +import javax.sql.DataSource; +import java.util.Map; + +import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; + +public class UpgradeDaoTest { + PostgresqlUpgradeDao postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance(); + + @Test + public void testQueryQueryAllOldWorkerGroup() throws Exception{ + postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup(); + } + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java new file mode 100644 index 0000000000..2c9b80a89d --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.upgrade; + +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertThat; + +public class WokrerGrouopDaoTest { + protected final DataSource dataSource = getDataSource(); + + @Test + public void testQueryQueryAllOldWorkerGroup() throws Exception{ + WorkerGroupDao workerGroupDao = new WorkerGroupDao(); + + Map workerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection()); + + assertThat(workerGroupMap.size(),greaterThanOrEqualTo(0)); + } + + @Test(expected = Exception.class) + public void testQueryQueryAllOldWorkerGroupException() throws Exception{ + WorkerGroupDao workerGroupDao = new WorkerGroupDao(); + + workerGroupDao.queryAllOldWorkerGroup(null); + + } + +} diff --git a/pom.xml b/pom.xml index 13a4c33eeb..d4b7a20fa4 100644 --- a/pom.xml +++ b/pom.xml @@ -829,7 +829,6 @@ **/dao/mapper/ProjectMapperTest.java **/dao/mapper/ProjectUserMapperTest.java **/dao/mapper/QueueMapperTest.java - **/dao/mapper/ResourceUserMapperTest.java **/dao/mapper/ScheduleMapperTest.java **/dao/mapper/SessionMapperTest.java @@ -841,6 +840,9 @@ **/dao/mapper/UserMapperTest.java **/dao/utils/DagHelperTest.java **/dao/AlertDaoTest.java + **/dao/upgrade/ProcessDefinitionDaoTest.java + **/dao/upgrade/WokrerGrouopDaoTest.java + **/dao/upgrade/UpgradeDaoTest.java **/plugin/model/AlertDataTest.java **/plugin/model/AlertInfoTest.java **/plugin/utils/PropertyUtilsTest.java