diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 3954159dfa..4d4ccede26 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -594,5 +594,19 @@ janino ${codehaus.janino.version} + + com.github.rholder + guava-retrying + + + com.google.guava + guava + + + com.google.code.findbugs + jsr305 + + + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java new file mode 100644 index 0000000000..e6f4c502de --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import com.github.rholder.retry.*; +import org.apache.dolphinscheduler.common.Constants; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * The Retryer util. + */ +public class RetryerUtils { + private static Retryer defaultRetryerResultCheck; + private static Retryer defaultRetryerResultNoCheck; + + private RetryerUtils() { + + } + + private static Retryer getDefaultRetryerResultNoCheck() { + if (defaultRetryerResultNoCheck == null) { + defaultRetryerResultNoCheck = RetryerBuilder + .newBuilder() + .retryIfException() + .withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + } + return defaultRetryerResultNoCheck; + } + + /** + * Gets default retryer. + * the retryer will retry 3 times if exceptions throw + * and wait 1 second between each retry + * + * @param checkResult true means the callable must return true before retrying + * false means that retry callable only throw exceptions + * @return the default retryer + */ + public static Retryer getDefaultRetryer(boolean checkResult) { + return checkResult ? getDefaultRetryer() : getDefaultRetryerResultNoCheck(); + } + + /** + * Gets default retryer. + * the retryer will retry 3 times if exceptions throw + * and wait 1 second between each retry + * + * @return the default retryer + */ + public static Retryer getDefaultRetryer() { + if (defaultRetryerResultCheck == null) { + defaultRetryerResultCheck = RetryerBuilder + .newBuilder() + .retryIfResult(Boolean.FALSE::equals) + .retryIfException() + .withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + } + return defaultRetryerResultCheck; + } + + /** + * Use RETRYER to invoke the Callable + * + * @param callable the callable + * @param checkResult true means that retry callable before returning true + * false means that retry callable only throw exceptions + * @return the final result of callable + * @throws ExecutionException the execution exception + * @throws RetryException the retry exception + */ + public static Boolean retryCall(final Callable callable, boolean checkResult) throws ExecutionException, RetryException { + return getDefaultRetryer(checkResult).call(callable); + } + + /** + * Use RETRYER to invoke the Callable before returning true + * + * @param callable the callable + * @return the boolean + * @throws ExecutionException the execution exception + * @throws RetryException the retry exception + */ + public static Boolean retryCall(final Callable callable) throws ExecutionException, RetryException { + return retryCall(callable, true); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java new file mode 100644 index 0000000000..7841e46585 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; + +public class RetryerUtilsTest { + + @Test + public void testDefaultRetryer() { + Retryer retryer = RetryerUtils.getDefaultRetryer(); + Assert.assertNotNull(retryer); + try { + boolean result = retryer.call(() -> true); + Assert.assertTrue(result); + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + Retryer retryer1 = RetryerUtils.getDefaultRetryer(true); + Assert.assertEquals(retryer, retryer1); + } + + @Test + public void testDefaultRetryerResultCheck() { + Retryer retryer = RetryerUtils.getDefaultRetryer(); + Assert.assertNotNull(retryer); + try { + for (int execTarget = 1; execTarget <= 3; execTarget++) { + int finalExecTarget = execTarget; + int[] execTime = {0}; + boolean result = retryer.call(() -> { + execTime[0]++; + return execTime[0] == finalExecTarget; + }); + Assert.assertEquals(finalExecTarget, execTime[0]); + Assert.assertTrue(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + int[] execTime = {0}; + try { + retryer.call(() -> { + execTime[0]++; + return execTime[0] == 4; + }); + Assert.fail("Retry times not reached"); + } catch (RetryException e) { + Assert.assertEquals(3, e.getNumberOfFailedAttempts()); + Assert.assertEquals(3, execTime[0]); + } catch (ExecutionException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + @Test + public void testDefaultRetryerResultNoCheck() { + Retryer retryer = RetryerUtils.getDefaultRetryer(false); + Assert.assertNotNull(retryer); + try { + for (int execTarget = 1; execTarget <= 5; execTarget++) { + int[] execTime = {0}; + boolean result = retryer.call(() -> { + execTime[0]++; + return execTime[0] > 1; + }); + Assert.assertEquals(1, execTime[0]); + Assert.assertFalse(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + @Test + public void testRecallResultCheck() { + try { + for (int execTarget = 1; execTarget <= 3; execTarget++) { + int finalExecTarget = execTarget; + int[] execTime = {0}; + boolean result = RetryerUtils.retryCall(() -> { + execTime[0]++; + return execTime[0] == finalExecTarget; + }); + Assert.assertEquals(finalExecTarget, execTime[0]); + Assert.assertTrue(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + int[] execTime = {0}; + try { + RetryerUtils.retryCall(() -> { + execTime[0]++; + return execTime[0] == 4; + }); + Assert.fail("Recall times not reached"); + } catch (RetryException e) { + Assert.assertEquals(3, e.getNumberOfFailedAttempts()); + Assert.assertEquals(3, execTime[0]); + } catch (ExecutionException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + @Test + public void testRecallResultCheckWithPara() { + try { + for (int execTarget = 1; execTarget <= 3; execTarget++) { + int finalExecTarget = execTarget; + int[] execTime = {0}; + boolean result = RetryerUtils.retryCall(() -> { + execTime[0]++; + return execTime[0] == finalExecTarget; + }, true); + Assert.assertEquals(finalExecTarget, execTime[0]); + Assert.assertTrue(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + int[] execTime = {0}; + try { + RetryerUtils.retryCall(() -> { + execTime[0]++; + return execTime[0] == 4; + }, true); + Assert.fail("Recall times not reached"); + } catch (RetryException e) { + Assert.assertEquals(3, e.getNumberOfFailedAttempts()); + Assert.assertEquals(3, execTime[0]); + } catch (ExecutionException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + @Test + public void testRecallResultNoCheck() { + try { + for (int execTarget = 1; execTarget <= 5; execTarget++) { + int[] execTime = {0}; + boolean result = RetryerUtils.retryCall(() -> { + execTime[0]++; + return execTime[0] > 1; + }, false); + Assert.assertEquals(1, execTime[0]); + Assert.assertFalse(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + private void testRetryExceptionWithPara(boolean checkResult) { + try { + for (int execTarget = 1; execTarget <= 3; execTarget++) { + int finalExecTarget = execTarget; + int[] execTime = {0}; + boolean result = RetryerUtils.retryCall(() -> { + execTime[0]++; + if (execTime[0] != finalExecTarget) { + throw new IllegalArgumentException(String.valueOf(execTime[0])); + } + return true; + }, checkResult); + Assert.assertEquals(finalExecTarget, execTime[0]); + Assert.assertTrue(result); + } + } catch (ExecutionException | RetryException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + int[] execTime = {0}; + try { + RetryerUtils.retryCall(() -> { + execTime[0]++; + if (execTime[0] != 4) { + throw new IllegalArgumentException(String.valueOf(execTime[0])); + } + return true; + }, checkResult); + Assert.fail("Recall times not reached"); + } catch (RetryException e) { + Assert.assertEquals(3, e.getNumberOfFailedAttempts()); + Assert.assertEquals(3, execTime[0]); + Assert.assertNotNull(e.getCause()); + Assert.assertEquals(3, Integer.parseInt(e.getCause().getMessage())); + } catch (ExecutionException e) { + Assert.fail("Retry call failed " + e.getMessage()); + } + } + + @Test + public void testRetryException() { + testRetryExceptionWithPara(true); + testRetryExceptionWithPara(false); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 4ca110f42b..ae26b8d7dd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -20,11 +20,13 @@ package org.apache.dolphinscheduler.server.worker.processor; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; +import com.github.rholder.retry.RetryException; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; /** @@ -101,21 +104,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); + // tell master that task is in executing + final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); + try { - this.doAck(taskExecutionContext); - }catch (Exception e){ - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - this.doAck(taskExecutionContext); + RetryerUtils.retryCall(() -> { + taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); + return Boolean.TRUE; + }); + // submit task + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); + } catch (ExecutionException | RetryException e) { + logger.error(e.getMessage(), e); } - - // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); - } - - private void doAck(TaskExecutionContext taskExecutionContext){ - // tell master that task is in executing - TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); - taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } /** diff --git a/pom.xml b/pom.xml index 48835981ce..5dca76ce4c 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,7 @@ 2.5 1.9.3 2.9.2 + 2.0.0 @@ -544,6 +545,12 @@ swagger-bootstrap-ui ${swagger.version} + + + com.github.rholder + guava-retrying + ${guava-retry.version} + @@ -771,6 +778,7 @@ **/common/utils/HttpUtilsTest.java **/common/ConstantsTest.java **/common/utils/HadoopUtils.java + **/common/utils/RetryerUtilsTest.java **/common/plugin/FilePluginManagerTest **/common/plugin/PluginClassLoaderTest **/dao/mapper/AccessTokenMapperTest.java diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 6601cf9c34..b1c7f5f806 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -209,3 +209,4 @@ xml-apis-1.4.01.jar xmlenc-0.52.jar xz-1.0.jar zookeeper-3.4.14.jar +guava-retrying-2.0.0.jar