Unverified commit c28a697e, authored by Gabry.wu, committed by GitHub
Browse files

add RetryerUtils to retry async & sync function gracefully (#2697)



* add RetryerUtils tool class for async & sync retry invoking

* add RetryerUtils tool class for async & sync retry invoking

* add guava-retrying-2.0.0.jar to known-dependencies.txt

* fix javadoc

* fix code smell

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
parent e4ee7d1d
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -594,5 +594,19 @@
			<artifactId>janino</artifactId>
			<version>${codehaus.janino.version}</version>
		</dependency>
		<dependency>
			<groupId>com.github.rholder</groupId>
			<artifactId>guava-retrying</artifactId>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
				<exclusion>
					<groupId>com.google.code.findbugs</groupId>
					<artifactId>jsr305</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
	</dependencies>
</project>
+108 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.common.utils;

import com.github.rholder.retry.*;
import org.apache.dolphinscheduler.common.Constants;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Static helpers for invoking a {@code Callable<Boolean>} through a shared,
 * pre-configured guava-retrying {@link Retryer}.
 *
 * <p>Two default retryers are provided. Both make at most
 * {@value #DEFAULT_MAX_ATTEMPTS} attempts in total and wait
 * {@link Constants#SLEEP_TIME_MILLIS} milliseconds between attempts:
 * <ul>
 *   <li>the <em>result-checking</em> retryer retries when the callable throws an
 *       exception or returns {@code false};</li>
 *   <li>the <em>non-checking</em> retryer retries only when the callable throws an
 *       exception and accepts whatever value it returns.</li>
 * </ul>
 *
 * <p>The retryers are created once during class initialization, so every caller
 * (on any thread) observes the same two instances.
 */
public final class RetryerUtils {

    /** Maximum number of attempts (first call included) made by the default retryers. */
    private static final int DEFAULT_MAX_ATTEMPTS = 3;

    /** Retries on exception or on a {@code false} result. */
    private static final Retryer<Boolean> DEFAULT_RETRYER_RESULT_CHECK = newDefaultBuilder()
            .retryIfResult(Boolean.FALSE::equals)
            .build();

    /** Retries on exception only; the returned value is never inspected. */
    private static final Retryer<Boolean> DEFAULT_RETRYER_RESULT_NO_CHECK = newDefaultBuilder()
            .build();

    private RetryerUtils() {
        // utility class, not meant to be instantiated
    }

    /**
     * Creates a builder carrying the configuration shared by both default retryers:
     * retry on any exception, fixed wait between attempts, bounded attempt count.
     *
     * @return a new, partially configured builder
     */
    private static RetryerBuilder<Boolean> newDefaultBuilder() {
        return RetryerBuilder
                .<Boolean>newBuilder()
                .retryIfException()
                .withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_MAX_ATTEMPTS));
    }

    /**
     * Gets one of the default retryers.
     * Both make at most 3 attempts and wait {@link Constants#SLEEP_TIME_MILLIS}
     * milliseconds between attempts.
     *
     * @param checkResult {@code true} to also retry while the callable returns {@code false};
     *                    {@code false} to retry only when the callable throws an exception
     * @return the shared default retryer matching {@code checkResult}
     */
    public static Retryer<Boolean> getDefaultRetryer(boolean checkResult) {
        return checkResult ? DEFAULT_RETRYER_RESULT_CHECK : DEFAULT_RETRYER_RESULT_NO_CHECK;
    }

    /**
     * Gets the default result-checking retryer.
     * It makes at most 3 attempts, retrying when the callable throws an exception or
     * returns {@code false}, and waits {@link Constants#SLEEP_TIME_MILLIS} milliseconds
     * between attempts.
     *
     * @return the shared default result-checking retryer
     */
    public static Retryer<Boolean> getDefaultRetryer() {
        return DEFAULT_RETRYER_RESULT_CHECK;
    }

    /**
     * Invokes the callable through the default retryer selected by {@code checkResult}.
     *
     * @param callable    the callable to invoke
     * @param checkResult {@code true} to keep retrying until the callable returns {@code true};
     *                    {@code false} to retry only when the callable throws an exception
     * @return the result of the first accepted attempt
     * @throws ExecutionException if an attempt fails with a throwable the retryer is not
     *                            configured to retry on
     * @throws RetryException     if every attempt was rejected (the last failure, if any,
     *                            is available as the cause)
     */
    public static Boolean retryCall(final Callable<Boolean> callable, boolean checkResult) throws ExecutionException, RetryException {
        return getDefaultRetryer(checkResult).call(callable);
    }

    /**
     * Invokes the callable through the default result-checking retryer, i.e. retries
     * until it returns {@code true} or the attempts are exhausted.
     *
     * @param callable the callable to invoke
     * @return the result of the first accepted attempt
     * @throws ExecutionException if an attempt fails with a throwable the retryer is not
     *                            configured to retry on
     * @throws RetryException     if every attempt was rejected (the last failure, if any,
     *                            is available as the cause)
     */
    public static Boolean retryCall(final Callable<Boolean> callable) throws ExecutionException, RetryException {
        return retryCall(callable, true);
    }
}
+216 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.common.utils;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

/**
 * Tests for {@link RetryerUtils}: the default retryers and the {@code retryCall} shortcuts.
 *
 * <p>Unexpected {@link ExecutionException}/{@link RetryException} are declared on the test
 * methods instead of being converted to {@code Assert.fail(message)}, so a failing run
 * reports the full exception and its cause.
 */
public class RetryerUtilsTest {

    /** Number of attempts the default retryers make before giving up. */
    private static final int MAX_ATTEMPTS = 3;

    /** How many times the "no result check" scenario is repeated against the same retryer. */
    private static final int NO_CHECK_REPETITIONS = 5;

    /** Abstraction over "run this callable with retry", so scenarios can be shared. */
    @FunctionalInterface
    private interface RetryInvoker {
        Boolean invoke(java.util.concurrent.Callable<Boolean> callable) throws ExecutionException, RetryException;
    }

    @Test
    public void testDefaultRetryer() throws ExecutionException, RetryException {
        Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
        Assert.assertNotNull(retryer);
        Assert.assertTrue(retryer.call(() -> true));
        // the no-arg overload and getDefaultRetryer(true) must hand out the same instance
        Assert.assertSame(retryer, RetryerUtils.getDefaultRetryer(true));
    }

    @Test
    public void testDefaultRetryerResultCheck() throws ExecutionException, RetryException {
        Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
        Assert.assertNotNull(retryer);
        assertRetriesUntilTrue(retryer::call);
    }

    @Test
    public void testDefaultRetryerResultNoCheck() throws ExecutionException, RetryException {
        Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer(false);
        Assert.assertNotNull(retryer);
        assertFalseResultIsNotRetried(retryer::call);
    }

    @Test
    public void testRecallResultCheck() throws ExecutionException, RetryException {
        assertRetriesUntilTrue(RetryerUtils::retryCall);
    }

    @Test
    public void testRecallResultCheckWithPara() throws ExecutionException, RetryException {
        assertRetriesUntilTrue(callable -> RetryerUtils.retryCall(callable, true));
    }

    @Test
    public void testRecallResultNoCheck() throws ExecutionException, RetryException {
        assertFalseResultIsNotRetried(callable -> RetryerUtils.retryCall(callable, false));
    }

    @Test
    public void testRetryException() throws ExecutionException, RetryException {
        testRetryExceptionWithPara(true);
        testRetryExceptionWithPara(false);
    }

    /**
     * Verifies result-checking retry: a callable that first returns {@code true} on attempt
     * 1..MAX_ATTEMPTS is invoked exactly that many times, and one that would only succeed on
     * attempt MAX_ATTEMPTS + 1 ends in a {@link RetryException} after MAX_ATTEMPTS calls.
     */
    private static void assertRetriesUntilTrue(RetryInvoker invoker) throws ExecutionException, RetryException {
        for (int successAttempt = 1; successAttempt <= MAX_ATTEMPTS; successAttempt++) {
            final int target = successAttempt;
            final int[] calls = {0};
            Boolean result = invoker.invoke(() -> ++calls[0] == target);
            Assert.assertEquals(target, calls[0]);
            Assert.assertTrue(result);
        }

        final int[] calls = {0};
        try {
            invoker.invoke(() -> ++calls[0] == MAX_ATTEMPTS + 1);
            Assert.fail("Retry times not reached");
        } catch (RetryException e) {
            Assert.assertEquals(MAX_ATTEMPTS, e.getNumberOfFailedAttempts());
            Assert.assertEquals(MAX_ATTEMPTS, calls[0]);
        }
    }

    /**
     * Verifies exception-only retry: a callable returning {@code false} on its first call is
     * invoked exactly once and {@code false} is handed back to the caller. The scenario is
     * repeated to show the shared retryer keeps no state between calls.
     */
    private static void assertFalseResultIsNotRetried(RetryInvoker invoker) throws ExecutionException, RetryException {
        for (int round = 0; round < NO_CHECK_REPETITIONS; round++) {
            final int[] calls = {0};
            Boolean result = invoker.invoke(() -> ++calls[0] > 1);
            Assert.assertEquals(1, calls[0]);
            Assert.assertFalse(result);
        }
    }

    /**
     * Verifies retry-on-exception for either default retryer: the callable throws until the
     * target attempt; when the target lies beyond MAX_ATTEMPTS the call ends in a
     * {@link RetryException} whose cause is the exception of the last attempt.
     */
    private void testRetryExceptionWithPara(boolean checkResult) throws ExecutionException, RetryException {
        for (int successAttempt = 1; successAttempt <= MAX_ATTEMPTS; successAttempt++) {
            final int target = successAttempt;
            final int[] calls = {0};
            Boolean result = RetryerUtils.retryCall(() -> {
                if (++calls[0] != target) {
                    throw new IllegalArgumentException(String.valueOf(calls[0]));
                }
                return true;
            }, checkResult);
            Assert.assertEquals(target, calls[0]);
            Assert.assertTrue(result);
        }

        final int[] calls = {0};
        try {
            RetryerUtils.retryCall(() -> {
                if (++calls[0] != MAX_ATTEMPTS + 1) {
                    throw new IllegalArgumentException(String.valueOf(calls[0]));
                }
                return true;
            }, checkResult);
            Assert.fail("Recall times not reached");
        } catch (RetryException e) {
            Assert.assertEquals(MAX_ATTEMPTS, e.getNumberOfFailedAttempts());
            Assert.assertEquals(MAX_ATTEMPTS, calls[0]);
            Assert.assertNotNull(e.getCause());
            // the exception message carries the attempt number that threw it
            Assert.assertEquals(MAX_ATTEMPTS, Integer.parseInt(e.getCause().getMessage()));
        }
    }
}
+14 −13
Original line number Diff line number Diff line
@@ -20,11 +20,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import com.github.rholder.retry.RetryException;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

/**
@@ -101,21 +104,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                new NettyRemoteChannel(channel, command.getOpaque()));

        try {
            this.doAck(taskExecutionContext);
        }catch (Exception e){
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            this.doAck(taskExecutionContext);
        }
        // tell master that task is in executing
        final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
        
        try {
            RetryerUtils.retryCall(() -> {
                taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
                return Boolean.TRUE;
            });
            // submit task
            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
        } catch (ExecutionException | RetryException e) {
            logger.error(e.getMessage(), e);
        }

    private void doAck(TaskExecutionContext taskExecutionContext){
        // tell master that task is in executing
        TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
        taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
    }

    /**
+8 −0
Original line number Diff line number Diff line
@@ -118,6 +118,7 @@
        <servlet-api.version>2.5</servlet-api.version>
        <swagger.version>1.9.3</swagger.version>
        <springfox.version>2.9.2</springfox.version>
        <guava-retry.version>2.0.0</guava-retry.version>
    </properties>

    <dependencyManagement>
@@ -544,6 +545,12 @@
                <artifactId>swagger-bootstrap-ui</artifactId>
                <version>${swagger.version}</version>
            </dependency>

            <dependency>
                <groupId>com.github.rholder</groupId>
                <artifactId>guava-retrying</artifactId>
                <version>${guava-retry.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

@@ -771,6 +778,7 @@
                        <include>**/common/utils/HttpUtilsTest.java</include>
                        <include>**/common/ConstantsTest.java</include>
                        <include>**/common/utils/HadoopUtils.java</include>
                        <include>**/common/utils/RetryerUtilsTest.java</include>
                        <include>**/common/plugin/FilePluginManagerTest</include>
                        <include>**/common/plugin/PluginClassLoaderTest</include>
                        <include>**/dao/mapper/AccessTokenMapperTest.java</include>
Loading