diff --git a/docs/docus/docs/configuration.md b/docs/docus/docs/configuration.md index 924d67e0..5e705fff 100644 --- a/docs/docus/docs/configuration.md +++ b/docs/docus/docs/configuration.md @@ -24,19 +24,20 @@ Most Loki4j appender settings are optional. These few that are required are mark ### General settings -Setting|Default|Description --------|-------|----------- -batchMaxItems|1000|Max number of events to put into a single batch before sending it to Loki -batchMaxBytes|4194304|Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config -batchTimeoutMs|60000|Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached -sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped -maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried only in case of ConnectException or 503 status from Loki. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates -retryTimeoutMs|60000|Time in milliseconds to wait before the next attempt to re-send the failed batch -internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage -useDirectBuffers|true|Use off-heap memory for storing intermediate data -drainOnStop|true|If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events -metricsEnabled|false|If true, the appender will report its metrics using Micrometer -verbose|false|If true, the appender will print its own debug logs to stderr +| Setting | Default | Description | +|------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| batchMaxItems | 1000 | Max number of events to put into a single batch before sending it to Loki | +| batchMaxBytes | 4194304 | Max number of bytes a single batch can contain (as counted by Loki). This value should not be greater than `server.grpc_server_max_recv_msg_size` in your Loki config | +| batchTimeoutMs | 60000 | Max time in milliseconds to keep a batch before sending it to Loki, even if max items/bytes limits for this batch are not reached | +| sendQueueMaxBytes | 41943040 | Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped | +| maxRetries | 2 | Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried only in case of ConnectException or 503 status from Loki. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates | +| retryTimeoutMs | 60000 | Time in milliseconds to wait before the next attempt to re-send the failed batch | +| dropRateLimitedBatches | false | Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff. | +| internalQueuesCheckTimeoutMs | 25 | A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage | +| useDirectBuffers | true | Use off-heap memory for storing intermediate data | +| drainOnStop | true | If true, the appender will try to send all the remaining events on shutdown, so the proper shutdown procedure might take longer. Otherwise, the appender will drop the unsent events | +| metricsEnabled | false | If true, the appender will report its metrics using Micrometer | +| verbose | false | If true, the appender will print its own debug logs to stderr | ### HTTP settings @@ -113,7 +114,7 @@ Check the corresponding [configuration section](apacheclient) for details. In this example we would like to change max batch size to 100 records, batch timeout to 10s, label key-value separator to `:`, and sort log records by time before sending them to Loki. -Also we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf). +Also, we would like to use [Apache HTTP sender](apacheclient) with a pool of 10 connections and [Protobuf API](protobuf). Finally, we want to see Loki4j debug output. ```xml diff --git a/loki-client/src/main/java/com/github/loki4j/client/pipeline/AsyncBufferPipeline.java b/loki-client/src/main/java/com/github/loki4j/client/pipeline/AsyncBufferPipeline.java index e4fad79b..720334e2 100644 --- a/loki-client/src/main/java/com/github/loki4j/client/pipeline/AsyncBufferPipeline.java +++ b/loki-client/src/main/java/com/github/loki4j/client/pipeline/AsyncBufferPipeline.java @@ -32,6 +32,8 @@ public final class AsyncBufferPipeline { + private static final int TOO_MANY_REQUEST_HTTP_STATUS = 429; + private static final Comparator compareByTime = (e1, e2) -> { var tsCmp = Long.compare(e1.timestampMs, e2.timestampMs); return tsCmp == 0 ? Integer.compare(e1.nanos, e2.nanos) : tsCmp; @@ -70,6 +72,13 @@ public final class AsyncBufferPipeline { private final long retryTimeoutMs; + /** + * Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). + * This reduces impacts on batches from other tenants, which could end up being delayed or dropped + * due to backoff. + */ + private final boolean dropRateLimitedBatches; + private volatile boolean started = false; private AtomicBoolean acceptNewEvents = new AtomicBoolean(true); @@ -105,6 +114,7 @@ public AsyncBufferPipeline(PipelineConfig conf) { drainOnStop = conf.drainOnStop; maxRetries = conf.maxRetries; retryTimeoutMs = conf.retryTimeoutMs; + dropRateLimitedBatches = conf.dropRateLimitedBatches; parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.internalQueuesCheckTimeoutMs); this.log = conf.internalLoggingFactory.apply(this); this.metrics = conf.metricsEnabled ? new Loki4jMetrics(conf.name, () -> unsentEvents.get()) : null; @@ -231,7 +241,7 @@ private void encodeStep(LogRecordBatch batch) throws InterruptedException { writeBatch(batch, writer); if (writer.isEmpty()) return; - while(started && + while(started && !sendQueue.offer( batch.batchId(), batch.size(), @@ -351,7 +361,11 @@ private Supplier sendErrorReasonProvider(Exception e, LokiResponse r) { } private boolean checkIfEligibleForRetry(Exception e, LokiResponse r) { - return e instanceof ConnectException || (r != null && r.status == 503); + return e instanceof ConnectException || (r != null && (r.status == 503 || shouldRetryRateLimitedBatches(r.status))); + } + + private boolean shouldRetryRateLimitedBatches(int status) { + return status == TOO_MANY_REQUEST_HTTP_STATUS && !dropRateLimitedBatches; } private boolean sleep(long timeoutMs) { diff --git a/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java b/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java index 4f3ee6d3..7a8eb173 100644 --- a/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java +++ b/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java @@ -14,7 +14,7 @@ import com.github.loki4j.client.writer.Writer; /** - * Configuration properties for Loki4j pipeline + * Configuration properties for Loki4j pipeline. */ public class PipelineConfig { @@ -42,25 +42,25 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) { } /** - * Name of this pipeline + * Name of this pipeline. */ public final String name; /** - * Max number of events to put into a single batch before sending it to Loki + * Max number of events to put into a single batch before sending it to Loki. */ public final int batchMaxItems; /** * Max number of bytes a single batch can contain (as counted by Loki). * This value should not be greater than server.grpc_server_max_recv_msg_size - * in your Loki config + * in your Loki config. */ public final int batchMaxBytes; /** * Max time in milliseconds to keep a batch before sending it to Loki, even if - * max items/bytes limits for this batch are not reached + * max items/bytes limits for this batch are not reached. */ public final long batchTimeoutMs; @@ -79,7 +79,7 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) { /** * Max number of bytes to keep in the send queue. - * When the queue is full, incoming log events are dropped + * When the queue is full, incoming log events are dropped. */ public final long sendQueueMaxBytes; @@ -95,6 +95,13 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) { */ public final long retryTimeoutMs; + /** + * Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). + * This reduces impacts on batches from other tenants, which could end up being delayed or dropped + * due to backoff. + */ + public final boolean dropRateLimitedBatches; + /** * A timeout for Loki4j threads to sleep if encode or send queues are empty. * Decreasing this value means lower latency at cost of higher CPU usage. @@ -102,48 +109,49 @@ public static HttpConfig.Builder java(long innerThreadsExpirationMs) { public final long internalQueuesCheckTimeoutMs; /** - * Use off-heap memory for storing intermediate data + * Use off-heap memory for storing intermediate data. */ public final boolean useDirectBuffers; /** * If true, the pipeline will try to send all the remaining events on shutdown, * so the proper shutdown procedure might take longer. - * Otherwise, the pipeline will drop the unsent events + * Otherwise, the pipeline will drop the unsent events. */ public final boolean drainOnStop; /** - * If true, the pipeline will report its metrics using Micrometer + * If true, the pipeline will report its metrics using Micrometer. */ public final boolean metricsEnabled; /** - * A factory for Writer + * A factory for Writer. */ public final WriterFactory writerFactory; /** - * Configuration properties for HTTP clients + * Configuration properties for HTTP clients. */ public final HttpConfig httpConfig; /** * A factory for HTTP client for sending logs to Loki. - * Argument is a config required for constructing an HTTP client + * Argument is a config required for constructing an HTTP client. */ public final Function httpClientFactory; /** * A factory for an internal logger. - * Argument is a source class to report log messages from + * Argument is a source class to report log messages from. */ public final Function internalLoggingFactory; public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long batchTimeoutMs, boolean sortByTime, - boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, long internalQueuesCheckTimeoutMs, - boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, WriterFactory writerFactory, HttpConfig httpConfig, - Function httpClientFactory, Function internalLoggingFactory) { + boolean staticLabels, long sendQueueMaxBytes, int maxRetries, long retryTimeoutMs, boolean dropRateLimitedBatches, + long internalQueuesCheckTimeoutMs, boolean useDirectBuffers, boolean drainOnStop, boolean metricsEnabled, + WriterFactory writerFactory, HttpConfig httpConfig, Function httpClientFactory, + Function internalLoggingFactory) { this.name = name; this.batchMaxItems = batchMaxItems; this.batchMaxBytes = batchMaxBytes; @@ -153,6 +161,7 @@ public PipelineConfig(String name, int batchMaxItems, int batchMaxBytes, long ba this.sendQueueMaxBytes = sendQueueMaxBytes; this.maxRetries = maxRetries; this.retryTimeoutMs = retryTimeoutMs; + this.dropRateLimitedBatches = dropRateLimitedBatches; this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs; this.useDirectBuffers = useDirectBuffers; this.drainOnStop = drainOnStop; @@ -178,6 +187,7 @@ public static class Builder { private long sendQueueMaxBytes = batchMaxBytes * 10; private int maxRetries = 2; private long retryTimeoutMs = 60 * 1000; + private boolean dropRateLimitedBatches = false; private long internalQueuesCheckTimeoutMs = 25; private boolean useDirectBuffers = true; private boolean drainOnStop = true; @@ -198,6 +208,7 @@ public PipelineConfig build() { sendQueueMaxBytes, maxRetries, retryTimeoutMs, + dropRateLimitedBatches, internalQueuesCheckTimeoutMs, useDirectBuffers, drainOnStop, @@ -253,6 +264,11 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) { return this; } + public Builder setDropRateLimitedBatches(boolean dropRateLimitedBatches) { + this.dropRateLimitedBatches = dropRateLimitedBatches; + return this; + } + public Builder setInternalQueuesCheckTimeoutMs(long internalQueuesCheckTimeoutMs) { this.internalQueuesCheckTimeoutMs = internalQueuesCheckTimeoutMs; return this; @@ -296,7 +312,7 @@ public Builder setInternalLoggingFactory(Function internal } /** - * A factory for Writer + * A factory for Writer. */ public static class WriterFactory { @@ -308,7 +324,7 @@ public static class WriterFactory { public final BiFunction factory; /** - * HTTP content-type generated by this Writer + * HTTP content-type generated by this Writer. */ public final String contentType; diff --git a/loki-logback-appender/src/main/java/com/github/loki4j/logback/Loki4jAppender.java b/loki-logback-appender/src/main/java/com/github/loki4j/logback/Loki4jAppender.java index 1c425eb9..6591eeb9 100644 --- a/loki-logback-appender/src/main/java/com/github/loki4j/logback/Loki4jAppender.java +++ b/loki-logback-appender/src/main/java/com/github/loki4j/logback/Loki4jAppender.java @@ -11,29 +11,29 @@ import ch.qos.logback.core.status.Status; /** - * Main appender that provides functionality for sending log record batches to Loki + * Main appender that provides functionality for sending log record batches to Loki. */ public class Loki4jAppender extends UnsynchronizedAppenderBase { /** - * Max number of events to put into a single batch before sending it to Loki + * Max number of events to put into a single batch before sending it to Loki. */ private int batchMaxItems = 1000; /** * Max number of bytes a single batch can contain (as counted by Loki). * This value should not be greater than server.grpc_server_max_recv_msg_size - * in your Loki config + * in your Loki config. */ private int batchMaxBytes = 4 * 1024 * 1024; /** * Max time in milliseconds to keep a batch before sending it to Loki, even if - * max items/bytes limits for this batch are not reached + * max items/bytes limits for this batch are not reached. */ private long batchTimeoutMs = 60 * 1000; /** * Max number of bytes to keep in the send queue. - * When the queue is full, incoming log events are dropped + * When the queue is full, incoming log events are dropped. */ private long sendQueueMaxBytes = batchMaxBytes * 10; @@ -49,6 +49,13 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase { */ private long retryTimeoutMs = 60 * 1000; + /** + * Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). + * This reduces impacts on batches from other tenants, which could end up being delayed or dropped + * due to backoff. + */ + private boolean dropRateLimitedBatches = false; + /** * A timeout for Loki4j threads to sleep if encode or send queues are empty. * Decreasing this value means lower latency at cost of higher CPU usage. @@ -56,44 +63,44 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase { private long internalQueuesCheckTimeoutMs = 25; /** - * If true, the appender will print its own debug logs to stderr + * If true, the appender will print its own debug logs to stderr. */ private boolean verbose = false; /** - * If true, the appender will report its metrics using Micrometer + * If true, the appender will report its metrics using Micrometer. */ private boolean metricsEnabled = false; /** * If true, the appender will try to send all the remaining events on shutdown, * so the proper shutdown procedure might take longer. - * Otherwise, the appender will drop the unsent events + * Otherwise, the appender will drop the unsent events. */ private boolean drainOnStop = true; /** - * Use off-heap memory for storing intermediate data + * Use off-heap memory for storing intermediate data. */ private boolean useDirectBuffers = true; /** - * An encoder to use for converting log record batches to format acceptable by Loki + * An encoder to use for converting log record batches to format acceptable by Loki. */ private Loki4jEncoder encoder; /** - * A configurator for HTTP sender + * A configurator for HTTP sender. */ private HttpSender sender; /** - * A pipeline that does all the heavy lifting log records processing + * A pipeline that does all the heavy lifting log records processing. */ private AsyncBufferPipeline pipeline; /** - * A counter for events dropped due to backpressure + * A counter for events dropped due to backpressure. */ private AtomicLong droppedEventsCount = new AtomicLong(0L); @@ -137,6 +144,7 @@ public void start() { .setSendQueueMaxBytes(sendQueueMaxBytes) .setMaxRetries(maxRetries) .setRetryTimeoutMs(retryTimeoutMs) + .setDropRateLimitedBatches(dropRateLimitedBatches) .setInternalQueuesCheckTimeoutMs(internalQueuesCheckTimeoutMs) .setUseDirectBuffers(useDirectBuffers) .setDrainOnStop(drainOnStop) @@ -229,6 +237,10 @@ public void setRetryTimeoutMs(long retryTimeoutMs) { this.retryTimeoutMs = retryTimeoutMs; } + public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) { + this.dropRateLimitedBatches = dropRateLimitedBatches; + } + /** * "format" instead of "encoder" in the name allows to specify * the default implementation, so users don't have to write diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java index 36a0ee5b..077404a2 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java @@ -453,14 +453,18 @@ public LokiResponse send(ByteBuffer batch) throws Exception { } public static class FailingHttpClient extends DummyHttpClient { - public AtomicBoolean fail = new AtomicBoolean(false); + private static final LokiResponse RATE_LIMITED = new LokiResponse(429, "Rate Limited Request"); + public final AtomicBoolean fail = new AtomicBoolean(false); + public final AtomicBoolean rateLimited = new AtomicBoolean(false); public volatile int sendCount = 0; @Override public LokiResponse send(ByteBuffer batch) throws Exception { sendCount++; var response = super.send(batch); - if (fail.get()) + if (fail.get() && !rateLimited.get()) throw new ConnectException("Text exception"); + else if (fail.get() && rateLimited.get()) + return RATE_LIMITED; return response; } } diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java index 630aabce..9b5418de 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java @@ -11,6 +11,8 @@ import static com.github.loki4j.logback.Generators.*; +import java.nio.charset.Charset; + public class Loki4jAppenderTest { public static ILoggingEvent[] events = new ILoggingEvent[] { @@ -73,7 +75,7 @@ public void testBatchTimeout() { try { Thread.sleep(300L); } catch (InterruptedException e1) { } assertTrue("no batches before batchTimeout reached", sender.lastBatch() == null); - + try { Thread.sleep(300L); } catch (InterruptedException e1) { } assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); return null; @@ -93,7 +95,7 @@ public void testDrainOnStop() { try { Thread.sleep(300L); } catch (InterruptedException e1) { } assertTrue("no batches before stop", sender.lastBatch() == null); - + appender.stop(); assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); } @@ -259,25 +261,10 @@ public void testRetry() { appender.setRetryTimeoutMs(200); appender.start(); - // all retries failed - StringPayload expected = StringPayload.builder() - .stream("[level, INFO, app, my-app]", - "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") - .build(); sender.client.fail.set(true); appender.append(events[0]); - try { Thread.sleep(100L); } catch (InterruptedException e1) { } - assertEquals("send", 1, sender.client.sendCount); - assertEquals("send", expected, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - - try { Thread.sleep(200L); } catch (InterruptedException e1) { } - assertEquals("retry1", 2, sender.client.sendCount); - assertEquals("retry1", expected, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - - try { Thread.sleep(200L); } catch (InterruptedException e1) { } - assertEquals("retry2", 3, sender.client.sendCount); - assertEquals("retry2", expected, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + assertRetries(sender, encoder.charset); // first retry is successful StringPayload expected2 = StringPayload.builder() @@ -298,4 +285,77 @@ public void testRetry() { appender.stop(); } + private void assertRetries(WrappingHttpSender sender, Charset charset) { + // all retries failed + StringPayload expectedPayload = StringPayload.builder() + .stream("[level, INFO, app, my-app]", + "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") + .build(); + + try { Thread.sleep(100L); } catch (InterruptedException e1) { } + assertEquals("send", 1, sender.client.sendCount); + assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, charset)); + try { Thread.sleep(200L); } catch (InterruptedException e1) { } + assertEquals("retry1", 2, sender.client.sendCount); + assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, charset)); + try { Thread.sleep(200L); } catch (InterruptedException e1) { } + assertEquals("retry2", 3, sender.client.sendCount); + assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, charset)); + } + + @Test + public void testRateLimitedRetry() { + var encoder = defaultToStringEncoder(); + var sender = new WrappingHttpSender(new FailingHttpClient()); + + // retries rate limited requests by default + var appender = buildRateLimitedAppender(false, encoder, sender); + appender.start(); + appender.append(events[0]); + + assertRetries(sender, encoder.charset); + + appender.stop(); + } + + private Loki4jAppender buildRateLimitedAppender( + boolean dropRateLimitedBatches, + AbstractLoki4jEncoder encoder, + WrappingHttpSender sender) { + var appender = appender(1, 4000L, encoder, sender); + appender.setDropRateLimitedBatches(dropRateLimitedBatches); + appender.setRetryTimeoutMs(200); + + sender.client.fail.set(true); + sender.client.rateLimited.set(true); + + return appender; + } + + @Test + public void testRateLimitedNoRetries() { + var encoder = defaultToStringEncoder(); + var sender = new WrappingHttpSender(new FailingHttpClient()); + + // retries rate limited requests + var appender = buildRateLimitedAppender(true, encoder, sender); + appender.setDropRateLimitedBatches(true); + StringPayload expectedPayload = StringPayload.builder() + .stream("[level, INFO, app, my-app]", + "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") + .build(); + + appender.start(); + + appender.append(events[0]); + try { Thread.sleep(50L); } catch (InterruptedException e1) { } + assertEquals("send-2", 1, sender.client.sendCount); + assertEquals("send-2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + + try { Thread.sleep(200L); } catch (InterruptedException e1) { } + assertEquals("no-retried", 1, sender.client.sendCount); + + appender.stop(); + } + }