Skip to content

Commit

Permalink
Add exponential backoff and jitter for retries
Browse files Browse the repository at this point in the history
Adds exponential backoff by multiplying the `retryTimeoutMs` by
`2^attempt`. Adds a random jitter to spread out the retries. Although
in the initial implementation the retry function isn't available to the
user the code is written in a way that makes it easier to change that.

Refactors the tests to avoid using `Thread#sleep`, which is not only a
bad practice producing not deterministic tests but also makes them very
slow. A reduction of more that 90% of testing time is percieved for the
tests where this was changed.

Resolves: #194
  • Loading branch information
acm19 committed Jan 2, 2024
1 parent f3a17b2 commit 1d4284d
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 62 deletions.
4 changes: 2 additions & 2 deletions docs/docus/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ Most Loki4j appender settings are optional. These few that are required are mark
|batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config|
|batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached|
|sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates|
|retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates. Allowed values from `1` to `30`, if set outside of this range the maximum allowed value is used instead|
|retryTimeoutMs|60000|Base time in milliseconds to wait before the next attempt to re-send the failed batch. Batches are retried with an exponential backoff (multiplying this value by `2^attempt`) and jitter. Allowed values from `1` to `86_400_000` (a day in milliseconds), if set outside of this range the maximum allowed value is used instead|
|dropRateLimitedBatches|false|Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff.|
|internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage|
|useDirectBuffers|true|Use off-heap memory for storing intermediate data|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import com.github.loki4j.client.batch.Batcher;
Expand Down Expand Up @@ -72,6 +73,12 @@ public final class AsyncBufferPipeline {

private final long retryTimeoutMs;

/**
* Sleep function that receives attempts and a timeout and sleeps depending on the
* value of those parameters. Used when retries are enabled.
*/
private final BiFunction<Integer, Long, Boolean> sleep;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -114,6 +121,7 @@ public AsyncBufferPipeline(PipelineConfig conf) {
drainOnStop = conf.drainOnStop;
maxRetries = conf.maxRetries;
retryTimeoutMs = conf.retryTimeoutMs;
sleep = conf.sleep;
dropRateLimitedBatches = conf.dropRateLimitedBatches;
parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.internalQueuesCheckTimeoutMs);
this.log = conf.internalLoggingFactory.apply(this);
Expand Down Expand Up @@ -326,7 +334,7 @@ private LokiResponse sendBatch(BinaryBatch batch) {
++retry <= maxRetries
&& checkIfEligibleForRetry(e, r)
&& reportRetryFailed(e, r)
&& sleep(retryTimeoutMs));
&& sleep.apply(retry, retryTimeoutMs));

if (metrics != null) metrics.batchSendFailed(sendErrorReasonProvider(e, r));
return null;
Expand Down Expand Up @@ -368,15 +376,6 @@ private boolean shouldRetryRateLimitedBatches(int status) {
return status == TOO_MANY_REQUEST_HTTP_STATUS && !dropRateLimitedBatches;
}

