Add support for retries on 429 rate limited requests
Adds an option, `dropRateLimitedBatches`, to enable or disable retries
on rate-limit (429) errors. Retries are enabled by default (the option
defaults to `false`) to stay consistent with Promtail's options, and
because Loki scales less aggressively and relies on retries when the
load is spiky.

Resolves: #192
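
As a rough illustration of the behavior described above, the retry decision can be modeled as a standalone predicate. This is a minimal sketch, not the code in this commit: the class `RetryEligibilitySketch` and its method signature are hypothetical, and only the status codes (429, 503) and the `ConnectException` case are taken from the change itself.

```java
import java.net.ConnectException;

/** Hypothetical, simplified model of the retry decision; not part of Loki4j. */
final class RetryEligibilitySketch {

    private static final int SERVICE_UNAVAILABLE = 503;
    private static final int TOO_MANY_REQUESTS = 429;

    private final boolean dropRateLimitedBatches;

    RetryEligibilitySketch(boolean dropRateLimitedBatches) {
        this.dropRateLimitedBatches = dropRateLimitedBatches;
    }

    /**
     * @param error  exception thrown while sending, or null if a response arrived
     * @param status HTTP status of the response, or null if no response arrived
     * @return true if the failed batch may be sent again
     */
    boolean isEligibleForRetry(Exception error, Integer status) {
        // Connection failures are always retried.
        if (error instanceof ConnectException) return true;
        if (status == null) return false;
        // 503 is always retried; 429 only when rate-limited batches are not dropped.
        if (status == SERVICE_UNAVAILABLE) return true;
        return status == TOO_MANY_REQUESTS && !dropRateLimitedBatches;
    }

    public static void main(String[] args) {
        var retrying = new RetryEligibilitySketch(false); // default setting
        var dropping = new RetryEligibilitySketch(true);
        System.out.println(retrying.isEligibleForRetry(null, 429)); // true
        System.out.println(dropping.isEligibleForRetry(null, 429)); // false
        System.out.println(dropping.isEligibleForRetry(null, 503)); // true
    }
}
```

With the default (`false`), a 429 response is treated like a 503 and the batch is retried. Setting the flag to `true` drops the batch on the first 429.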
acm19 committed Nov 24, 2023
1 parent 186b338 commit 6396c9d
Showing 6 changed files with 174 additions and 67 deletions.
29 changes: 15 additions & 14 deletions docs/docus/docs/configuration.md
@@ -24,19 +24,20 @@ Most Loki4j appender settings are optional. These few that are required are mark

### General settings

Setting|Default|Description
-------|-------|-----------
batchMaxItems|1000|Max number of events to put into a single batch before sending it to Loki
batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config
batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached
sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped
maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried only in case of ConnectException or 503 status from Loki. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates
retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch
internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage
useDirectBuffers|true|Use off-heap memory for storing intermediate data
drainOnStop|true|If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events
metricsEnabled|false|If true, the appender will report its metrics using Micrometer
verbose|false|If true, the appender will print its own debug logs to stderr
| Setting | Default | Description |
|------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| batchMaxItems | 1000 | Max number of events to put into a single batch before sending it to Loki |
| batchMaxBytes | 4194304 | Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config |
| batchTimeoutMs | 60000 | Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached |
| sendQueueMaxBytes | 41943040 | Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped |
| maxRetries | 2 | Max number of attempts to send a batch to Loki before it is dropped. A failed batch send is retried only in case of a ConnectException, a 503 status, or (unless `dropRateLimitedBatches` is set) a 429 status from Loki. All other exceptions and 4xx-5xx statuses do not cause a retry, in order to avoid duplicates |
| retryTimeoutMs | 60000 | Time in milliseconds to wait before the next attempt to re-send the failed batch |
| dropRateLimitedBatches | false | Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff. |
| internalQueuesCheckTimeoutMs | 25 | A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage |
| useDirectBuffers | true | Use off-heap memory for storing intermediate data |
| drainOnStop | true | If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events |
| metricsEnabled | false | If true, the appender will report its metrics using Micrometer |
| verbose | false | If true, the appender will print its own debug logs to stderr |
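
The retry-related settings in the table above (`maxRetries`, `retryTimeoutMs`, `dropRateLimitedBatches`) can be pictured with a small simulation. This is a hedged sketch rather than Loki4j's actual send loop: `BoundedRetrySketch` and `sendWithRetries` are hypothetical names, any 2xx status is assumed to mean success, and the sketch assumes one initial attempt followed by at most `maxRetries` retries, which may differ from how the appender counts attempts.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch of how the retry settings could interact; not part of Loki4j. */
final class BoundedRetrySketch {

    /**
     * Simulates sending one batch.
     *
     * @param send supplies the HTTP status returned by each send attempt
     * @return the number of attempts made before the batch was delivered or dropped
     */
    static int sendWithRetries(IntSupplier send, int maxRetries, long retryTimeoutMs,
                               boolean dropRateLimitedBatches) {
        int attempts = 0;
        while (true) {
            int status = send.getAsInt();
            attempts++;
            boolean success = status >= 200 && status < 300;
            // 503 is always retryable; 429 only if rate-limited batches are not dropped.
            boolean retryable = status == 503 || (status == 429 && !dropRateLimitedBatches);
            if (success || !retryable || attempts > maxRetries) {
                return attempts;
            }
            try {
                // Wait before the next attempt, as retryTimeoutMs describes.
                Thread.sleep(retryTimeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return attempts;
            }
        }
    }

    public static void main(String[] args) {
        // Loki keeps answering 429; a zero timeout keeps the demo fast.
        System.out.println(sendWithRetries(() -> 429, 2, 0L, false)); // 3
        System.out.println(sendWithRetries(() -> 429, 2, 0L, true));  // 1
    }
}
```

Under these assumptions, a persistently rate-limited batch is attempted three times with the defaults (`maxRetries` = 2, `dropRateLimitedBatches` = false) and only once when `dropRateLimitedBatches` is true.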

### HTTP settings

@@ -113,7 +114,7 @@ Check the corresponding [configuration section](apacheclient) for details.

In this example we would like to change max batch size to 100 records, batch timeout to 10s, label key-value separator to `:`,
and sort log records by time before sending them to Loki.
Also we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf).
Also, we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf).
Finally, we want to see Loki4j debug output.

```xml
@@ -32,6 +32,8 @@

public final class AsyncBufferPipeline {

private static final int TOO_MANY_REQUEST_HTTP_STATUS = 429;

private static final Comparator<LogRecord> compareByTime = (e1, e2) -> {
var tsCmp = Long.compare(e1.timestampMs, e2.timestampMs);
return tsCmp == 0 ? Integer.compare(e1.nanos, e2.nanos) : tsCmp;
@@ -70,6 +72,13 @@ public final class AsyncBufferPipeline {

private final long retryTimeoutMs;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
* due to backoff.
*/
private final boolean dropRateLimitedBatches;

private volatile boolean started = false;

private AtomicBoolean acceptNewEvents = new AtomicBoolean(true);
@@ -105,6 +114,7 @@ public AsyncBufferPipeline(PipelineConfig conf) {
drainOnStop = conf.drainOnStop;
maxRetries = conf.maxRetries;
retryTimeoutMs = conf.retryTimeoutMs;
dropRateLimitedBatches = conf.dropRateLimitedBatches;
parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.internalQueuesCheckTimeoutMs);
this.log = conf.internalLoggingFactory.apply(this);
this.metrics = conf.metricsEnabled ? new Loki4jMetrics(conf.name, () -> unsentEvents.get()) : null;
@@ -231,7 +241,7 @@ private void encodeStep(LogRecordBatch batch) throws InterruptedException {

writeBatch(batch, writer);
if (writer.isEmpty()) return;
while(started &&
while(started &&
!sendQueue.offer(
batch.batchId(),
batch.size(),
@@ -351,7 +361,11 @@ private Supplier<String> sendErrorReasonProvider(Exception e, LokiResponse r) {
}

private boolean checkIfEligibleForRetry(Exception e, LokiResponse r) {
return e instanceof ConnectException || (r != null && r.status == 503);
return e instanceof ConnectException || (r != null && (r.status == 503 || shouldRetryRateLimitedBatches(r.status)));
}

private boolean shouldRetryRateLimitedBatches(int status) {
return status == TOO_MANY_REQUEST_HTTP_STATUS && !dropRateLimitedBatches;
}

private boolean sleep(long timeoutMs) {
@@ -14,7 +14,7 @@
import com.github.loki4j.client.writer.Writer;

/**
* Configuration properties for Loki4j pipeline
* Configuration properties for Loki4j pipeline.
*/
public class PipelineConfig {

@@ -42,25 +42,25 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
}

/**
* Name of this pipeline
* Name of this pipeline.
*/
public final String name;

/**
* Max number of events to put into a single batch before sending it to Loki
* Max number of events to put into a single batch before sending it to Loki.
*/
public final int batchMaxItems;

/**
* Max number of bytes a single batch can contain (as counted by Loki).
* This value should not be greater than server.grpc_server_max_recv_msg_size
* in your Loki config
* in your Loki config.
*/
public final int batchMaxBytes;

/**
* Max time in milliseconds to keep a batch before sending it to Loki, even if
* max items/bytes limits for this batch are not reached
* max items/bytes limits for this batch are not reached.
*/
public final long batchTimeoutMs;

@@ -79,7 +79,7 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {

/**
* Max number of bytes to keep in the send queue.
* When the queue is full, incoming log events are dropped
* When the queue is full, incoming log events are dropped.
*/
public final long sendQueueMaxBytes;

@@ -95,55 +95,63 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final long retryTimeoutMs;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
* due to backoff.
*/
public final boolean dropRateLimitedBatches;

/**
* A timeout for Loki4j threads to sleep if encode or send queues are empty.
* Decreasing this value means lower latency at cost of higher CPU usage.
*/
public final long internalQueuesCheckTimeoutMs;

/**
* Use off-heap memory for storing intermediate data
* Use off-heap memory for storing intermediate data.
*/
public final boolean useDirectBuffers;

/**
* If true, the pipeline will try to send all the remaining events on shutdown,
* so the proper shutdown procedure might take longer.
* Otherwise, the pipeline will drop the unsent events
* Otherwise, the pipeline will drop the unsent events.
*/
public final boolean drainOnStop;

/**
* If true, the pipeline will report its metrics using Micrometer
* If true, the pipeline will report its metrics using Micrometer.
*/
public final boolean metricsEnabled;

/**
* A factory for Writer
* A factory for Writer.
*/
public final WriterFactory writerFactory;

/**
* Configuration properties for HTTP clients
* Configuration properties for HTTP clients.
*/
public final HttpConfig httpConfig;

/**
* A factory for HTTP client for sending logs to Loki.
* Argument is a config required for constructing an HTTP client
* Argument is a config required for constructing an HTTP client.
*/
public final Function<HttpConfig, Loki4jHttpClient> httpClientFactory;

/**
* A factory for an internal logger.
* Argument is a source class to report log messages from
* Argument is a source class to report log messages from.
*/
public final Function<Object, Loki4jLogger> internalLoggingFactory;

public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, long internalQueuesCheckTimeoutMs,
boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, WriterFactory writerFactory, HttpConfig httpConfig,
Function<HttpConfig, Loki4jHttpClient> httpClientFactory, Function<Object, Loki4jLogger> internalLoggingFactory) {
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, boolean dropRateLimitedBatches,
long internalQueuesCheckTimeoutMs, boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled,
WriterFactory writerFactory, HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
Function<Object, Loki4jLogger> internalLoggingFactory) {
this.name = name;
this.batchMaxItems = batchMaxItems;
this.batchMaxBytes = batchMaxBytes;
@@ -153,6 +161,7 @@ public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long ba
this.sendQueueMaxBytes = sendQueueMaxBytes;
this.maxRetries = maxRetries;
this.retryTimeoutMs = retryTimeoutMs;
this.dropRateLimitedBatches = dropRateLimitedBatches;
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
this.useDirectBuffers = useDirectBuffers;
this.drainOnStop = drainOnStop;
@@ -178,6 +187,7 @@ public static class Builder {
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
private long retryTimeoutMs = 60 * 1000;
private boolean dropRateLimitedBatches = false;
private long internalQueuesCheckTimeoutMs = 25;
private boolean useDirectBuffers = true;
private boolean drainOnStop = true;
@@ -198,6 +208,7 @@ public PipelineConfig build() {
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
@@ -253,6 +264,11 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) {
return this;
}

public Builder setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
this.dropRateLimitedBatches = dropRateLimitedBatches;
return this;
}

public Builder setInternalQueuesCheckTimeoutMs(long internalQueuesCheckTimeoutMs) {
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
return this;
@@ -296,7 +312,7 @@ public Builder setInternalLoggingFactory(Function<Object, Loki4jLogger> internal
}

/**
* A factory for Writer
* A factory for Writer.
*/
public static class WriterFactory {

@@ -308,7 +324,7 @@ public static class WriterFactory {
public final BiFunction<Integer, ByteBufferFactory, Writer> factory;

/**
* HTTP content-type generated by this Writer
* HTTP content-type generated by this Writer.
*/
public final String contentType;
