Skip to content

Commit

Permalink
Add support for retries on 429 rate limited requests (#193)
Browse files Browse the repository at this point in the history
Adds an option to enable/disable retries on rate limit errors:
`dropRateLimitedBatches`. It's enabled by default to keep consistency
with Promtail options. Also because Loki scales less aggressively and
relies on retries when the load is spiky.

Resolves: #192
  • Loading branch information
acm19 committed Nov 30, 2023
1 parent 11a20a3 commit b66ec83
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 78 deletions.
67 changes: 34 additions & 33 deletions docs/docus/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,45 @@ Most Loki4j appender settings are optional. These few that are required are mark

### General settings

Setting|Default|Description
-------|-------|-----------
batchMaxItems|1000|Max number of events to put into a single batch before sending it to Loki
batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config
batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached
sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped
maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried only in case of ConnectException or 503 status from Loki. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates
retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch
internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage
useDirectBuffers|true|Use off-heap memory for storing intermediate data
drainOnStop|true|If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events
metricsEnabled|false|If true, the appender will report its metrics using Micrometer
verbose|false|If true, the appender will print its own debug logs to stderr
|Setting|Default|Description|
|-------|-------|-----------|
|batchMaxItems|1000|Max number of events to put into a single batch before sending it to Loki|
|batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config|
|batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached|
|sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates|
|retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch|
|dropRateLimitedBatches|false|Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff.|
|internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage|
|useDirectBuffers|true|Use off-heap memory for storing intermediate data|
|drainOnStop|true|If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events|
|metricsEnabled|false|If true, the appender will report its metrics using Micrometer|
|verbose|false|If true, the appender will print its own debug logs to stderr|

### HTTP settings

Setting|Default|Description
-------|-------|-----------
http.url||**Required**. Loki endpoint to be used for sending batches
http.connectionTimeoutMs|30000|Time in milliseconds to wait for HTTP connection to Loki to be established before reporting an error
http.requestTimeoutMs|5000|Time in milliseconds to wait for HTTP request to Loki to be responded before reporting an error
http.auth.username||Username to use for basic auth
http.auth.password||Password to use for basic auth
http.tenantId||Tenant identifier. It is required only for sending logs directly to Loki operating in multi-tenant mode. Otherwise this setting has no effect
|Setting|Default|Description|
|-------|-------|-----------|
|http.url||**Required**. Loki endpoint to be used for sending batches|
|http.connectionTimeoutMs|30000|Time in milliseconds to wait for HTTP connection to Loki to be established before reporting an error|
|http.requestTimeoutMs|5000|Time in milliseconds to wait for HTTP request to Loki to be responded before reporting an error|
|http.auth.username||Username to use for basic auth|
|http.auth.password||Password to use for basic auth|
|http.tenantId||Tenant identifier. It is required only for sending logs directly to Loki operating in multi-tenant mode. Otherwise this setting has no effect|

### Format settings

Setting|Default|Description
-------|-------|-----------
format.label.pattern||**Required**. Logback pattern to use for log record's label
format.label.pairSeparator|,|Character sequence to use as a separator between labels. If starts with "regex:" prefix, the remainder is applied as a regular expression separator. Otherwise, the provided char sequence is used as a separator literally
format.label.keyValueSeparator|=|Character to use as a separator between label's name and its value
format.label.readMarkers|false|If true, Loki4j scans each log record for attached LabelMarker to add its values to record's labels
format.label.nopex|true|If true, exception info is not added to labels. If false, you should take care of proper formatting
format.label.streamCache|UnboundAtomicMapCache|An implementation of a Stream cache to use
format.message.pattern||**Required**. Logback pattern to use for log record's message
format.staticLabels|false|If you use only one label for all log records, you can set this flag to true and save some CPU time on grouping records by label
format.sortByTime|false|If true, log records in batch are sorted by timestamp. If false, records will be sent to Loki in arrival order. Enable this if you see 'entry out of order' error from Loki
|Setting|Default|Description|
|-------|-------|-----------|
|format.label.pattern||**Required**. Logback pattern to use for log record's label|
|format.label.pairSeparator|,|Character sequence to use as a separator between labels. If starts with "regex:" prefix, the remainder is applied as a regular expression separator. Otherwise, the provided char sequence is used as a separator literally|
|format.label.keyValueSeparator|=|Character to use as a separator between label's name and its value|
|format.label.readMarkers|false|If true, Loki4j scans each log record for attached LabelMarker to add its values to record's labels|
|format.label.nopex|true|If true, exception info is not added to labels. If false, you should take care of proper formatting|
|format.label.streamCache|UnboundAtomicMapCache|An implementation of a Stream cache to use|
|format.message.pattern||**Required**. Logback pattern to use for log record's message|
|format.staticLabels|false|If you use only one label for all log records, you can set this flag to true and save some CPU time on grouping records by label|
|format.sortByTime|false|If true, log records in batch are sorted by timestamp. If false, records will be sent to Loki in arrival order. Enable this if you see 'entry out of order' error from Loki|

## Examples

Expand Down Expand Up @@ -113,7 +114,7 @@ Check the corresponding [configuration section](apacheclient) for details.

In this example we would like to change max batch size to 100 records, batch timeout to 10s, label key-value separator to `:`,
and sort log records by time before sending them to Loki.
Also we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf).
Also, we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf).
Finally, we want to see Loki4j debug output.

```xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

public final class AsyncBufferPipeline {

private static final int TOO_MANY_REQUEST_HTTP_STATUS = 429;

private static final Comparator<LogRecord> compareByTime = (e1, e2) -> {
var tsCmp = Long.compare(e1.timestampMs, e2.timestampMs);
return tsCmp == 0 ? Integer.compare(e1.nanos, e2.nanos) : tsCmp;
Expand Down Expand Up @@ -70,6 +72,13 @@ public final class AsyncBufferPipeline {

private final long retryTimeoutMs;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
* due to backoff.
*/
private final boolean dropRateLimitedBatches;

private volatile boolean started = false;

private AtomicBoolean acceptNewEvents = new AtomicBoolean(true);
Expand Down Expand Up @@ -105,6 +114,7 @@ public AsyncBufferPipeline(PipelineConfig conf) {
drainOnStop = conf.drainOnStop;
maxRetries = conf.maxRetries;
retryTimeoutMs = conf.retryTimeoutMs;
dropRateLimitedBatches = conf.dropRateLimitedBatches;
parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.internalQueuesCheckTimeoutMs);
this.log = conf.internalLoggingFactory.apply(this);
this.metrics = conf.metricsEnabled ? new Loki4jMetrics(conf.name, () -> unsentEvents.get()) : null;
Expand Down Expand Up @@ -231,7 +241,7 @@ private void encodeStep(LogRecordBatch batch) throws InterruptedException {

writeBatch(batch, writer);
if (writer.isEmpty()) return;
while(started &&
while(started &&
!sendQueue.offer(
batch.batchId(),
batch.size(),
Expand Down Expand Up @@ -351,7 +361,11 @@ private Supplier<String> sendErrorReasonProvider(Exception e, LokiResponse r) {
}

private boolean checkIfEligibleForRetry(Exception e, LokiResponse r) {
return e instanceof ConnectException || (r != null && r.status == 503);
return e instanceof ConnectException || (r != null && (r.status == 503 || shouldRetryRateLimitedBatches(r.status)));
}

private boolean shouldRetryRateLimitedBatches(int status) {
return status == TOO_MANY_REQUEST_HTTP_STATUS && !dropRateLimitedBatches;
}

private boolean sleep(long timeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.github.loki4j.client.writer.Writer;

/**
* Configuration properties for Loki4j pipeline
* Configuration properties for Loki4j pipeline.
*/
public class PipelineConfig {

Expand Down Expand Up @@ -42,25 +42,25 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
}

/**
* Name of this pipeline
* Name of this pipeline.
*/
public final String name;

/**
* Max number of events to put into a single batch before sending it to Loki
* Max number of events to put into a single batch before sending it to Loki.
*/
public final int batchMaxItems;

/**
* Max number of bytes a single batch can contain (as counted by Loki).
* This value should not be greater than server.grpc_server_max_recv_msg_size
* in your Loki config
* in your Loki config.
*/
public final int batchMaxBytes;

/**
* Max time in milliseconds to keep a batch before sending it to Loki, even if
* max items/bytes limits for this batch are not reached
* max items/bytes limits for this batch are not reached.
*/
public final long batchTimeoutMs;

Expand All @@ -79,7 +79,7 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {

/**
* Max number of bytes to keep in the send queue.
* When the queue is full, incoming log events are dropped
* When the queue is full, incoming log events are dropped.
*/
public final long sendQueueMaxBytes;

Expand All @@ -95,55 +95,63 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) {
*/
public final long retryTimeoutMs;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
* due to backoff.
*/
public final boolean dropRateLimitedBatches;

/**
* A timeout for Loki4j threads to sleep if encode or send queues are empty.
* Decreasing this value means lower latency at cost of higher CPU usage.
*/
public final long internalQueuesCheckTimeoutMs;

/**
* Use off-heap memory for storing intermediate data
* Use off-heap memory for storing intermediate data.
*/
public final boolean useDirectBuffers;

/**
* If true, the pipeline will try to send all the remaining events on shutdown,
* so the proper shutdown procedure might take longer.
* Otherwise, the pipeline will drop the unsent events
* Otherwise, the pipeline will drop the unsent events.
*/
public final boolean drainOnStop;

/**
* If true, the pipeline will report its metrics using Micrometer
* If true, the pipeline will report its metrics using Micrometer.
*/
public final boolean metricsEnabled;

/**
* A factory for Writer
* A factory for Writer.
*/
public final WriterFactory writerFactory;

/**
* Configuration properties for HTTP clients
* Configuration properties for HTTP clients.
*/
public final HttpConfig httpConfig;

/**
* A factory for HTTP client for sending logs to Loki.
* Argument is a config required for constructing an HTTP client
* Argument is a config required for constructing an HTTP client.
*/
public final Function<HttpConfig, Loki4jHttpClient> httpClientFactory;

/**
* A factory for an internal logger.
* Argument is a source class to report log messages from
* Argument is a source class to report log messages from.
*/
public final Function<Object, Loki4jLogger> internalLoggingFactory;

public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime,
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, long internalQueuesCheckTimeoutMs,
boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, WriterFactory writerFactory, HttpConfig httpConfig,
Function<HttpConfig, Loki4jHttpClient> httpClientFactory, Function<Object, Loki4jLogger> internalLoggingFactory) {
boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, boolean dropRateLimitedBatches,
long internalQueuesCheckTimeoutMs, boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled,
WriterFactory writerFactory, HttpConfig httpConfig, Function<HttpConfig, Loki4jHttpClient> httpClientFactory,
Function<Object, Loki4jLogger> internalLoggingFactory) {
this.name = name;
this.batchMaxItems = batchMaxItems;
this.batchMaxBytes = batchMaxBytes;
Expand All @@ -153,6 +161,7 @@ public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long ba
this.sendQueueMaxBytes = sendQueueMaxBytes;
this.maxRetries = maxRetries;
this.retryTimeoutMs = retryTimeoutMs;
this.dropRateLimitedBatches = dropRateLimitedBatches;
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
this.useDirectBuffers = useDirectBuffers;
this.drainOnStop = drainOnStop;
Expand All @@ -178,6 +187,7 @@ public static class Builder {
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
private long retryTimeoutMs = 60 * 1000;
private boolean dropRateLimitedBatches = false;
private long internalQueuesCheckTimeoutMs = 25;
private boolean useDirectBuffers = true;
private boolean drainOnStop = true;
Expand All @@ -198,6 +208,7 @@ public PipelineConfig build() {
sendQueueMaxBytes,
maxRetries,
retryTimeoutMs,
dropRateLimitedBatches,
internalQueuesCheckTimeoutMs,
useDirectBuffers,
drainOnStop,
Expand Down Expand Up @@ -253,6 +264,11 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) {
return this;
}

public Builder setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
this.dropRateLimitedBatches = dropRateLimitedBatches;
return this;
}

public Builder setInternalQueuesCheckTimeoutMs(long internalQueuesCheckTimeoutMs) {
this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs;
return this;
Expand Down Expand Up @@ -296,7 +312,7 @@ public Builder setInternalLoggingFactory(Function<Object, Loki4jLogger> internal
}

/**
* A factory for Writer
* A factory for Writer.
*/
public static class WriterFactory {

Expand All @@ -308,7 +324,7 @@ public static class WriterFactory {
public final BiFunction<Integer, ByteBufferFactory, Writer> factory;

/**
* HTTP content-type generated by this Writer
* HTTP content-type generated by this Writer.
*/
public final String contentType;

Expand Down
Loading

0 comments on commit b66ec83

Please sign in to comment.