private boolean sleep(long timeoutMs) {
try {
Thread.sleep(timeoutMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
}

void waitSendQueueLessThan(int size, long timeoutMs) {
var timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
var elapsedNs = 0L;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.github.loki4j.client.pipeline;

import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import com.github.loki4j.client.http.ApacheHttpClient;
import com.github.loki4j.client.http.HttpConfig;
Expand Down Expand Up @@ -95,6 +97,12 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final long retryTimeoutMs;

/**
* Sleep function that receives attempts and a timeout and sleeps depending on the
* value of those parameters.
*/
public final BiFunction<Integer, Long, Boolean> sleep;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -147,10 +155,11 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final Function<Object, Loki4jLogger> internalLoggingFactory;

public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, boolean dropRateLimitedBatches,
long internalQueuesCheckTimeoutMs, boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled,
WriterFactory writerFactory, HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
private PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs,
BiFunction<Integer, Long, Boolean> sleep, boolean dropRateLimitedBatches, long internalQueuesCheckTimeoutMs,
boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, WriterFactory writerFactory,
HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
Function<Object, Loki4jLogger> internalLoggingFactory) {
this.name = name;
this.batchMaxItems = batchMaxItems;
Expand All @@ -161,6 +170,7 @@ public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long ba
this.sendQueueMaxBytes = sendQueueMaxBytes;
this.maxRetries = maxRetries;
this.retryTimeoutMs = retryTimeoutMs;
this.sleep = sleep;
this.dropRateLimitedBatches = dropRateLimitedBatches;
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
this.useDirectBuffers = useDirectBuffers;
Expand All @@ -187,6 +197,17 @@ public static class Builder {
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
private long retryTimeoutMs = 60 * 1000;
private Supplier<Long> jitter = new Jitter();
private BiFunction<Integer, Long, Boolean> sleep = (attempt, timeout) -> {
try {
long backoff = timeout * (2 << (attempt - 1));
Thread.sleep(backoff + jitter.get());
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
};
private boolean dropRateLimitedBatches = false;
private long internalQueuesCheckTimeoutMs = 25;
private boolean useDirectBuffers = true;
Expand All @@ -199,24 +220,25 @@ public static class Builder {

public PipelineConfig build() {
return new PipelineConfig(
name,
batchMaxItems,
batchMaxBytes,
batchTimeoutMs,
sortByTime,
staticLabels,
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
metricsEnabled,
writer,
httpConfigBuilder.build(writer.contentType),
httpClientFactory,
internalLoggingFactory);
name,
batchMaxItems,
batchMaxBytes,
batchTimeoutMs,
sortByTime,
staticLabels,
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
sleep,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
metricsEnabled,
writer,
httpConfigBuilder.build(writer.contentType),
httpClientFactory,
internalLoggingFactory);
}

public Builder setName(String name) {
Expand Down Expand Up @@ -264,6 +286,11 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) {
return this;
}

public Builder setSleep(BiFunction<Integer, Long, Boolean> sleep) {
this.sleep = sleep;
return this;
}

public Builder setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
this.dropRateLimitedBatches = dropRateLimitedBatches;
return this;
Expand Down Expand Up @@ -311,6 +338,19 @@ public Builder setInternalLoggingFactory(Function<Object, Loki4jLogger> internal

}


private static class Jitter implements Supplier<Long> {

private static final int JITTER_BOUND = 1000;
private final ThreadLocal<Random> jitter = ThreadLocal.withInitial(Random::new);

@Override
public Long get() {
return Long.valueOf(jitter.get().nextInt(JITTER_BOUND));
}

}

/**
* A factory for Writer.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.loki4j.logback;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import com.github.loki4j.client.pipeline.AsyncBufferPipeline;
import com.github.loki4j.client.pipeline.PipelineConfig;
import com.github.loki4j.client.pipeline.PipelineConfig.Builder;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
Expand All @@ -15,6 +17,11 @@
*/
public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

private static final int MAX_RETRIES = 30;
/**
* A day in milliseconds.
*/
private static final long MAX_RETRY_TIMEOUT_MS = 86_400_000L;
/**
* Max number of events to put into a single batch before sending it to Loki.
*/
Expand Down Expand Up @@ -104,6 +111,8 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
*/
private AtomicLong droppedEventsCount = new AtomicLong(0L);

private PipelineConfig.Builder pipelineBuilder = PipelineConfig.builder();

@Override
public void start() {
if (getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
Expand Down Expand Up @@ -134,7 +143,7 @@ public void start() {
sender = new JavaHttpSender();
}

PipelineConfig pipelineConf = PipelineConfig.builder()
PipelineConfig pipelineConf = pipelineBuilder
.setName(this.getName() == null ? "none" : this.getName())
.setBatchMaxItems(batchMaxItems)
.setBatchMaxBytes(batchMaxBytes)
Expand Down Expand Up @@ -231,10 +240,20 @@ public void setSendQueueMaxBytes(long sendQueueMaxBytes) {
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
if (maxRetries > 0 && maxRetries <= MAX_RETRIES) {
this.maxRetries = maxRetries;
} else {
this.maxRetries = MAX_RETRIES;
addError("Invalid value for `maxRetries`, using " + MAX_RETRIES + " instead.");
}
}
public void setRetryTimeoutMs(long retryTimeoutMs) {
this.retryTimeoutMs = retryTimeoutMs;
if (retryTimeoutMs > 0 && retryTimeoutMs <= MAX_RETRY_TIMEOUT_MS) {
this.retryTimeoutMs = retryTimeoutMs;
} else {
this.retryTimeoutMs = MAX_RETRY_TIMEOUT_MS;
addError("Invalid value for `retryTimeoutMs`, using " + MAX_RETRY_TIMEOUT_MS + " instead.");
}
}

public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
Expand All @@ -244,7 +263,7 @@ public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
/**
* "format" instead of "encoder" in the name allows to specify
* the default implementation, so users don't have to write
* full-qualified class name by default
* full-qualified class name by default.
*/
@DefaultClass(JsonEncoder.class)
public void setFormat(Loki4jEncoder encoder) {
Expand All @@ -257,7 +276,7 @@ HttpSender getSender() {

/**
* "http" instead of "sender" is just to have a more clear name
* for the configuration section
* for the configuration section.
*/
@DefaultClass(JavaHttpSender.class)
public void setHttp(HttpSender sender) {
Expand All @@ -277,4 +296,14 @@ public void setUseDirectBuffers(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
}

/**
* Sets the PipelineBuilder, this method is intended for testing
* purpose exclusively.
*
* @param pipelineBuilder the pipeline builder
*/
void setPipelineBuilder(Builder pipelineBuilder) {
this.pipelineBuilder = Objects.requireNonNull(pipelineBuilder);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ch.qos.logback.core.spi.LifeCycle;

/**
* Basic interface for all Loki4j encoders
* Basic interface for all Loki4j encoders.
*/
public interface Loki4jEncoder extends ContextAware, LifeCycle {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@

import static com.github.loki4j.testkit.dummy.Generators.genMessage;

import java.util.function.IntSupplier;

public class Generators {

private static String LABELS_MESSAGE_SEPARATOR = " %%% ";
private static final String LABELS_MESSAGE_SEPARATOR = " %%% ";

static HttpConfig.Builder defaultHttpConfig = HttpConfig.builder();

Expand Down Expand Up @@ -252,7 +254,7 @@ else if (lev < 0.9)
ExceptionGenerator.exception(msg)));
}

return events.toArray(new LoggingEvent[0]);
return events.toArray(LoggingEvent[]::new);
}

public static LoggingEvent loggingEvent(
Expand Down Expand Up @@ -457,10 +459,12 @@ public static class FailingHttpClient extends DummyHttpClient {
public final AtomicBoolean fail = new AtomicBoolean(false);
public final AtomicBoolean rateLimited = new AtomicBoolean(false);
public volatile int sendCount = 0;
public IntSupplier barrier = () -> 0;
@Override
public LokiResponse send(ByteBuffer batch) throws Exception {
sendCount++;
var response = super.send(batch);
barrier.getAsInt();
if (fail.get() && !rateLimited.get())
throw new ConnectException("Text exception");
else if (fail.get() && rateLimited.get())
Expand Down
Loading

0 comments on commit 1d4284d

Please sign in to comment.