Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exponential backoff and jitter for retries #204

Merged
merged 3 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/docus/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ Most Loki4j appender settings are optional. These few that are required are mark
|batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config|
|batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached|
|sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates|
|retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates. Allowed values from `1` to `30`, if set outside of this range the maximum allowed value is used instead|
|retryTimeoutMs|60000|Base time in milliseconds to wait before the next attempt to re-send the failed batch. Batches are retried with an exponential backoff (multiplying this value by `2^attempt`) and jitter. Allowed values from `1` to `86_400_000` (a day in milliseconds), if set outside of this range the maximum allowed value is used instead|
|maxJitterMs|1000|Upper bound in milliseconds for a jitter added to the retries.|
|dropRateLimitedBatches|false|Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff.|
|internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage|
|useDirectBuffers|true|Use off-heap memory for storing intermediate data|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import com.github.loki4j.client.batch.Batcher;
Expand Down Expand Up @@ -72,6 +73,12 @@ public final class AsyncBufferPipeline {

private final long retryTimeoutMs;

/**
* Sleep function that receives attempts and a timeout and sleeps depending on the
* value of those parameters. Used when retries are enabled.
*/
private final BiFunction<Integer, Long, Boolean> sleep;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -114,6 +121,7 @@ public AsyncBufferPipeline(PipelineConfig conf) {
drainOnStop = conf.drainOnStop;
maxRetries = conf.maxRetries;
retryTimeoutMs = conf.retryTimeoutMs;
sleep = conf.sleep;
dropRateLimitedBatches = conf.dropRateLimitedBatches;
parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.internalQueuesCheckTimeoutMs);
this.log = conf.internalLoggingFactory.apply(this);
Expand Down Expand Up @@ -326,7 +334,7 @@ private LokiResponse sendBatch(BinaryBatch batch) {
++retry <= maxRetries
&& checkIfEligibleForRetry(e, r)
&& reportRetryFailed(e, r)
&& sleep(retryTimeoutMs));
&& sleep.apply(retry, retryTimeoutMs));

if (metrics != null) metrics.batchSendFailed(sendErrorReasonProvider(e, r));
return null;
Expand Down Expand Up @@ -368,15 +376,6 @@ private boolean shouldRetryRateLimitedBatches(int status) {
return status == TOO_MANY_REQUEST_HTTP_STATUS && !dropRateLimitedBatches;
}

