Wenjun Ruan
3 months ago
committed by
GitHub
101 changed files with 1873 additions and 1359 deletions
@ -1,63 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.dao.entity; |
|
||||||
|
|
||||||
import java.util.Date; |
|
||||||
|
|
||||||
import lombok.Data; |
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.IdType; |
|
||||||
import com.baomidou.mybatisplus.annotation.TableId; |
|
||||||
import com.baomidou.mybatisplus.annotation.TableName; |
|
||||||
|
|
||||||
@Data |
|
||||||
@TableName("t_ds_trigger_relation") |
|
||||||
public class TriggerRelation { |
|
||||||
|
|
||||||
/** |
|
||||||
* id |
|
||||||
*/ |
|
||||||
@TableId(value = "id", type = IdType.AUTO) |
|
||||||
private Integer id; |
|
||||||
|
|
||||||
/** |
|
||||||
* trigger code |
|
||||||
*/ |
|
||||||
private long triggerCode; |
|
||||||
|
|
||||||
/** |
|
||||||
* triggerType |
|
||||||
*/ |
|
||||||
private int triggerType; |
|
||||||
|
|
||||||
/** |
|
||||||
* jobId |
|
||||||
*/ |
|
||||||
private Integer jobId; |
|
||||||
|
|
||||||
/** |
|
||||||
* create time |
|
||||||
*/ |
|
||||||
private Date createTime; |
|
||||||
|
|
||||||
/** |
|
||||||
* update time |
|
||||||
*/ |
|
||||||
private Date updateTime; |
|
||||||
|
|
||||||
} |
|
@ -1,72 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.dao.mapper; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.dao.entity.TriggerRelation; |
|
||||||
|
|
||||||
import org.apache.ibatis.annotations.Param; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
|
||||||
|
|
||||||
/** |
|
||||||
* triggerRelation mapper interface
|
|
||||||
*/ |
|
||||||
public interface TriggerRelationMapper extends BaseMapper<TriggerRelation> { |
|
||||||
|
|
||||||
/** |
|
||||||
* query by code and id |
|
||||||
* @param triggerType |
|
||||||
* @param jobId |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
List<TriggerRelation> queryByTypeAndJobId(@Param("triggerType") Integer triggerType, @Param("jobId") int jobId); |
|
||||||
|
|
||||||
/** |
|
||||||
* query triggerRelation by code |
|
||||||
* |
|
||||||
* @param triggerCode triggerCode |
|
||||||
* @return triggerRelation |
|
||||||
*/ |
|
||||||
List<TriggerRelation> queryByTriggerRelationCode(@Param("triggerCode") Long triggerCode); |
|
||||||
|
|
||||||
/** |
|
||||||
* query triggerRelation by code |
|
||||||
* |
|
||||||
* @param triggerCode triggerCode |
|
||||||
* @return triggerRelation |
|
||||||
*/ |
|
||||||
List<TriggerRelation> queryByTriggerRelationCodeAndType(@Param("triggerCode") Long triggerCode, |
|
||||||
@Param("triggerType") Integer triggerType); |
|
||||||
|
|
||||||
/** |
|
||||||
* delete triggerRelation by code |
|
||||||
* |
|
||||||
* @param triggerCode triggerCode |
|
||||||
* @return int |
|
||||||
*/ |
|
||||||
int deleteByCode(@Param("triggerCode") Long triggerCode); |
|
||||||
|
|
||||||
/** |
|
||||||
* if exist update else insert |
|
||||||
* |
|
||||||
* @param triggerRelation |
|
||||||
*/ |
|
||||||
void upsert(@Param("triggerRelation") TriggerRelation triggerRelation); |
|
||||||
} |
|
@ -0,0 +1,23 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.repository; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.dao.entity.User; |
||||||
|
|
||||||
|
public interface UserDao extends IDao<User> { |
||||||
|
} |
@ -0,0 +1,36 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.repository.impl; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.dao.entity.User; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.UserMapper; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.BaseDao; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.UserDao; |
||||||
|
|
||||||
|
import lombok.NonNull; |
||||||
|
|
||||||
|
import org.springframework.stereotype.Repository; |
||||||
|
|
||||||
|
@Repository |
||||||
|
public class UserDaoImpl extends BaseDao<User, UserMapper> implements UserDao { |
||||||
|
|
||||||
|
public UserDaoImpl(@NonNull UserMapper userMapper) { |
||||||
|
super(userMapper); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -1,61 +0,0 @@ |
|||||||
<?xml version="1.0" encoding="UTF-8" ?> |
|
||||||
<!-- |
|
||||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
~ contributor license agreements. See the NOTICE file distributed with |
|
||||||
~ this work for additional information regarding copyright ownership. |
|
||||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
~ (the "License"); you may not use this file except in compliance with |
|
||||||
~ the License. You may obtain a copy of the License at |
|
||||||
~ |
|
||||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
~ |
|
||||||
~ Unless required by applicable law or agreed to in writing, software |
|
||||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
~ See the License for the specific language governing permissions and |
|
||||||
~ limitations under the License. |
|
||||||
--> |
|
||||||
|
|
||||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
|
||||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper"> |
|
||||||
<sql id="baseSql"> |
|
||||||
id, trigger_code, trigger_type, job_id, create_time, update_time |
|
||||||
</sql> |
|
||||||
|
|
||||||
<select id="queryByTypeAndJobId" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation"> |
|
||||||
select |
|
||||||
<include refid="baseSql"/> |
|
||||||
from t_ds_trigger_relation |
|
||||||
WHERE trigger_type = #{triggerType} and job_id = #{jobId} |
|
||||||
</select> |
|
||||||
|
|
||||||
<select id="queryByTriggerRelationCode" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation"> |
|
||||||
select |
|
||||||
<include refid="baseSql"/> |
|
||||||
from t_ds_trigger_relation |
|
||||||
WHERE trigger_code = #{triggerCode} |
|
||||||
</select> |
|
||||||
|
|
||||||
<select id="queryByTriggerRelationCodeAndType" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation"> |
|
||||||
select |
|
||||||
<include refid="baseSql"/> |
|
||||||
from t_ds_trigger_relation |
|
||||||
WHERE trigger_code = #{triggerCode} and trigger_type = #{triggerType} |
|
||||||
</select> |
|
||||||
|
|
||||||
<delete id="deleteByCode"> |
|
||||||
delete from t_ds_trigger_relation where triggerCode = #{triggerCode} |
|
||||||
</delete> |
|
||||||
|
|
||||||
<insert id="upsert"> |
|
||||||
INSERT INTO t_ds_trigger_relation (trigger_code, trigger_type, job_id, create_time, update_time) VALUES( |
|
||||||
#{triggerRelation.triggerCode},#{triggerRelation.triggerType},#{triggerRelation.jobId},#{triggerRelation.createTime},#{triggerRelation.updateTime}) |
|
||||||
ON DUPLICATE KEY UPDATE update_time = #{triggerRelation.updateTime}; |
|
||||||
</insert> |
|
||||||
|
|
||||||
<insert id="upsert" databaseId="PostgreSQL"> |
|
||||||
INSERT INTO t_ds_trigger_relation (trigger_code, trigger_type, job_id, create_time, update_time) VALUES( |
|
||||||
#{triggerRelation.triggerCode},#{triggerRelation.triggerType},#{triggerRelation.jobId},#{triggerRelation.createTime},#{triggerRelation.updateTime}) |
|
||||||
ON CONFLICT(trigger_type,job_id,trigger_code) DO UPDATE SET update_time = #{triggerRelation.updateTime}; |
|
||||||
</insert> |
|
||||||
</mapper> |
|
@ -1,134 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.dao.mapper; |
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.common.enums.ApiTriggerType; |
|
||||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
|
||||||
import org.apache.dolphinscheduler.dao.BaseDaoTest; |
|
||||||
import org.apache.dolphinscheduler.dao.entity.TriggerRelation; |
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions; |
|
||||||
import org.junit.jupiter.api.Test; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
|
|
||||||
/** |
|
||||||
* trigger mapper test |
|
||||||
*/ |
|
||||||
public class TriggerRelationMapperTest extends BaseDaoTest { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
TriggerRelationMapper triggerRelationMapper; |
|
||||||
|
|
||||||
/** |
|
||||||
* test insert |
|
||||||
* |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testInsert() { |
|
||||||
TriggerRelation expectedObj = createTriggerRelation(); |
|
||||||
Assertions.assertTrue(expectedObj.getId() > 0); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* test select by id |
|
||||||
* |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testSelectById() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId()); |
|
||||||
Assertions.assertEquals(expectRelation, actualRelation); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* test select by type and job id |
|
||||||
* |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testQueryByTypeAndJobId() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
assertThat( |
|
||||||
triggerRelationMapper.queryByTypeAndJobId(expectRelation.getTriggerType(), expectRelation.getJobId())) |
|
||||||
.containsExactly(expectRelation); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* test select by trigger code |
|
||||||
* |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testQueryByTriggerRelationCode() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
assertThat(triggerRelationMapper.queryByTriggerRelationCode(expectRelation.getTriggerCode())) |
|
||||||
.containsExactly(expectRelation); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* test select by type and trigger code |
|
||||||
* |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testQueryByTriggerRelationCodeAndType() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
assertThat(triggerRelationMapper.queryByTriggerRelationCodeAndType(expectRelation.getTriggerCode(), |
|
||||||
expectRelation.getTriggerType())).containsExactly(expectRelation); |
|
||||||
} |
|
||||||
|
|
||||||
@Test |
|
||||||
public void testUpsert() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
triggerRelationMapper.upsert(expectRelation); |
|
||||||
assertThat(triggerRelationMapper.selectById(expectRelation.getId())).isEqualTo(expectRelation); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* test delete |
|
||||||
*/ |
|
||||||
@Test |
|
||||||
public void testDelete() { |
|
||||||
TriggerRelation expectRelation = createTriggerRelation(); |
|
||||||
triggerRelationMapper.deleteById(expectRelation.getId()); |
|
||||||
assertThat(triggerRelationMapper.selectById(expectRelation.getId())).isNull(); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* create TriggerRelation and insert |
|
||||||
* |
|
||||||
* @return TriggerRelation |
|
||||||
* @throws Exception |
|
||||||
*/ |
|
||||||
private TriggerRelation createTriggerRelation() { |
|
||||||
TriggerRelation triggerRelation = new TriggerRelation(); |
|
||||||
triggerRelation.setTriggerCode(4567890); |
|
||||||
triggerRelation.setTriggerType(ApiTriggerType.COMMAND.getCode()); |
|
||||||
triggerRelation.setJobId(99); |
|
||||||
triggerRelation.setCreateTime(DateUtils.getCurrentDate()); |
|
||||||
triggerRelation.setUpdateTime(DateUtils.getCurrentDate()); |
|
||||||
|
|
||||||
triggerRelationMapper.insert(triggerRelation); |
|
||||||
return triggerRelation; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
17
dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
17
dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java → dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/ClientsTest.java
@ -0,0 +1,53 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.extract.base.RpcMethod; |
||||||
|
import org.apache.dolphinscheduler.extract.base.RpcService; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerResponse; |
||||||
|
|
||||||
|
/** |
||||||
|
* Workflow instance controller used to do control operation for workflow instance. |
||||||
|
*/ |
||||||
|
@RpcService |
||||||
|
public interface IWorkflowControlClient { |
||||||
|
|
||||||
|
@RpcMethod |
||||||
|
WorkflowManualTriggerResponse manualTriggerWorkflow(final WorkflowManualTriggerRequest workflowManualTriggerRequest); |
||||||
|
|
||||||
|
@RpcMethod |
||||||
|
WorkflowBackfillTriggerResponse backfillTriggerWorkflow(final WorkflowBackfillTriggerRequest workflowBackfillTriggerRequest); |
||||||
|
|
||||||
|
@RpcMethod |
||||||
|
WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(final WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest); |
||||||
|
|
||||||
|
@RpcMethod |
||||||
|
WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest); |
||||||
|
|
||||||
|
@RpcMethod |
||||||
|
WorkflowInstanceStopResponse stopWorkflowInstance(final WorkflowInstanceStopRequest workflowInstanceStopRequest); |
||||||
|
} |
@ -1,39 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.extract.master; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.extract.base.RpcMethod; |
|
||||||
import org.apache.dolphinscheduler.extract.base.RpcService; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstancePauseRequest; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstancePauseResponse; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStopRequest; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStopResponse; |
|
||||||
|
|
||||||
/** |
|
||||||
* Workflow instance controller used to do control operation for workflow instance. |
|
||||||
*/ |
|
||||||
@RpcService |
|
||||||
public interface IWorkflowInstanceController { |
|
||||||
|
|
||||||
@RpcMethod |
|
||||||
WorkflowInstancePauseResponse pauseWorkflowInstance(WorkflowInstancePauseRequest workflowInstancePauseRequest); |
|
||||||
|
|
||||||
@RpcMethod |
|
||||||
WorkflowInstanceStopResponse stopWorkflowInstance(WorkflowInstanceStopRequest workflowInstanceStopRequest); |
|
||||||
|
|
||||||
} |
|
@ -0,0 +1,83 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Priority; |
||||||
|
import org.apache.dolphinscheduler.common.enums.TaskDependType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowBackfillTriggerRequest { |
||||||
|
|
||||||
|
private Integer userId; |
||||||
|
|
||||||
|
private List<String> backfillTimeList; |
||||||
|
|
||||||
|
private Long workflowCode; |
||||||
|
|
||||||
|
private Integer workflowVersion; |
||||||
|
|
||||||
|
private List<Long> startNodes; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private TaskDependType taskDependType = TaskDependType.TASK_POST; |
||||||
|
|
||||||
|
private CommandType execType; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private WarningType warningType = WarningType.NONE; |
||||||
|
|
||||||
|
private Integer warningGroupId; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Priority workflowInstancePriority = Priority.MEDIUM; |
||||||
|
|
||||||
|
private String workerGroup; |
||||||
|
|
||||||
|
private String tenantCode; |
||||||
|
|
||||||
|
private Long environmentCode; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private List<Property> startParamList = new ArrayList<>(); |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag dryRun = Flag.NO; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag testFlag = Flag.NO; |
||||||
|
|
||||||
|
} |
@ -0,0 +1,51 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowBackfillTriggerResponse { |
||||||
|
|
||||||
|
private boolean success; |
||||||
|
|
||||||
|
private String message; |
||||||
|
|
||||||
|
private Integer workflowInstanceId; |
||||||
|
|
||||||
|
public static WorkflowBackfillTriggerResponse fail(String message) { |
||||||
|
return WorkflowBackfillTriggerResponse.builder() |
||||||
|
.success(false) |
||||||
|
.message(message) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
public static WorkflowBackfillTriggerResponse success(Integer workflowInstanceId) { |
||||||
|
return WorkflowBackfillTriggerResponse.builder() |
||||||
|
.success(true) |
||||||
|
.workflowInstanceId(workflowInstanceId) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstancePauseRequest.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstancePauseRequest.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstancePauseRequest.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstancePauseRequest.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstancePauseResponse.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstancePauseResponse.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstancePauseResponse.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstancePauseResponse.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstanceStopRequest.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceStopRequest.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstanceStopRequest.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceStopRequest.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstanceStopResponse.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceStopResponse.java
2
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/WorkflowInstanceStopResponse.java → dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceStopResponse.java
@ -0,0 +1,77 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Priority; |
||||||
|
import org.apache.dolphinscheduler.common.enums.TaskDependType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowManualTriggerRequest { |
||||||
|
|
||||||
|
private Integer userId; |
||||||
|
|
||||||
|
private Long workflowDefinitionCode; |
||||||
|
|
||||||
|
private Integer workflowDefinitionVersion; |
||||||
|
|
||||||
|
private List<Long> startNodes; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private TaskDependType taskDependType = TaskDependType.TASK_POST; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private WarningType warningType = WarningType.NONE; |
||||||
|
|
||||||
|
private Integer warningGroupId; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Priority workflowInstancePriority = Priority.MEDIUM; |
||||||
|
|
||||||
|
private String workerGroup; |
||||||
|
|
||||||
|
private String tenantCode; |
||||||
|
|
||||||
|
private Long environmentCode; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private List<Property> startParamList = new ArrayList<>(); |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag dryRun = Flag.NO; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag testFlag = Flag.NO; |
||||||
|
} |
@ -0,0 +1,50 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowManualTriggerResponse { |
||||||
|
|
||||||
|
private boolean success; |
||||||
|
|
||||||
|
private String message; |
||||||
|
|
||||||
|
private Integer workflowInstanceId; |
||||||
|
|
||||||
|
public static WorkflowManualTriggerResponse fail(String message) { |
||||||
|
return WorkflowManualTriggerResponse.builder() |
||||||
|
.success(false) |
||||||
|
.message(message) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
public static WorkflowManualTriggerResponse success(Integer workflowInstanceId) { |
||||||
|
return WorkflowManualTriggerResponse.builder() |
||||||
|
.success(true) |
||||||
|
.workflowInstanceId(workflowInstanceId) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,75 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Priority; |
||||||
|
import org.apache.dolphinscheduler.common.enums.TaskDependType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowScheduleTriggerRequest { |
||||||
|
|
||||||
|
private Integer userId; |
||||||
|
|
||||||
|
private Date scheduleTIme; |
||||||
|
|
||||||
|
private String timezoneId; |
||||||
|
|
||||||
|
private Long workflowCode; |
||||||
|
|
||||||
|
private Integer workflowVersion; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private TaskDependType taskDependType = TaskDependType.TASK_POST; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private WarningType warningType = WarningType.NONE; |
||||||
|
|
||||||
|
private Integer warningGroupId; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Priority workflowInstancePriority = Priority.MEDIUM; |
||||||
|
|
||||||
|
private String workerGroup; |
||||||
|
|
||||||
|
private String tenantCode; |
||||||
|
|
||||||
|
private Long environmentCode; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag dryRun = Flag.NO; |
||||||
|
|
||||||
|
@Builder.Default |
||||||
|
private Flag testFlag = Flag.NO; |
||||||
|
|
||||||
|
} |
@ -0,0 +1,51 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.extract.master.transportor.workflow; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
public class WorkflowScheduleTriggerResponse { |
||||||
|
|
||||||
|
private boolean success; |
||||||
|
|
||||||
|
private String message; |
||||||
|
|
||||||
|
private Integer workflowInstanceId; |
||||||
|
|
||||||
|
public static WorkflowScheduleTriggerResponse fail(String message) { |
||||||
|
return WorkflowScheduleTriggerResponse.builder() |
||||||
|
.success(false) |
||||||
|
.message(message) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
public static WorkflowScheduleTriggerResponse success(Integer workflowInstanceId) { |
||||||
|
return WorkflowScheduleTriggerResponse.builder() |
||||||
|
.success(true) |
||||||
|
.workflowInstanceId(workflowInstanceId) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,121 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.statemachine; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPausedLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStoppedLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowSucceedLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; |
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
|
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
/** |
||||||
|
* The workflow submitted state should transform to running state when handle command. |
||||||
|
*/ |
||||||
|
@Slf4j |
||||||
|
@Component |
||||||
|
public class WorkflowSubmittedStateAction extends AbstractWorkflowStateAction { |
||||||
|
|
||||||
|
@Override |
||||||
|
public void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowStartLifecycleEvent workflowStartEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStartEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void topologyLogicalTransitionEventAction( |
||||||
|
final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowTopologyLogicalTransitionWithTaskFinishEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowPauseLifecycleEvent workflowPauseEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowPauseEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowPausedLifecycleEvent workflowPausedEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowPausedEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowStopLifecycleEvent workflowStopEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStopEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void stoppedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowStoppedLifecycleEvent workflowStoppedEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowStoppedEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowSucceedLifecycleEvent workflowSucceedEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowSucceedEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowFailedLifecycleEvent workflowFailedEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowFailedEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void finalizeEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable, |
||||||
|
final WorkflowFinalizeLifecycleEvent workflowFinalizeEvent) { |
||||||
|
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable); |
||||||
|
logWarningIfCannotDoAction(workflowExecutionRunnable, workflowFinalizeEvent); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowExecutionStatus matchState() { |
||||||
|
return WorkflowExecutionStatus.SUBMITTED_SUCCESS; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* The running state can only finish with success/failure. |
||||||
|
*/ |
||||||
|
@Override |
||||||
|
protected void emitWorkflowFinishedEventIfApplicable(IWorkflowExecutionRunnable workflowExecutionRunnable) { |
||||||
|
throw new IllegalStateException( |
||||||
|
"The workflow " + workflowExecutionRunnable.getName() + |
||||||
|
"is submitted, shouldn't emit workflow finished event"); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,86 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.dao.entity.Command; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.User; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.CommandDao; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; |
||||||
|
import org.apache.dolphinscheduler.dao.repository.UserDao; |
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.transaction.annotation.Transactional; |
||||||
|
|
||||||
|
@Slf4j |
||||||
|
public abstract class AbstractWorkflowTrigger<TriggerRequest, TriggerResponse> |
||||||
|
implements |
||||||
|
IWorkflowTrigger<TriggerRequest, TriggerResponse> { |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private ProcessDefinitionLogDao workflowDefinitionDao; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private ProcessInstanceDao workflowInstanceDao; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private UserDao userDao; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private CommandDao commandDao; |
||||||
|
|
||||||
|
@Override |
||||||
|
@Transactional |
||||||
|
public TriggerResponse triggerWorkflow(final TriggerRequest triggerRequest) { |
||||||
|
final ProcessInstance workflowInstance = constructWorkflowInstance(triggerRequest); |
||||||
|
workflowInstanceDao.insert(workflowInstance); |
||||||
|
|
||||||
|
final Command command = constructTriggerCommand(triggerRequest, workflowInstance); |
||||||
|
commandDao.insert(command); |
||||||
|
|
||||||
|
return onTriggerSuccess(workflowInstance); |
||||||
|
} |
||||||
|
|
||||||
|
protected abstract ProcessInstance constructWorkflowInstance(final TriggerRequest triggerRequest); |
||||||
|
|
||||||
|
protected abstract Command constructTriggerCommand(final TriggerRequest triggerRequest, |
||||||
|
final ProcessInstance workflowInstance); |
||||||
|
|
||||||
|
protected abstract TriggerResponse onTriggerSuccess(final ProcessInstance workflowInstance); |
||||||
|
|
||||||
|
protected ProcessDefinition getProcessDefinition(final Long workflowCode, final Integer workflowVersion) { |
||||||
|
final ProcessDefinitionLog workflow = workflowDefinitionDao.queryByDefinitionCodeAndVersion( |
||||||
|
workflowCode, workflowVersion); |
||||||
|
if (workflow == null) { |
||||||
|
throw new IllegalStateException( |
||||||
|
"Workflow definition not found: " + workflowCode + " version " + workflowVersion); |
||||||
|
} |
||||||
|
return workflow; |
||||||
|
} |
||||||
|
|
||||||
|
protected User getExecutorUser(final Integer userId) { |
||||||
|
return userDao.queryOptionalById(userId) |
||||||
|
.orElseThrow(() -> new IllegalStateException("User not found: " + userId)); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,30 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; |
||||||
|
|
||||||
|
/** |
||||||
|
* The trigger interface of the workflow, used to trigger the workflow and generate the workflow instance. |
||||||
|
* |
||||||
|
* @param <TriggerRequest> trigger request |
||||||
|
* @param <TriggerResponse> trigger response |
||||||
|
*/ |
||||||
|
public interface IWorkflowTrigger<TriggerRequest, TriggerResponse> { |
||||||
|
|
||||||
|
TriggerResponse triggerWorkflow(final TriggerRequest triggerRequest); |
||||||
|
|
||||||
|
} |
@ -0,0 +1,114 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.Command; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; |
||||||
|
import org.apache.dolphinscheduler.extract.master.command.BackfillWorkflowCommandParam; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; |
||||||
|
|
||||||
|
import org.apache.commons.lang3.ObjectUtils; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
/** |
||||||
|
* Backfill trigger of the workflow, used to trigger the workflow and generate the workflow instance in the backfill way. |
||||||
|
*/ |
||||||
|
@Component |
||||||
|
public class WorkflowBackfillTrigger |
||||||
|
extends |
||||||
|
AbstractWorkflowTrigger<WorkflowBackfillTriggerRequest, WorkflowBackfillTriggerResponse> { |
||||||
|
|
||||||
|
@Override |
||||||
|
protected ProcessInstance constructWorkflowInstance(WorkflowBackfillTriggerRequest backfillTriggerRequest) { |
||||||
|
final CommandType commandType = CommandType.COMPLEMENT_DATA; |
||||||
|
final Long workflowCode = backfillTriggerRequest.getWorkflowCode(); |
||||||
|
final Integer workflowVersion = backfillTriggerRequest.getWorkflowVersion(); |
||||||
|
final List<String> backfillTimeList = backfillTriggerRequest.getBackfillTimeList(); |
||||||
|
final ProcessDefinition workflowDefinition = getProcessDefinition(workflowCode, workflowVersion); |
||||||
|
|
||||||
|
final ProcessInstance workflowInstance = new ProcessInstance(); |
||||||
|
workflowInstance.setProcessDefinitionCode(workflowDefinition.getCode()); |
||||||
|
workflowInstance.setProcessDefinitionVersion(workflowDefinition.getVersion()); |
||||||
|
workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); |
||||||
|
workflowInstance.setCommandType(commandType); |
||||||
|
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name()); |
||||||
|
workflowInstance.setRecovery(Flag.NO); |
||||||
|
workflowInstance.setScheduleTime(DateUtils.stringToDate(backfillTimeList.get(0))); |
||||||
|
workflowInstance.setStartTime(new Date()); |
||||||
|
workflowInstance.setRestartTime(workflowInstance.getStartTime()); |
||||||
|
workflowInstance.setRunTimes(1); |
||||||
|
workflowInstance.setName(String.join("-", workflowDefinition.getName(), DateUtils.getCurrentTimeStamp())); |
||||||
|
workflowInstance.setTaskDependType(backfillTriggerRequest.getTaskDependType()); |
||||||
|
workflowInstance.setFailureStrategy(backfillTriggerRequest.getFailureStrategy()); |
||||||
|
workflowInstance |
||||||
|
.setWarningType(ObjectUtils.defaultIfNull(backfillTriggerRequest.getWarningType(), WarningType.NONE)); |
||||||
|
workflowInstance.setWarningGroupId(backfillTriggerRequest.getWarningGroupId()); |
||||||
|
workflowInstance.setExecutorId(backfillTriggerRequest.getUserId()); |
||||||
|
workflowInstance.setExecutorName(getExecutorUser(backfillTriggerRequest.getUserId()).getUserName()); |
||||||
|
workflowInstance.setTenantCode(backfillTriggerRequest.getTenantCode()); |
||||||
|
workflowInstance.setIsSubProcess(Flag.NO); |
||||||
|
workflowInstance.addHistoryCmd(commandType); |
||||||
|
workflowInstance.setProcessInstancePriority(backfillTriggerRequest.getWorkflowInstancePriority()); |
||||||
|
workflowInstance |
||||||
|
.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(backfillTriggerRequest.getWorkerGroup())); |
||||||
|
workflowInstance.setEnvironmentCode( |
||||||
|
EnvironmentUtils.getEnvironmentCodeOrDefault(backfillTriggerRequest.getEnvironmentCode())); |
||||||
|
workflowInstance.setTimeout(workflowDefinition.getTimeout()); |
||||||
|
workflowInstance.setDryRun(backfillTriggerRequest.getDryRun().getCode()); |
||||||
|
workflowInstance.setTestFlag(backfillTriggerRequest.getTestFlag().getCode()); |
||||||
|
return workflowInstance; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected Command constructTriggerCommand(WorkflowBackfillTriggerRequest backfillTriggerRequest, |
||||||
|
ProcessInstance workflowInstance) { |
||||||
|
final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder() |
||||||
|
.commandParams(backfillTriggerRequest.getStartParamList()) |
||||||
|
.startNodes(backfillTriggerRequest.getStartNodes()) |
||||||
|
.timeZone(DateUtils.getTimezone()) |
||||||
|
.backfillTimeList(backfillTriggerRequest.getBackfillTimeList()) |
||||||
|
.build(); |
||||||
|
return Command.builder() |
||||||
|
.commandType(backfillTriggerRequest.getExecType()) |
||||||
|
.processDefinitionCode(backfillTriggerRequest.getWorkflowCode()) |
||||||
|
.processDefinitionVersion(backfillTriggerRequest.getWorkflowVersion()) |
||||||
|
.processInstanceId(workflowInstance.getId()) |
||||||
|
.processInstancePriority(workflowInstance.getProcessInstancePriority()) |
||||||
|
.commandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam)) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected WorkflowBackfillTriggerResponse onTriggerSuccess(ProcessInstance workflowInstance) { |
||||||
|
return WorkflowBackfillTriggerResponse.success(workflowInstance.getId()); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,111 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.Command; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; |
||||||
|
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse; |
||||||
|
|
||||||
|
import org.apache.commons.lang3.ObjectUtils; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
/** |
||||||
|
* Manual trigger of the workflow, used to trigger the workflow and generate the workflow instance in the manual way. |
||||||
|
*/ |
||||||
|
@Component |
||||||
|
public class WorkflowManualTrigger |
||||||
|
extends |
||||||
|
AbstractWorkflowTrigger<WorkflowManualTriggerRequest, WorkflowManualTriggerResponse> { |
||||||
|
|
||||||
|
@Override |
||||||
|
protected ProcessInstance constructWorkflowInstance(final WorkflowManualTriggerRequest workflowManualTriggerRequest) { |
||||||
|
final CommandType commandType = CommandType.START_PROCESS; |
||||||
|
final Long workflowCode = workflowManualTriggerRequest.getWorkflowDefinitionCode(); |
||||||
|
final Integer workflowVersion = workflowManualTriggerRequest.getWorkflowDefinitionVersion(); |
||||||
|
final ProcessDefinition workflowDefinition = getProcessDefinition(workflowCode, workflowVersion); |
||||||
|
|
||||||
|
final ProcessInstance workflowInstance = new ProcessInstance(); |
||||||
|
workflowInstance.setProcessDefinitionCode(workflowDefinition.getCode()); |
||||||
|
workflowInstance.setProcessDefinitionVersion(workflowDefinition.getVersion()); |
||||||
|
workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); |
||||||
|
workflowInstance.setCommandType(commandType); |
||||||
|
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name()); |
||||||
|
workflowInstance.setRecovery(Flag.NO); |
||||||
|
workflowInstance.setStartTime(new Date()); |
||||||
|
workflowInstance.setRestartTime(workflowInstance.getStartTime()); |
||||||
|
workflowInstance.setRunTimes(1); |
||||||
|
workflowInstance.setName(String.join("-", workflowDefinition.getName(), DateUtils.getCurrentTimeStamp())); |
||||||
|
workflowInstance.setTaskDependType(workflowManualTriggerRequest.getTaskDependType()); |
||||||
|
workflowInstance.setFailureStrategy(workflowManualTriggerRequest.getFailureStrategy()); |
||||||
|
workflowInstance.setWarningType( |
||||||
|
ObjectUtils.defaultIfNull(workflowManualTriggerRequest.getWarningType(), WarningType.NONE)); |
||||||
|
workflowInstance.setWarningGroupId(workflowManualTriggerRequest.getWarningGroupId()); |
||||||
|
workflowInstance.setExecutorId(workflowManualTriggerRequest.getUserId()); |
||||||
|
workflowInstance.setExecutorName(getExecutorUser(workflowManualTriggerRequest.getUserId()).getUserName()); |
||||||
|
workflowInstance.setTenantCode(workflowManualTriggerRequest.getTenantCode()); |
||||||
|
workflowInstance.setIsSubProcess(Flag.NO); |
||||||
|
workflowInstance.addHistoryCmd(commandType); |
||||||
|
workflowInstance.setProcessInstancePriority(workflowManualTriggerRequest.getWorkflowInstancePriority()); |
||||||
|
workflowInstance.setWorkerGroup( |
||||||
|
WorkerGroupUtils.getWorkerGroupOrDefault(workflowManualTriggerRequest.getWorkerGroup())); |
||||||
|
workflowInstance.setEnvironmentCode( |
||||||
|
EnvironmentUtils.getEnvironmentCodeOrDefault(workflowManualTriggerRequest.getEnvironmentCode())); |
||||||
|
workflowInstance.setTimeout(workflowDefinition.getTimeout()); |
||||||
|
workflowInstance.setDryRun(workflowManualTriggerRequest.getDryRun().getCode()); |
||||||
|
workflowInstance.setTestFlag(workflowManualTriggerRequest.getTestFlag().getCode()); |
||||||
|
return workflowInstance; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected Command constructTriggerCommand(final WorkflowManualTriggerRequest workflowManualTriggerRequest, |
||||||
|
final ProcessInstance workflowInstance) { |
||||||
|
final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder() |
||||||
|
.commandParams(workflowManualTriggerRequest.getStartParamList()) |
||||||
|
.startNodes(workflowManualTriggerRequest.getStartNodes()) |
||||||
|
.timeZone(DateUtils.getTimezone()) |
||||||
|
.build(); |
||||||
|
return Command.builder() |
||||||
|
.commandType(CommandType.START_PROCESS) |
||||||
|
.processDefinitionCode(workflowManualTriggerRequest.getWorkflowDefinitionCode()) |
||||||
|
.processDefinitionVersion(workflowManualTriggerRequest.getWorkflowDefinitionVersion()) |
||||||
|
.processInstanceId(workflowInstance.getId()) |
||||||
|
.processInstancePriority(workflowInstance.getProcessInstancePriority()) |
||||||
|
.commandParam(JSONUtils.toJsonString(runWorkflowCommandParam)) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected WorkflowManualTriggerResponse onTriggerSuccess(ProcessInstance workflowInstance) { |
||||||
|
return WorkflowManualTriggerResponse.success(workflowInstance.getId()); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,106 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.Command; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; |
||||||
|
import org.apache.dolphinscheduler.extract.master.command.ScheduleWorkflowCommandParam; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerResponse; |
||||||
|
|
||||||
|
import org.apache.commons.lang3.ObjectUtils; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
@Component |
||||||
|
public class WorkflowScheduleTrigger |
||||||
|
extends |
||||||
|
AbstractWorkflowTrigger<WorkflowScheduleTriggerRequest, WorkflowScheduleTriggerResponse> { |
||||||
|
|
||||||
|
@Override |
||||||
|
protected ProcessInstance constructWorkflowInstance(WorkflowScheduleTriggerRequest scheduleTriggerRequest) { |
||||||
|
final CommandType commandType = CommandType.SCHEDULER; |
||||||
|
final Long workflowCode = scheduleTriggerRequest.getWorkflowCode(); |
||||||
|
final Integer workflowVersion = scheduleTriggerRequest.getWorkflowVersion(); |
||||||
|
final ProcessDefinition workflowDefinition = getProcessDefinition(workflowCode, workflowVersion); |
||||||
|
|
||||||
|
final ProcessInstance workflowInstance = new ProcessInstance(); |
||||||
|
workflowInstance.setProcessDefinitionCode(workflowDefinition.getCode()); |
||||||
|
workflowInstance.setProcessDefinitionVersion(workflowDefinition.getVersion()); |
||||||
|
workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); |
||||||
|
workflowInstance.setCommandType(commandType); |
||||||
|
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name()); |
||||||
|
workflowInstance.setRecovery(Flag.NO); |
||||||
|
workflowInstance.setScheduleTime(scheduleTriggerRequest.getScheduleTIme()); |
||||||
|
workflowInstance.setStartTime(new Date()); |
||||||
|
workflowInstance.setRestartTime(workflowInstance.getStartTime()); |
||||||
|
workflowInstance.setRunTimes(1); |
||||||
|
workflowInstance.setName(String.join("-", workflowDefinition.getName(), DateUtils.getCurrentTimeStamp())); |
||||||
|
workflowInstance.setTaskDependType(scheduleTriggerRequest.getTaskDependType()); |
||||||
|
workflowInstance.setFailureStrategy(scheduleTriggerRequest.getFailureStrategy()); |
||||||
|
workflowInstance |
||||||
|
.setWarningType(ObjectUtils.defaultIfNull(scheduleTriggerRequest.getWarningType(), WarningType.NONE)); |
||||||
|
workflowInstance.setWarningGroupId(scheduleTriggerRequest.getWarningGroupId()); |
||||||
|
workflowInstance.setExecutorId(scheduleTriggerRequest.getUserId()); |
||||||
|
workflowInstance.setExecutorName(getExecutorUser(scheduleTriggerRequest.getUserId()).getUserName()); |
||||||
|
workflowInstance.setTenantCode(scheduleTriggerRequest.getTenantCode()); |
||||||
|
workflowInstance.setIsSubProcess(Flag.NO); |
||||||
|
workflowInstance.addHistoryCmd(commandType); |
||||||
|
workflowInstance.setProcessInstancePriority(scheduleTriggerRequest.getWorkflowInstancePriority()); |
||||||
|
workflowInstance |
||||||
|
.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(scheduleTriggerRequest.getWorkerGroup())); |
||||||
|
workflowInstance.setEnvironmentCode( |
||||||
|
EnvironmentUtils.getEnvironmentCodeOrDefault(scheduleTriggerRequest.getEnvironmentCode())); |
||||||
|
workflowInstance.setTimeout(workflowDefinition.getTimeout()); |
||||||
|
workflowInstance.setDryRun(scheduleTriggerRequest.getDryRun().getCode()); |
||||||
|
workflowInstance.setTestFlag(scheduleTriggerRequest.getTestFlag().getCode()); |
||||||
|
return workflowInstance; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected Command constructTriggerCommand(final WorkflowScheduleTriggerRequest scheduleTriggerRequest, |
||||||
|
final ProcessInstance workflowInstance) { |
||||||
|
final ScheduleWorkflowCommandParam scheduleWorkflowCommandParam = ScheduleWorkflowCommandParam.builder() |
||||||
|
.timeZone(scheduleTriggerRequest.getTimezoneId()) |
||||||
|
.build(); |
||||||
|
return Command.builder() |
||||||
|
.commandType(CommandType.SCHEDULER) |
||||||
|
.processDefinitionCode(scheduleTriggerRequest.getWorkflowCode()) |
||||||
|
.processDefinitionVersion(scheduleTriggerRequest.getWorkflowVersion()) |
||||||
|
.processInstanceId(workflowInstance.getId()) |
||||||
|
.processInstancePriority(workflowInstance.getProcessInstancePriority()) |
||||||
|
.commandParam(JSONUtils.toJsonString(scheduleWorkflowCommandParam)) |
||||||
|
.build(); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected WorkflowScheduleTriggerResponse onTriggerSuccess(ProcessInstance workflowInstance) { |
||||||
|
return WorkflowScheduleTriggerResponse.success(workflowInstance.getId()); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,126 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.master.rpc; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerResponse; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest; |
||||||
|
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerResponse; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowBackfillTrigger; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger; |
||||||
|
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowScheduleTrigger; |
||||||
|
|
||||||
|
import org.apache.commons.lang3.exception.ExceptionUtils; |
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.stereotype.Service; |
||||||
|
|
||||||
|
@Slf4j |
||||||
|
@Service |
||||||
|
public class WorkflowControlClient implements IWorkflowControlClient { |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private WorkflowManualTrigger workflowManualTrigger; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private WorkflowBackfillTrigger workflowBackfillTrigger; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private WorkflowScheduleTrigger workflowScheduleTrigger; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private WorkflowCacheRepository workflowRepository; |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowManualTriggerResponse manualTriggerWorkflow(final WorkflowManualTriggerRequest manualTriggerRequest) { |
||||||
|
try { |
||||||
|
return workflowManualTrigger.triggerWorkflow(manualTriggerRequest); |
||||||
|
} catch (Exception ex) { |
||||||
|
log.error("Handle workflowTriggerRequest: {} failed", manualTriggerRequest, ex); |
||||||
|
return WorkflowManualTriggerResponse.fail("Trigger workflow failed: " + ExceptionUtils.getMessage(ex)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowBackfillTriggerResponse backfillTriggerWorkflow(final WorkflowBackfillTriggerRequest backfillTriggerRequest) { |
||||||
|
try { |
||||||
|
return workflowBackfillTrigger.triggerWorkflow(backfillTriggerRequest); |
||||||
|
} catch (Exception ex) { |
||||||
|
log.error("Handle workflowBackfillTriggerRequest: {} failed", backfillTriggerRequest, ex); |
||||||
|
return WorkflowBackfillTriggerResponse.fail("Backfill workflow failed: " + ExceptionUtils.getMessage(ex)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest) { |
||||||
|
try { |
||||||
|
return workflowScheduleTrigger.triggerWorkflow(workflowScheduleTriggerRequest); |
||||||
|
} catch (Exception ex) { |
||||||
|
log.error("Handle workflowScheduleTriggerRequest: {} failed", workflowScheduleTriggerRequest, ex); |
||||||
|
return WorkflowScheduleTriggerResponse |
||||||
|
.fail("Schedule trigger workflow failed: " + ExceptionUtils.getMessage(ex)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest) { |
||||||
|
try { |
||||||
|
final Integer workflowInstanceId = workflowInstancePauseRequest.getWorkflowInstanceId(); |
||||||
|
final IWorkflowExecutionRunnable workflow = workflowRepository.get(workflowInstanceId); |
||||||
|
if (workflow == null) { |
||||||
|
return WorkflowInstancePauseResponse.fail( |
||||||
|
"Cannot find the WorkflowExecuteRunnable: " + workflowInstanceId); |
||||||
|
} |
||||||
|
workflow.pause(); |
||||||
|
return WorkflowInstancePauseResponse.success(); |
||||||
|
} catch (Exception ex) { |
||||||
|
log.error("Handle workflowInstancePauseRequest: {} failed", workflowInstancePauseRequest, ex); |
||||||
|
return WorkflowInstancePauseResponse.fail( |
||||||
|
"Pause workflow instance failed: " + ExceptionUtils.getMessage(ex)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public WorkflowInstanceStopResponse stopWorkflowInstance(final WorkflowInstanceStopRequest workflowInstanceStopRequest) { |
||||||
|
try { |
||||||
|
final Integer workflowInstanceId = workflowInstanceStopRequest.getWorkflowInstanceId(); |
||||||
|
final IWorkflowExecutionRunnable workflow = workflowRepository.get(workflowInstanceId); |
||||||
|
if (workflow == null) { |
||||||
|
return WorkflowInstanceStopResponse |
||||||
|
.fail("Cannot find the WorkflowExecuteRunnable: " + workflowInstanceId); |
||||||
|
} |
||||||
|
workflow.stop(); |
||||||
|
return WorkflowInstanceStopResponse.success(); |
||||||
|
} catch (Exception ex) { |
||||||
|
log.error("Handle workflowInstanceStopRequest: {} failed", workflowInstanceStopRequest, ex); |
||||||
|
return WorkflowInstanceStopResponse.fail( |
||||||
|
"Stop workflow instance failed:" + ExceptionUtils.getMessage(ex)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -1,81 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.server.master.rpc; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceController; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstancePauseRequest; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstancePauseResponse; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStopRequest; |
|
||||||
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStopResponse; |
|
||||||
import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository; |
|
||||||
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent; |
|
||||||
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent; |
|
||||||
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; |
|
||||||
|
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils; |
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
@Slf4j |
|
||||||
@Service |
|
||||||
public class WorkflowInstanceControllerImpl implements IWorkflowInstanceController { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private WorkflowCacheRepository workflowCacheRepository; |
|
||||||
|
|
||||||
@Override |
|
||||||
public WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest) { |
|
||||||
try { |
|
||||||
final Integer workflowInstanceId = workflowInstancePauseRequest.getWorkflowInstanceId(); |
|
||||||
final IWorkflowExecutionRunnable workflowExecutionRunnable = |
|
||||||
workflowCacheRepository.get(workflowInstanceId); |
|
||||||
if (workflowExecutionRunnable == null) { |
|
||||||
return WorkflowInstancePauseResponse |
|
||||||
.fail("Cannot find the WorkflowExecuteRunnable: " + workflowInstanceId); |
|
||||||
} |
|
||||||
workflowExecutionRunnable.getWorkflowEventBus() |
|
||||||
.publish(WorkflowPauseLifecycleEvent.of(workflowExecutionRunnable)); |
|
||||||
return WorkflowInstancePauseResponse.success(); |
|
||||||
} catch (Exception ex) { |
|
||||||
log.error("Handle workflowInstancePauseRequest: {} failed", workflowInstancePauseRequest, ex); |
|
||||||
return WorkflowInstancePauseResponse.fail(ExceptionUtils.getMessage(ex)); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public WorkflowInstanceStopResponse stopWorkflowInstance(final WorkflowInstanceStopRequest workflowInstanceStopRequest) { |
|
||||||
try { |
|
||||||
final Integer workflowInstanceId = workflowInstanceStopRequest.getWorkflowInstanceId(); |
|
||||||
final IWorkflowExecutionRunnable workflowExecutionRunnable = |
|
||||||
workflowCacheRepository.get(workflowInstanceId); |
|
||||||
if (workflowExecutionRunnable == null) { |
|
||||||
return WorkflowInstanceStopResponse |
|
||||||
.fail("Cannot find the WorkflowExecuteRunnable: " + workflowInstanceId); |
|
||||||
} |
|
||||||
workflowExecutionRunnable.getWorkflowEventBus() |
|
||||||
.publish(WorkflowStopLifecycleEvent.of(workflowExecutionRunnable)); |
|
||||||
return WorkflowInstanceStopResponse.success(); |
|
||||||
} catch (Exception ex) { |
|
||||||
log.error("Handle workflowInstanceStopRequest: {} failed", workflowInstanceStopRequest, ex); |
|
||||||
return WorkflowInstanceStopResponse.fail(ExceptionUtils.getMessage(ex)); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
@ -1,40 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.service.process; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.common.enums.ApiTriggerType; |
|
||||||
import org.apache.dolphinscheduler.dao.entity.TriggerRelation; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
/** |
|
||||||
* Trigger relation operator to db,because operator command process instance |
|
||||||
*/ |
|
||||||
@Component |
|
||||||
public interface TriggerRelationService { |
|
||||||
|
|
||||||
void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId); |
|
||||||
|
|
||||||
List<TriggerRelation> queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId); |
|
||||||
|
|
||||||
int saveCommandTrigger(Integer commandId, Integer processInstanceId); |
|
||||||
|
|
||||||
int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId); |
|
||||||
} |
|
@ -1,94 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.service.process; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.common.enums.ApiTriggerType; |
|
||||||
import org.apache.dolphinscheduler.dao.entity.TriggerRelation; |
|
||||||
import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper; |
|
||||||
|
|
||||||
import org.apache.commons.collections4.CollectionUtils; |
|
||||||
|
|
||||||
import java.util.Date; |
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
/** |
|
||||||
* Trigger relation operator to db |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Component |
|
||||||
public class TriggerRelationServiceImpl implements TriggerRelationService { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private TriggerRelationMapper triggerRelationMapper; |
|
||||||
|
|
||||||
@Override |
|
||||||
public void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId) { |
|
||||||
TriggerRelation triggerRelation = new TriggerRelation(); |
|
||||||
triggerRelation.setTriggerType(type.getCode()); |
|
||||||
triggerRelation.setJobId(jobId); |
|
||||||
triggerRelation.setTriggerCode(triggerCode); |
|
||||||
triggerRelation.setCreateTime(new Date()); |
|
||||||
triggerRelation.setUpdateTime(new Date()); |
|
||||||
triggerRelationMapper.upsert(triggerRelation); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public List<TriggerRelation> queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId) { |
|
||||||
return triggerRelationMapper.queryByTypeAndJobId(apiTriggerType.getCode(), jobId); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public int saveCommandTrigger(Integer commandId, Integer processInstanceId) { |
|
||||||
List<TriggerRelation> existTriggers = queryByTypeAndJobId(ApiTriggerType.PROCESS, processInstanceId); |
|
||||||
if (CollectionUtils.isEmpty(existTriggers)) { |
|
||||||
return 0; |
|
||||||
} |
|
||||||
existTriggers.forEach(triggerRelation -> saveTriggerToDb(ApiTriggerType.COMMAND, |
|
||||||
triggerRelation.getTriggerCode(), commandId)); |
|
||||||
int triggerRelationSize = existTriggers.size(); |
|
||||||
if (triggerRelationSize > 1) { |
|
||||||
// Fix https://github.com/apache/dolphinscheduler/issues/15864
|
|
||||||
// This case shouldn't happen
|
|
||||||
log.error("The PROCESS TriggerRelation of command: {} is more than one", commandId); |
|
||||||
} |
|
||||||
return existTriggers.size(); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId) { |
|
||||||
List<TriggerRelation> existTriggers = queryByTypeAndJobId(ApiTriggerType.COMMAND, commandId); |
|
||||||
if (CollectionUtils.isEmpty(existTriggers)) { |
|
||||||
return 0; |
|
||||||
} |
|
||||||
existTriggers.forEach(triggerRelation -> saveTriggerToDb(ApiTriggerType.PROCESS, |
|
||||||
triggerRelation.getTriggerCode(), processInstanceId)); |
|
||||||
int triggerRelationSize = existTriggers.size(); |
|
||||||
if (triggerRelationSize > 1) { |
|
||||||
// Fix https://github.com/apache/dolphinscheduler/issues/15864
|
|
||||||
// This case shouldn't happen
|
|
||||||
log.error("The COMMAND TriggerRelation of command: {} is more than one", commandId); |
|
||||||
} |
|
||||||
return existTriggers.size(); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,97 +0,0 @@ |
|||||||
/* |
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
|
||||||
* contributor license agreements. See the NOTICE file distributed with |
|
||||||
* this work for additional information regarding copyright ownership. |
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
|
||||||
* (the "License"); you may not use this file except in compliance with |
|
||||||
* the License. You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
*/ |
|
||||||
|
|
||||||
package org.apache.dolphinscheduler.service.process; |
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat; |
|
||||||
import static org.mockito.ArgumentMatchers.any; |
|
||||||
import static org.mockito.Mockito.doNothing; |
|
||||||
import static org.mockito.Mockito.when; |
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.common.enums.ApiTriggerType; |
|
||||||
import org.apache.dolphinscheduler.dao.entity.TriggerRelation; |
|
||||||
import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper; |
|
||||||
|
|
||||||
import java.util.Date; |
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test; |
|
||||||
import org.junit.jupiter.api.extension.ExtendWith; |
|
||||||
import org.mockito.InjectMocks; |
|
||||||
import org.mockito.Mock; |
|
||||||
import org.mockito.junit.jupiter.MockitoExtension; |
|
||||||
import org.mockito.junit.jupiter.MockitoSettings; |
|
||||||
import org.mockito.quality.Strictness; |
|
||||||
|
|
||||||
import com.google.common.collect.Lists; |
|
||||||
|
|
||||||
/** |
|
||||||
* Trigger Relation Service Test |
|
||||||
*/ |
|
||||||
@ExtendWith(MockitoExtension.class) |
|
||||||
@MockitoSettings(strictness = Strictness.LENIENT) |
|
||||||
public class TriggerRelationServiceTest { |
|
||||||
|
|
||||||
@InjectMocks |
|
||||||
private TriggerRelationServiceImpl triggerRelationService; |
|
||||||
@Mock |
|
||||||
private TriggerRelationMapper triggerRelationMapper; |
|
||||||
|
|
||||||
@Test |
|
||||||
public void saveTriggerToDb() { |
|
||||||
doNothing().when(triggerRelationMapper).upsert(any()); |
|
||||||
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, 1234567890L, 100); |
|
||||||
} |
|
||||||
|
|
||||||
@Test |
|
||||||
public void queryByTypeAndJobId() { |
|
||||||
doNothing().when(triggerRelationMapper).upsert(any()); |
|
||||||
when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100)) |
|
||||||
.thenReturn(Lists.newArrayList(getTriggerTdoDb())); |
|
||||||
|
|
||||||
assertThat(triggerRelationService.queryByTypeAndJobId(ApiTriggerType.PROCESS, 100)).hasSize(1); |
|
||||||
assertThat(triggerRelationService.queryByTypeAndJobId(ApiTriggerType.PROCESS, 200)).isEmpty(); |
|
||||||
} |
|
||||||
|
|
||||||
@Test |
|
||||||
public void saveCommandTrigger() { |
|
||||||
doNothing().when(triggerRelationMapper).upsert(any()); |
|
||||||
when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100)) |
|
||||||
.thenReturn(Lists.newArrayList(getTriggerTdoDb())); |
|
||||||
assertThat(triggerRelationService.saveCommandTrigger(1234567890, 100)).isAtLeast(1); |
|
||||||
assertThat(triggerRelationService.saveCommandTrigger(1234567890, 200)).isEqualTo(0); |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
@Test |
|
||||||
public void saveProcessInstanceTrigger() { |
|
||||||
doNothing().when(triggerRelationMapper).upsert(any()); |
|
||||||
when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.COMMAND.getCode(), 100)) |
|
||||||
.thenReturn(Lists.newArrayList(getTriggerTdoDb())); |
|
||||||
assertThat(triggerRelationService.saveProcessInstanceTrigger(100, 1234567890)).isAtLeast(1); |
|
||||||
assertThat(triggerRelationService.saveProcessInstanceTrigger(200, 1234567890)).isEqualTo(0); |
|
||||||
} |
|
||||||
|
|
||||||
private TriggerRelation getTriggerTdoDb() { |
|
||||||
TriggerRelation triggerRelation = new TriggerRelation(); |
|
||||||
triggerRelation.setTriggerType(ApiTriggerType.PROCESS.getCode()); |
|
||||||
triggerRelation.setJobId(100); |
|
||||||
triggerRelation.setTriggerCode(1234567890L); |
|
||||||
triggerRelation.setCreateTime(new Date()); |
|
||||||
triggerRelation.setUpdateTime(new Date()); |
|
||||||
return triggerRelation; |
|
||||||
} |
|
||||||
} |
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue