diff --git a/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java b/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java index 69c14124..18bdd5c9 100644 --- a/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java +++ b/loki-client/src/main/java/com/github/loki4j/client/pipeline/PipelineConfig.java @@ -2,7 +2,6 @@ import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; import com.github.loki4j.client.http.ApacheHttpClient; import com.github.loki4j.client.http.HttpConfig; diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/AbstractLoki4jEncoderTest.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/AbstractLoki4jEncoderTest.java index 3a382de8..400d6ea5 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/AbstractLoki4jEncoderTest.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/AbstractLoki4jEncoderTest.java @@ -128,7 +128,7 @@ public void testMarker() { .stream("[l, INFO, mrk1, v1, mrk2, v2]", "ts=104 INFO | Test message 4") .build(), - StringPayload.parse(sender.lastBatch())); + StringPayload.parse(sender.lastSendData())); //System.out.println(new String(sender.lastBatch())); return null; }); @@ -165,7 +165,7 @@ public void testOrdering() { "ts=103 ERROR | Test message 5", "ts=110 INFO | Test message 6") .build(), - StringPayload.parse(sender.lastBatch())); + StringPayload.parse(sender.lastSendData())); return null; }); @@ -188,7 +188,7 @@ public void testOrdering() { "ts=105 INFO | Test message 1", "ts=110 INFO | Test message 6") .build(), - StringPayload.parse(sender.lastBatch())); + StringPayload.parse(sender.lastSendData())); return null; }); @@ -214,7 +214,7 @@ public void testOrdering() { .stream("[l, ERROR]", "ts=103 ERROR | Test message 5") .build(), - StringPayload.parse(sender.lastBatch())); + StringPayload.parse(sender.lastSendData())); return null; }); @@ -240,7 +240,7 @@ public void testOrdering() { .stream("[l, ERROR]", "ts=103 ERROR | Test message 5") .build(), - StringPayload.parse(sender.lastBatch())); + StringPayload.parse(sender.lastSendData())); return null; }); } diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java index 47a07cf0..42fb94b8 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Generators.java @@ -1,7 +1,6 @@ package com.github.loki4j.logback; import java.io.IOException; -import java.net.ConnectException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -11,11 +10,6 @@ import java.util.Iterator; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -27,13 +21,14 @@ import com.github.loki4j.client.batch.LogRecordBatch; import com.github.loki4j.client.http.HttpConfig; import com.github.loki4j.client.http.Loki4jHttpClient; -import com.github.loki4j.client.http.LokiResponse; import com.github.loki4j.client.pipeline.PipelineConfig; import com.github.loki4j.client.util.ByteBufferFactory; import com.github.loki4j.client.writer.Writer; +import com.github.loki4j.testkit.dummy.DummyHttpClient; import com.github.loki4j.testkit.dummy.ExceptionGenerator; import com.github.loki4j.testkit.dummy.LokiHttpServerMock; import com.github.loki4j.testkit.dummy.StringPayload; +import com.github.loki4j.testkit.dummy.DummyHttpClient.SendInvocation; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.LoggerContext; @@ -424,82 +419,19 @@ public void serializeBatch(LogRecordBatch batch) { } } - public static class DummyHttpSender extends AbstractHttpSender { - private final DummyHttpClient client = new DummyHttpClient(); + public static class DummyHttpSender extends WrappingHttpSender { - public byte[] lastBatch() { - return client.lastBatch; + public DummyHttpSender() { + super(new DummyHttpClient()); } - @Override - public HttpConfig.Builder getConfig() { - return defaultHttpConfig; - } - - @Override - public Function getHttpClientFactory() { - return cfg -> client; - } - } - - public static class DummyHttpClient implements Loki4jHttpClient { - public byte[] lastBatch; - - @Override - public void close() throws Exception { } - - @Override - public HttpConfig getConfig() { - return defaultHttpConfig.build("test"); - } - - @Override - public LokiResponse send(ByteBuffer batch) throws Exception { - lastBatch = new byte[batch.remaining()]; - batch.get(lastBatch); - return new LokiResponse(204, ""); - } - } - - public static class StoppableHttpClient extends DummyHttpClient { - public AtomicBoolean wait = new AtomicBoolean(false); - @Override - public LokiResponse send(ByteBuffer batch) throws Exception { - while(wait.get()) - LockSupport.parkNanos(1000); - return super.send(batch); + public SendInvocation captureSendInvocation() { + return client.captureSendInvocation(); } - } - public static class FailingHttpClient extends DummyHttpClient { - private static final LokiResponse RATE_LIMITED = new LokiResponse(429, "Rate Limited Request"); - public final AtomicBoolean fail = new AtomicBoolean(false); - public final AtomicBoolean rateLimited = new AtomicBoolean(false); - public volatile int sendCount = 0; - private final CyclicBarrier requestSent = new CyclicBarrier(2); - - @Override - public LokiResponse send(ByteBuffer batch) throws Exception { - // ensures data is not updated until assertions are done - await(); - sendCount++; - var response = super.send(batch); - // ensures the code has run before running the assertions - await(); - if (fail.get() && !rateLimited.get()) - throw new ConnectException("Text exception"); - else if (fail.get() && rateLimited.get()) - return RATE_LIMITED; - return response; + public byte[] lastSendData() { + return client.lastSendData(); } - - void await() throws InterruptedException, BrokenBarrierException, TimeoutException { - try { - requestSent.await(1L, TimeUnit.MINUTES); - } catch (InterruptedException | BrokenBarrierException ex) { - Thread.currentThread().interrupt(); - } - }; } public static class WrappingHttpSender extends AbstractHttpSender { diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/JsonLayoutTest.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/JsonLayoutTest.java index 4d52f3d5..81ac268a 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/JsonLayoutTest.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/JsonLayoutTest.java @@ -49,7 +49,7 @@ public void testWorksInAppender() { appender.append(events); appender.waitAllAppended(); - var actual = StringPayload.parse(sender.lastBatch(), encoder.charset); + var actual = StringPayload.parse(sender.lastSendData(), encoder.charset); //System.out.println(expected); //System.out.println(actual); assertEquals("jsonLayout", expected, actual); diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java index 4f80712d..01870d95 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/Loki4jAppenderTest.java @@ -5,17 +5,18 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; import org.junit.Test; import com.github.loki4j.client.pipeline.PipelineConfig; -import com.github.loki4j.logback.Generators.FailingHttpClient; import com.github.loki4j.logback.Generators.FailingStringWriter; -import com.github.loki4j.logback.Generators.StoppableHttpClient; import com.github.loki4j.logback.Generators.WrappingHttpSender; +import com.github.loki4j.testkit.dummy.FailingHttpClient; import com.github.loki4j.testkit.dummy.StringPayload; +import com.github.loki4j.testkit.dummy.SuspendableHttpClient; +import com.github.loki4j.testkit.dummy.FailingHttpClient.FailureType; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -59,13 +60,14 @@ public void testBatchSize() { var encoder = defaultToStringEncoder(); var sender = dummySender(); withAppender(appender(3, 1000L, encoder, sender), appender -> { + var sendCapture = sender.captureSendInvocation(); appender.append(events[0]); appender.append(events[1]); - assertTrue("no batches before batchSize reached", sender.lastBatch() == null); + assertTrue("no batches before batchSize reached", sender.lastSendData() == null); appender.append(events[2]); - appender.waitAllAppended(); - assertEquals("batchSize", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); + var send = sendCapture.waitForNextSend(100); + assertEquals("batchSize", expected, StringPayload.parse(send.data, encoder.charset)); return null; }); } @@ -78,13 +80,13 @@ public void testBatchTimeout() { appender.append(events[0]); appender.append(events[1]); appender.append(events[2]); - assertTrue("no batches before batchTimeout reached", sender.lastBatch() == null); + assertTrue("no batches before batchTimeout reached", sender.lastSendData() == null); try { Thread.sleep(300L); } catch (InterruptedException e1) { } - assertTrue("no batches before batchTimeout reached", sender.lastBatch() == null); + assertTrue("no batches before batchTimeout reached", sender.lastSendData() == null); try { Thread.sleep(300L); } catch (InterruptedException e1) { } - assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); + assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastSendData(), encoder.charset)); return null; }); } @@ -113,13 +115,13 @@ public void testDrainOnStop() { appender.append(events[0]); appender.append(events[1]); appender.append(events[2]); - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); try { Thread.sleep(300L); } catch (InterruptedException e1) { } - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); appender.stop(); - assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); + assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastSendData(), encoder.charset)); } @Test @@ -148,14 +150,14 @@ public void testDrainOnStopWhileEncoderFails() { appender.append(events[0]); appender.append(events[1]); appender.append(events[2]); - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); try { Thread.sleep(300L); } catch (InterruptedException e1) { } - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); failingWriterRef.get().fail.set(false); appender.stop(); - assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); + assertEquals("batchTimeout", expected, StringPayload.parse(sender.lastSendData(), encoder.charset)); } @Test @@ -168,13 +170,13 @@ public void testDrainOnStopDisabled() { appender.append(events[0]); appender.append(events[1]); appender.append(events[2]); - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); try { Thread.sleep(300L); } catch (InterruptedException e1) { } - assertTrue("no batches before stop", sender.lastBatch() == null); + assertTrue("no batches before stop", sender.lastSendData() == null); appender.stop(); - assertTrue("no batches after stop", sender.lastBatch() == null); + assertTrue("no batches after stop", sender.lastSendData() == null); } @Test @@ -194,27 +196,28 @@ public void testTooLargeEventDropped() { var appender = appender(3, 4000L, encoder, sender); appender.setBatchMaxBytes(500); appender.start(); + var sendCapture = sender.captureSendInvocation(); appender.append(events[0]); appender.append(loggingEvent(100L, Level.INFO, "TestApp", "main", longStr, null)); appender.append(events[1]); appender.append(events[2]); - try { Thread.sleep(100L); } catch (InterruptedException e1) { } - assertEquals("batchSize", expected, StringPayload.parse(sender.lastBatch(), encoder.charset)); + var send = sendCapture.waitForNextSend(100); + assertEquals("batchSize", expected, StringPayload.parse(send.data, encoder.charset)); appender.stop(); } @Test public void testBackpressure() { - var sender = new WrappingHttpSender(new StoppableHttpClient()); + var sender = new WrappingHttpSender<>(new SuspendableHttpClient()); var encoder = defaultToStringEncoder(); var appender = appender(1, 4000L, encoder, sender); appender.setBatchMaxBytes(120); appender.setSendQueueMaxBytes(150); appender.start(); - sender.client.wait.set(true); + sender.client.suspend(); // hanging sender appender.append(events[0]); try { Thread.sleep(100L); } catch (InterruptedException e1) { } @@ -234,7 +237,7 @@ public void testBackpressure() { appender.append(events[2]); try { Thread.sleep(100L); } catch (InterruptedException e1) { } - sender.client.wait.set(false); + sender.client.resume(); try { Thread.sleep(100L); } catch (InterruptedException e1) { } assertEquals("some events dropped", 3, appender.droppedEventsCount()); @@ -243,136 +246,94 @@ public void testBackpressure() { } @Test - public void testRetry() throws InterruptedException, BrokenBarrierException, TimeoutException { + public void testConnectionExceptionRetry() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException { + StringPayload expectedPayload = StringPayload.builder() + .stream("[level, INFO, app, my-app]", + "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") + .build(); + StringPayload expectedPayload2 = StringPayload.builder() + .stream("[level, WARN, app, my-app]", + "ts=104 l=WARN c=test.TestApp t=thread-2 | Test message 2 ") + .build(); + var failingHttpClient = new FailingHttpClient(); - var sender = new WrappingHttpSender(failingHttpClient); + var sender = new WrappingHttpSender<>(failingHttpClient); var encoder = defaultToStringEncoder(); var appender = appender(1, 4000L, encoder, sender); appender.setPipelineBuilder(PipelineConfig.builder().setSleep((a, b) -> true)); appender.start(); - sender.client.fail.set(true); - appender.append(events[0]); - // all retries failed - StringPayload expectedPayload = StringPayload.builder() - .stream("[level, INFO, app, my-app]", - "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") - .build(); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("send", 1, sender.client.sendCount); - assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + sender.client.setFailure(FailureType.CONNECTION_EXCEPTION); + + var sendCapture = sender.client.captureSendInvocation(); + appender.append(events[0]); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("retry1", 2, sender.client.sendCount); - assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + var send1 = sendCapture.waitForNextSend(100); + assertEquals("send", 1, send1.sendNo); + assertEquals("send", expectedPayload, StringPayload.parse(send1.data, encoder.charset)); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("retry2", 3, sender.client.sendCount); - assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + var send2 = send1.waitForNextSend(100); + assertEquals("retry1", 2, send2.sendNo); + assertEquals("retry1", expectedPayload, StringPayload.parse(send2.data, encoder.charset)); + + var send3 = send2.waitForNextSend(100); + assertEquals("retry2", 3, send3.sendNo); + assertEquals("retry2", expectedPayload, StringPayload.parse(send3.data, encoder.charset)); // first retry is successful - StringPayload expected2 = StringPayload.builder() - .stream("[level, WARN, app, my-app]", - "ts=104 l=WARN c=test.TestApp t=thread-2 | Test message 2 ") - .build(); + appender.append(events[1]); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("send-2", 4, sender.client.sendCount); - assertEquals("send-2", expected2, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - failingHttpClient.await(); + var send4 = send3.waitForNextSend(100); + assertEquals("send-2", 4, send4.sendNo); + assertEquals("send-2", expectedPayload2, StringPayload.parse(send4.data, encoder.charset)); - sender.client.fail.set(false); + sender.client.setFailure(FailureType.NONE); - failingHttpClient.await(); - assertEquals("retry1-2", 5, sender.client.sendCount); - assertEquals("retry1-2", expected2, StringPayload.parse(sender.client.lastBatch, encoder.charset)); + var send5 = send4.waitForNextSend(100); + assertEquals("retry1-2", 5, send5.sendNo); + assertEquals("retry1-2", expectedPayload2, StringPayload.parse(send5.data, encoder.charset)); appender.stop(); } @Test - public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierException, TimeoutException { - var failingHttpClient = new FailingHttpClient(); - var sender = new WrappingHttpSender<>(failingHttpClient); - var encoder = defaultToStringEncoder(); - - // retries rate limited requests by default - var appender = buildRateLimitedAppender(false, encoder, sender); - appender.setPipelineBuilder(PipelineConfig.builder().setSleep((a, b) -> true)); - appender.start(); - appender.append(events[0]); - - // all retries failed + public void testRateLimitedRetry() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException { StringPayload expectedPayload = StringPayload.builder() .stream("[level, INFO, app, my-app]", "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") .build(); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("send", 1, sender.client.sendCount); - assertEquals("send", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("retry1", 2, sender.client.sendCount); - assertEquals("retry1", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("retry2", 3, sender.client.sendCount); - assertEquals("retry2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - - appender.stop(); - } - - @Test - public void testRateLimitedNoRetries() throws InterruptedException, BrokenBarrierException, TimeoutException { - var encoder = defaultToStringEncoder(); var failingHttpClient = new FailingHttpClient(); var sender = new WrappingHttpSender<>(failingHttpClient); + var encoder = defaultToStringEncoder(); - // retries rate limited requests - var appender = buildRateLimitedAppender(true, encoder, sender); - appender.setDropRateLimitedBatches(true); - BiFunction failIfSleep = (i, j) -> { - throw new IllegalStateException("It should not attempt to retry."); - }; - appender.setPipelineBuilder(PipelineConfig.builder().setSleep(failIfSleep)); - StringPayload expectedPayload = StringPayload.builder() - .stream("[level, INFO, app, my-app]", - "ts=100 l=INFO c=test.TestApp t=thread-1 | Test message 1 ") - .build(); - + var appender = appender(1, 4000L, encoder, sender); + // retries rate limited requests by default + appender.setDropRateLimitedBatches(false); + appender.setPipelineBuilder(PipelineConfig.builder().setSleep((a, b) -> true)); appender.start(); + sender.client.setFailure(FailureType.RATE_LIMITED); + var sendCapture = sender.client.captureSendInvocation(); + appender.append(events[0]); - failingHttpClient.await(); - failingHttpClient.await(); - assertEquals("send-2", 1, sender.client.sendCount); - assertEquals("send-2", expectedPayload, StringPayload.parse(sender.client.lastBatch, encoder.charset)); - appender.stop(); - } + var send1 = sendCapture.waitForNextSend(100); + assertEquals("send", 1, send1.sendNo); + assertEquals("send", expectedPayload, StringPayload.parse(send1.data, encoder.charset)); - private Loki4jAppender buildRateLimitedAppender( - boolean dropRateLimitedBatches, - AbstractLoki4jEncoder encoder, - WrappingHttpSender sender) { - var appender = appender(1, 4000L, encoder, sender); - appender.setDropRateLimitedBatches(dropRateLimitedBatches); + var send2 = send1.waitForNextSend(100); + assertEquals("retry1", 2, send2.sendNo); + assertEquals("retry1", expectedPayload, StringPayload.parse(send2.data, encoder.charset)); - sender.client.fail.set(true); - sender.client.rateLimited.set(true); + var send3 = send2.waitForNextSend(100); + assertEquals("retry2", 3, send3.sendNo); + assertEquals("retry2", expectedPayload, StringPayload.parse(send3.data, encoder.charset)); - return appender; + appender.stop(); } } diff --git a/loki-logback-appender/src/test/java/com/github/loki4j/logback/PatternLayoutTest.java b/loki-logback-appender/src/test/java/com/github/loki4j/logback/PatternLayoutTest.java index 3ea6bfba..942d2d1c 100644 --- a/loki-logback-appender/src/test/java/com/github/loki4j/logback/PatternLayoutTest.java +++ b/loki-logback-appender/src/test/java/com/github/loki4j/logback/PatternLayoutTest.java @@ -36,7 +36,7 @@ public void testEncodeEscapes() { "['100100002','l=INFO c=TestApp t=main | m3-line1\\rline2\\r ']]}]}" ).replace('\'', '"'); - var actual = new String(sender.lastBatch(), encoder.charset); + var actual = new String(sender.lastSendData(), encoder.charset); //System.out.println(expected); //System.out.println(actual); assertEquals("escape", expected, actual); diff --git a/testkit/build.gradle b/testkit/build.gradle index 05eebd68..c8f4abb1 100644 --- a/testkit/build.gradle +++ b/testkit/build.gradle @@ -10,6 +10,8 @@ repositories { dependencies { + implementation project(":loki-client") + testImplementation libs.junit } diff --git a/testkit/src/main/java/com/github/loki4j/testkit/dummy/DummyHttpClient.java b/testkit/src/main/java/com/github/loki4j/testkit/dummy/DummyHttpClient.java new file mode 100644 index 00000000..35ac7257 --- /dev/null +++ b/testkit/src/main/java/com/github/loki4j/testkit/dummy/DummyHttpClient.java @@ -0,0 +1,74 @@ +package com.github.loki4j.testkit.dummy; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.github.loki4j.client.http.HttpConfig; +import com.github.loki4j.client.http.Loki4jHttpClient; +import com.github.loki4j.client.http.LokiResponse; + +public class DummyHttpClient implements Loki4jHttpClient { + + private volatile SendInvocation lastSend; + private volatile CompletableFuture nextSendFuture = new CompletableFuture<>(); + private volatile int sendNo = 0; + + @Override + public void close() throws Exception { } + + @Override + public HttpConfig getConfig() { + return HttpConfig.builder().build("test"); + } + + @Override + public LokiResponse send(ByteBuffer batch) throws Exception { + var data = new byte[batch.remaining()]; + batch.get(data); + var newSendFuture = new CompletableFuture(); + var send = new SendInvocation(++sendNo, data, newSendFuture); + nextSendFuture.complete(send); + nextSendFuture = newSendFuture; + lastSend = send; + return new LokiResponse(204, ""); + } + + public SendInvocation lastSend() { + return lastSend; + } + + public byte[] lastSendData() { + return lastSend == null ? null : lastSend.data; + } + + public SendInvocation captureSendInvocation() { + return new SendInvocation(-1, new byte[0], nextSendFuture); + } + + static SendInvocation waitForFuture(CompletableFuture sendFuture, long timeoutMs) { + try { + return sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new RuntimeException("Concurrency exception occurred while waiting for the next send", e); + } + } + + public static class SendInvocation { + + public final int sendNo; + public final byte[] data; + + private final CompletableFuture next; + + public SendInvocation(int sendNo, byte[] batch, CompletableFuture next) { + this.sendNo = sendNo; + this.data = batch; + this.next = next; + } + + public SendInvocation waitForNextSend(long timeoutMs) { + return waitForFuture(next, timeoutMs); + } + } +} diff --git a/testkit/src/main/java/com/github/loki4j/testkit/dummy/FailingHttpClient.java b/testkit/src/main/java/com/github/loki4j/testkit/dummy/FailingHttpClient.java new file mode 100644 index 00000000..8afeb74f --- /dev/null +++ b/testkit/src/main/java/com/github/loki4j/testkit/dummy/FailingHttpClient.java @@ -0,0 +1,32 @@ +package com.github.loki4j.testkit.dummy; + +import java.net.ConnectException; +import java.nio.ByteBuffer; + +import com.github.loki4j.client.http.LokiResponse; + +public class FailingHttpClient extends DummyHttpClient { + + private volatile FailureType failureType = FailureType.NONE; + + @Override + public LokiResponse send(ByteBuffer batch) throws Exception { + var response = super.send(batch); + if (failureType == FailureType.CONNECTION_EXCEPTION) + throw new ConnectException("Text exception"); + else if (failureType == FailureType.RATE_LIMITED) + return new LokiResponse(429, "Rate Limited Request"); + return response; + } + + public void setFailure(FailureType failureType) { + this.failureType = failureType; + } + + public enum FailureType { + NONE, + CONNECTION_EXCEPTION, + RATE_LIMITED + } + +} diff --git a/testkit/src/main/java/com/github/loki4j/testkit/dummy/SuspendableHttpClient.java b/testkit/src/main/java/com/github/loki4j/testkit/dummy/SuspendableHttpClient.java new file mode 100644 index 00000000..e7b26361 --- /dev/null +++ b/testkit/src/main/java/com/github/loki4j/testkit/dummy/SuspendableHttpClient.java @@ -0,0 +1,26 @@ +package com.github.loki4j.testkit.dummy; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import com.github.loki4j.client.http.LokiResponse; + +public class SuspendableHttpClient extends DummyHttpClient { + private final AtomicBoolean wait = new AtomicBoolean(false); + + @Override + public LokiResponse send(ByteBuffer batch) throws Exception { + while(wait.get()) + LockSupport.parkNanos(1000); + return super.send(batch); + } + + public void suspend() { + wait.set(true); + } + + public void resume() { + wait.set(false); + } +}