Skip to content

Commit

Permalink
Fix PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
acm19 committed Feb 9, 2024
1 parent 66b95f6 commit 027c6e1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 53 deletions.
1 change: 1 addition & 0 deletions docs/docus/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Most Loki4j appender settings are optional. These few that are required are mark
|sendQueueMaxBytes|41943040|Max number of bytes to keep in the send queue. When the queue is full, incoming log events are dropped|
|maxRetries|2|Max number of attempts to send a batch to Loki before it will be dropped. A failed batch send could be retried in case of `ConnectException` or `503` status from Loki. Also, if `dropRateLimitedBatches=false` and status code is `429`, the request will be retried. All other exceptions and 4xx-5xx statuses do not cause a retry in order to avoid duplicates. Allowed values from `1` to `30`, if set outside of this range the maximum allowed value is used instead|
|retryTimeoutMs|60000|Base time in milliseconds to wait before the next attempt to re-send the failed batch. Batches are retried with an exponential backoff (multiplying this value by `2^attempt`) and jitter. Allowed values from `1` to `86_400_000` (a day in milliseconds), if set outside of this range the maximum allowed value is used instead|
|maxJitterMs|1000|Upper bound in milliseconds for a jitter added to the retries.|
|dropRateLimitedBatches|false|Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests). This reduces impacts on batches from other tenants, which could end up being delayed or dropped due to backoff.|
|internalQueuesCheckTimeoutMs|25|A timeout for Loki4j threads to sleep if encode or send queues are empty. Decreasing this value means lower latency at cost of higher CPU usage|
|useDirectBuffers|true|Use off-heap memory for storing intermediate data|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
package com.github.loki4j.client.pipeline;

import java.util.Random;
import java.util.function.Supplier;
import java.util.function.Function;