private boolean sleep(long timeoutMs) {
try {
Thread.sleep(timeoutMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
}

void waitSendQueueLessThan(int size, long timeoutMs) {
var timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
var elapsedNs = 0L;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.github.loki4j.client.pipeline;

import java.util.Random;
import java.util.function.Function;

class Jitter {

private final Function<Integer, Long> randomSupplier;
private final int maxJitter;

Jitter(int maxJitter) {
randomSupplier = maxJitter > 0
? new RandomFunction()
: ignored -> 0L;
this.maxJitter = maxJitter;
}

long generate() {
return randomSupplier.apply(maxJitter);
}

private static class RandomFunction implements Function<Integer, Long> {
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Random::new);

@Override
public Long apply(Integer maxJitter) {
return Long.valueOf(random.get().nextInt(maxJitter));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import com.github.loki4j.client.http.ApacheHttpClient;
import com.github.loki4j.client.http.HttpConfig;
Expand Down Expand Up @@ -95,6 +96,12 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final long retryTimeoutMs;

/**
* Sleep function that receives attempts and a timeout and sleeps depending on the
* value of those parameters.
*/
public final BiFunction<Integer, Long, Boolean> sleep;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -147,10 +154,11 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final Function<Object, Loki4jLogger> internalLoggingFactory;

public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, boolean dropRateLimitedBatches,
long internalQueuesCheckTimeoutMs, boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled,
WriterFactory writerFactory, HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
private PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs,
BiFunction<Integer, Long, Boolean> sleep, boolean dropRateLimitedBatches, long internalQueuesCheckTimeoutMs,
boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, WriterFactory writerFactory,
HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
Function<Object, Loki4jLogger> internalLoggingFactory) {
this.name = name;
this.batchMaxItems = batchMaxItems;
Expand All @@ -161,6 +169,7 @@ public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long ba
this.sendQueueMaxBytes = sendQueueMaxBytes;
this.maxRetries = maxRetries;
this.retryTimeoutMs = retryTimeoutMs;
this.sleep = sleep;
this.dropRateLimitedBatches = dropRateLimitedBatches;
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
this.useDirectBuffers = useDirectBuffers;
Expand All @@ -187,6 +196,17 @@ public static class Builder {
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
private long retryTimeoutMs = 60 * 1000;
private Jitter jitter = new Jitter(1000);
private BiFunction<Integer, Long, Boolean> sleep = (attempt, timeoutMs) -> {
try {
long backoffMs = timeoutMs * (1 << (attempt - 1));
Thread.sleep(backoffMs + jitter.generate());
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
};
private boolean dropRateLimitedBatches = false;
private long internalQueuesCheckTimeoutMs = 25;
private boolean useDirectBuffers = true;
Expand All @@ -199,24 +219,25 @@ public static class Builder {

public PipelineConfig build() {
return new PipelineConfig(
name,
batchMaxItems,
batchMaxBytes,
batchTimeoutMs,
sortByTime,
staticLabels,
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
metricsEnabled,
writer,
httpConfigBuilder.build(writer.contentType),
httpClientFactory,
internalLoggingFactory);
name,
batchMaxItems,
batchMaxBytes,
batchTimeoutMs,
sortByTime,
staticLabels,
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
sleep,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
metricsEnabled,
writer,
httpConfigBuilder.build(writer.contentType),
httpClientFactory,
internalLoggingFactory);
}

public Builder setName(String name) {
Expand Down Expand Up @@ -264,6 +285,16 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) {
return this;
}

public Builder setMaxJitterMs(int maxJitterMs) {
this.jitter = new Jitter(maxJitterMs);
return this;
}

public Builder setSleep(BiFunction<Integer, Long, Boolean> sleep) {
this.sleep = sleep;
return this;
}

public Builder setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
this.dropRateLimitedBatches = dropRateLimitedBatches;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.loki4j.logback;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import com.github.loki4j.client.pipeline.AsyncBufferPipeline;
import com.github.loki4j.client.pipeline.PipelineConfig;
import com.github.loki4j.client.pipeline.PipelineConfig.Builder;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
Expand All @@ -15,6 +17,11 @@
*/
public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

private static final int MAX_RETRIES = 30;
/**
* A day in milliseconds.
*/
private static final long MAX_RETRY_TIMEOUT_MS = 86_400_000L;
/**
* Max number of events to put into a single batch before sending it to Loki.
*/
Expand Down Expand Up @@ -49,6 +56,11 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
*/
private long retryTimeoutMs = 60 * 1000;

/**
* Upper bound in milliseconds for a jitter added to the retries.
*/
private int maxJitterMs = 1000;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -104,6 +116,8 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
*/
private AtomicLong droppedEventsCount = new AtomicLong(0L);

private PipelineConfig.Builder pipelineBuilder = PipelineConfig.builder();

@Override
public void start() {
if (getStatusManager() != null && getStatusManager().getCopyOfStatusListenerList().isEmpty()) {
Expand Down Expand Up @@ -134,7 +148,7 @@ public void start() {
sender = new JavaHttpSender();
}

PipelineConfig pipelineConf = PipelineConfig.builder()
PipelineConfig pipelineConf = pipelineBuilder
.setName(this.getName() == null ? "none" : this.getName())
.setBatchMaxItems(batchMaxItems)
.setBatchMaxBytes(batchMaxBytes)
Expand All @@ -144,6 +158,7 @@ public void start() {
.setSendQueueMaxBytes(sendQueueMaxBytes)
.setMaxRetries(maxRetries)
.setRetryTimeoutMs(retryTimeoutMs)
.setMaxJitterMs(maxJitterMs)
.setDropRateLimitedBatches(dropRateLimitedBatches)
.setInternalQueuesCheckTimeoutMs(internalQueuesCheckTimeoutMs)
.setUseDirectBuffers(useDirectBuffers)
Expand Down Expand Up @@ -231,10 +246,24 @@ public void setSendQueueMaxBytes(long sendQueueMaxBytes) {
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
if (maxRetries > MAX_RETRIES) {
addWarn("Invalid value for `maxRetries`, using " + MAX_RETRIES + " instead.");
this.maxRetries = MAX_RETRIES;
} else {
this.maxRetries = maxRetries;
}
}
public void setRetryTimeoutMs(long retryTimeoutMs) {
this.retryTimeoutMs = retryTimeoutMs;
if (retryTimeoutMs > MAX_RETRY_TIMEOUT_MS) {
addWarn("Invalid value for `retryTimeoutMs`, using " + MAX_RETRY_TIMEOUT_MS + " instead.");
this.retryTimeoutMs = MAX_RETRY_TIMEOUT_MS;
} else {
this.retryTimeoutMs = retryTimeoutMs;
}
}

public void setMaxJitterMs(int maxJitterMs) {
this.maxJitterMs = maxJitterMs;
}

public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
Expand All @@ -244,7 +273,7 @@ public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
/**
* "format" instead of "encoder" in the name allows to specify
* the default implementation, so users don't have to write
* full-qualified class name by default
* full-qualified class name by default.
*/
@DefaultClass(JsonEncoder.class)
public void setFormat(Loki4jEncoder encoder) {
Expand All @@ -257,7 +286,7 @@ HttpSender getSender() {

/**
* "http" instead of "sender" is just to have a more clear name
* for the configuration section
* for the configuration section.
*/
@DefaultClass(JavaHttpSender.class)
public void setHttp(HttpSender sender) {
Expand All @@ -277,4 +306,14 @@ public void setUseDirectBuffers(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
}

/**
* Sets the PipelineBuilder, this method is intended for testing
* purpose exclusively.
*
* @param pipelineBuilder the pipeline builder
*/
void setPipelineBuilder(Builder pipelineBuilder) {
this.pipelineBuilder = Objects.requireNonNull(pipelineBuilder);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ch.qos.logback.core.spi.LifeCycle;

/**
* Basic interface for all Loki4j encoders
* Basic interface for all Loki4j encoders.
*/
public interface Loki4jEncoder extends ContextAware, LifeCycle {

Expand Down
Loading
Loading