Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
acm19 committed Jan 17, 2024
1 parent ccfdd0a commit ddd8577
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.loki4j.client.pipeline;

import java.util.Random;
import java.util.function.Supplier;

class Jitter implements Supplier<Long> {

private static final int JITTER_BOUND = 1000;
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Random::new);

@Override
public Long get() {
return Long.valueOf(random.get().nextInt(JITTER_BOUND));
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.loki4j.client.pipeline;

import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -200,7 +199,7 @@ public static class Builder {
private Supplier<Long> jitter = new Jitter();
private BiFunction<Integer, Long, Boolean> sleep = (attempt, timeout) -> {
try {
long backoff = timeout * (2 << (attempt - 1));
long backoff = timeout * (1 << (attempt - 1));
Thread.sleep(backoff + jitter.get());
return true;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -338,19 +337,6 @@ public Builder setInternalLoggingFactory(Function<Object, Loki4jLogger> internal

}


private static class Jitter implements Supplier<Long> {

private static final int JITTER_BOUND = 1000;
private final ThreadLocal<Random> jitter = ThreadLocal.withInitial(Random::new);

@Override
public Long get() {
return Long.valueOf(jitter.get().nextInt(JITTER_BOUND));
}

}

/**
* A factory for Writer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@

import static com.github.loki4j.testkit.dummy.Generators.genMessage;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

public class Generators {
Expand Down Expand Up @@ -459,18 +463,27 @@ public static class FailingHttpClient extends DummyHttpClient {
public final AtomicBoolean fail = new AtomicBoolean(false);
public final AtomicBoolean rateLimited = new AtomicBoolean(false);
public volatile int sendCount = 0;
public IntSupplier barrier = () -> 0;
private final CyclicBarrier requestSent = new CyclicBarrier(2);

@Override
public LokiResponse send(ByteBuffer batch) throws Exception {
sendCount++;
var response = super.send(batch);
barrier.getAsInt();
await();
if (fail.get() && !rateLimited.get())
throw new ConnectException("Text exception");
else if (fail.get() && rateLimited.get())
return RATE_LIMITED;
return response;
}

void await() throws InterruptedException, BrokenBarrierException, TimeoutException {
try {
requestSent.await(1L, TimeUnit.MINUTES);
} catch (InterruptedException | BrokenBarrierException ex) {
Thread.currentThread().interrupt();
}
};
}

public static class WrappingHttpSender<T extends Loki4jHttpClient> extends AbstractHttpSender {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.IntSupplier;
import java.util.logging.Logger;

import com.github.loki4j.client.pipeline.PipelineConfig;

Expand Down Expand Up @@ -259,10 +261,8 @@ public void testBackpressure() {
}

@Test
public void testRetry() throws InterruptedException, BrokenBarrierException {
public void testRetry() throws InterruptedException, BrokenBarrierException, TimeoutException {
var failingHttpClient = new FailingHttpClient();
var requestBarrier = new RequestBarrier();
failingHttpClient.barrier = requestBarrier.barrier;
var sender = new WrappingHttpSender<FailingHttpClient>(failingHttpClient);
var encoder = defaultToStringEncoder();
var appender = appender(1, 4000L, encoder, sender);
Expand All @@ -279,17 +279,17 @@ public void testRetry() throws InterruptedException, BrokenBarrierException {
"ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ")
.build();

requestBarrier.await();
failingHttpClient.await();
assertEquals("send", 1, sender.client.sendCount);
assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
requestBarrier.await();
failingHttpClient.await();
assertEquals("retry1", 2, sender.client.sendCount);
assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
requestBarrier.await();
failingHttpClient.await();
assertEquals("retry2", 3, sender.client.sendCount);
assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

Expand All @@ -300,25 +300,23 @@ public void testRetry() throws InterruptedException, BrokenBarrierException {
.build();
appender.append(events[1]);

requestBarrier.await();
failingHttpClient.await();
assertEquals("send-2", 4, sender.client.sendCount);
assertEquals("send-2", expected2, StringPayload.parse(sender.client.lastBatch, encoder.charset));
sleep.retry.await();

sender.client.fail.set(false);

requestBarrier.await();
failingHttpClient.await();
assertEquals("retry1-2", 5, sender.client.sendCount);
assertEquals("retry1-2", expected2, StringPayload.parse(sender.client.lastBatch, encoder.charset));

appender.stop();
}

@Test
public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierException {
public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierException, TimeoutException {
var failingHttpClient = new FailingHttpClient();
var requestBarrier = new RequestBarrier();
failingHttpClient.barrier = requestBarrier.barrier;
var sender = new WrappingHttpSender<>(failingHttpClient);
var encoder = defaultToStringEncoder();

Expand All @@ -335,29 +333,27 @@ public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierExc
"ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ")
.build();

requestBarrier.await();
failingHttpClient.await();
assertEquals("send", 1, sender.client.sendCount);
assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
requestBarrier.await();
failingHttpClient.await();
assertEquals("retry1", 2, sender.client.sendCount);
assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

sleep.retry.await();
requestBarrier.await();
failingHttpClient.await();
assertEquals("retry2", 3, sender.client.sendCount);
assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

appender.stop();
}

@Test
public void testRateLimitedNoRetries() throws InterruptedException, BrokenBarrierException {
public void testRateLimitedNoRetries() throws InterruptedException, BrokenBarrierException, TimeoutException {
var encoder = defaultToStringEncoder();
var requestBarrier = new RequestBarrier();
var failingHttpClient = new FailingHttpClient();
failingHttpClient.barrier = requestBarrier.barrier;
var sender = new WrappingHttpSender<>(failingHttpClient);

// retries rate limited requests
Expand All @@ -375,7 +371,7 @@ public void testRateLimitedNoRetries() throws InterruptedException, BrokenBarrie
appender.start();

appender.append(events[0]);
requestBarrier.await();
failingHttpClient.await();
assertEquals("send-2", 1, sender.client.sendCount);
assertEquals("send-2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset));

Expand All @@ -401,31 +397,14 @@ private static class CyclicSleep implements BiFunction<Integer, Long, Boolean> {
@Override
public Boolean apply(Integer t, Long u) {
try {
retry.await();
retry.await(1L, TimeUnit.MINUTES);
return true;
} catch (InterruptedException | BrokenBarrierException ex) {
} catch (InterruptedException | BrokenBarrierException | TimeoutException ex) {
Thread.currentThread().interrupt();
return false;
}
}

}

private static class RequestBarrier {

private final CyclicBarrier requestSent = new CyclicBarrier(2);
private final IntSupplier barrier = () -> {
try {
return requestSent.await();
} catch (InterruptedException | BrokenBarrierException ex) {
Thread.currentThread().interrupt();
return -1;
}
};

private int await() throws InterruptedException, BrokenBarrierException {
return requestSent.await();
}
}

}

0 comments on commit ddd8577

Please sign in to comment.