class Jitter implements Supplier<Long> {
class Jitter {

private static final int MAX_JITTER = 1000;
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Random::new);
private final Function<Integer, Long> randomSupplier;
private final int maxJitter;

@Override
public Long get() {
return Long.valueOf(random.get().nextInt(MAX_JITTER));
Jitter(int maxJitter) {
randomSupplier = maxJitter > 0
? new RandomFunction()
: ignored -> 0L;
this.maxJitter = maxJitter;
}

long generate() {
return randomSupplier.apply(maxJitter);
}

private static class RandomFunction implements Function<Integer, Long> {
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Random::new);

@Override
public Long apply(Integer maxJitter) {
return Long.valueOf(random.get().nextInt(maxJitter));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ public static class Builder {
private long sendQueueMaxBytes = batchMaxBytes * 10;
private int maxRetries = 2;
private long retryTimeoutMs = 60 * 1000;
private Supplier<Long> jitter = new Jitter();
private BiFunction<Integer, Long, Boolean> sleep = (attempt, timeout_ms) -> {
private Jitter jitter = new Jitter(1000);
private BiFunction<Integer, Long, Boolean> sleep = (attempt, timeoutMs) -> {
try {
long backoff_ms = timeout_ms * (1 << (attempt - 1));
Thread.sleep(backoff_ms + jitter.get());
long backoffMs = timeoutMs * (1 << (attempt - 1));
Thread.sleep(backoffMs + jitter.generate());
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -285,6 +285,11 @@ public Builder setRetryTimeoutMs(long retryTimeoutMs) {
return this;
}

public Builder setMaxJitterMs(int maxJitterMs) {
this.jitter = new Jitter(maxJitterMs);
return this;
}

public Builder setSleep(BiFunction<Integer, Long, Boolean> sleep) {
this.sleep = sleep;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class Loki4jAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
*/
private long retryTimeoutMs = 60 * 1000;

/**
* Upper bound in milliseconds for a jitter added to the retries.
*/
private int maxJitterMs = 1000;

/**
* Disables retries of batches that Loki responds to with a 429 status code (TooManyRequests).
* This reduces impacts on batches from other tenants, which could end up being delayed or dropped
Expand Down Expand Up @@ -153,6 +158,7 @@ public void start() {
.setSendQueueMaxBytes(sendQueueMaxBytes)
.setMaxRetries(maxRetries)
.setRetryTimeoutMs(retryTimeoutMs)
.setMaxJitterMs(maxJitterMs)
.setDropRateLimitedBatches(dropRateLimitedBatches)
.setInternalQueuesCheckTimeoutMs(internalQueuesCheckTimeoutMs)
.setUseDirectBuffers(useDirectBuffers)
Expand Down Expand Up @@ -240,22 +246,26 @@ public void setSendQueueMaxBytes(long sendQueueMaxBytes) {
}

public void setMaxRetries(int maxRetries) {
if (maxRetries > 0 && maxRetries <= MAX_RETRIES) {
this.maxRetries = maxRetries;
} else {
if (maxRetries > MAX_RETRIES) {
addWarn("Invalid value for `maxRetries`, using " + MAX_RETRIES + " instead.");
this.maxRetries = MAX_RETRIES;
addError("Invalid value for `maxRetries`, using " + MAX_RETRIES + " instead.");
} else {
this.maxRetries = maxRetries;
}
}
public void setRetryTimeoutMs(long retryTimeoutMs) {
if (retryTimeoutMs > 0 && retryTimeoutMs <= MAX_RETRY_TIMEOUT_MS) {
this.retryTimeoutMs = retryTimeoutMs;
} else {
if (retryTimeoutMs > MAX_RETRY_TIMEOUT_MS) {
addWarn("Invalid value for `retryTimeoutMs`, using " + MAX_RETRY_TIMEOUT_MS + " instead.");
this.retryTimeoutMs = MAX_RETRY_TIMEOUT_MS;
addError("Invalid value for `retryTimeoutMs`, using " + MAX_RETRY_TIMEOUT_MS + " instead.");
} else {
this.retryTimeoutMs = retryTimeoutMs;
}
}

public void setMaxJitterMs(int maxJitterMs) {
this.maxJitterMs = maxJitterMs;
}

public void setDropRateLimitedBatches(boolean dropRateLimitedBatches) {
this.dropRateLimitedBatches = dropRateLimitedBatches;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,11 @@ public static class FailingHttpClient extends DummyHttpClient {

@Override
public LokiResponse send(ByteBuffer batch) throws Exception {
// ensures data is not updated until assertions are done
await();
sendCount++;
var response = super.send(batch);
// ensures the code has run before running the assertions
await();
if (fail.get() && !rateLimited.get())
throw new ConnectException("Text exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

import org.junit.Ignore;
import org.junit.Test;

import com.github.loki4j.client.pipeline.PipelineConfig;
import com.github.loki4j.logback.Generators.FailingHttpClient;
import com.github.loki4j.logback.Generators.FailingStringWriter;
import com.github.loki4j.logback.Generators.StoppableHttpClient;
Expand All @@ -17,14 +20,6 @@
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

import com.github.loki4j.client.pipeline.PipelineConfig;

public class Loki4jAppenderTest {

public static ILoggingEvent[] events = new ILoggingEvent[] {
Expand Down Expand Up @@ -238,8 +233,7 @@ public void testRetry() throws InterruptedException, BrokenBarrierException, Tim
var sender = new WrappingHttpSender<FailingHttpClient>(failingHttpClient);
var encoder = defaultToStringEncoder();
var appender = appender(1, 4000L, encoder, sender);
var sleep = new CyclicSleep();
appender.setPipelineBuilder(PipelineConfig.builder().setSleep(sleep));
appender.setPipelineBuilder(PipelineConfig.builder().setSleep((a, b) -> true));
appender.start();

sender.client.fail.set(true);
Expand All @@ -251,16 +245,17 @@ public void testRetry() throws InterruptedException, BrokenBarrierException, Tim
"ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ")
.build();

failingHttpClient.await();
failingHttpClient.await();
assertEquals("send", 1, sender.client.sendCount);
assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
failingHttpClient.await();
failingHttpClient.await();
assertEquals("retry1", 2, sender.client.sendCount);
assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
failingHttpClient.await();
failingHttpClient.await();
assertEquals("retry2", 3, sender.client.sendCount);
assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));
Expand All @@ -272,10 +267,11 @@ public void testRetry() throws InterruptedException, BrokenBarrierException, Tim
.build();
appender.append(events[1]);

failingHttpClient.await();
failingHttpClient.await();
assertEquals("send-2", 4, sender.client.sendCount);
assertEquals("send-2", expected2, StringPayload.parse(sender.client.lastBatch, encoder.charset));
sleep.retry.await();
failingHttpClient.await();

sender.client.fail.set(false);

Expand All @@ -294,8 +290,7 @@ public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierExc

// retries rate limited requests by default
var appender = buildRateLimitedAppender(false, encoder, sender);
var sleep = new CyclicSleep();
appender.setPipelineBuilder(PipelineConfig.builder().setSleep(sleep));
appender.setPipelineBuilder(PipelineConfig.builder().setSleep((a, b) -> true));
appender.start();
appender.append(events[0]);

Expand All @@ -305,16 +300,17 @@ public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierExc
"ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ")
.build();

failingHttpClient.await();
failingHttpClient.await();
assertEquals("send", 1, sender.client.sendCount);
assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
failingHttpClient.await();
failingHttpClient.await();
assertEquals("retry1", 2, sender.client.sendCount);
assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
failingHttpClient.await();
failingHttpClient.await();
assertEquals("retry2", 3, sender.client.sendCount);
assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));
Expand Down Expand Up @@ -344,6 +340,7 @@ public void testRateLimitedNoRetries() throws InterruptedException, BrokenBarrie

appender.append(events[0]);
failingHttpClient.await();
failingHttpClient.await();
assertEquals("send-2", 1, sender.client.sendCount);
assertEquals("send-2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

Expand All @@ -363,20 +360,4 @@ private Loki4jAppender buildRateLimitedAppender(
return appender;
}

private static class CyclicSleep implements BiFunction<Integer, Long, Boolean> {
private CyclicBarrier retry = new CyclicBarrier(2);

@Override
public Boolean apply(Integer t, Long u) {
try {
retry.await(1L, TimeUnit.MINUTES);
return true;
} catch (InterruptedException | BrokenBarrierException | TimeoutException ex) {
Thread.currentThread().interrupt();
return false;
}
}

}

}

0 comments on commit 027c6e1

Please sign in to comment.