Skip to content

Commit

Permalink
Add PURGE stream
Browse files Browse the repository at this point in the history
  • Loading branch information
g41797 authored and nekufa committed Mar 25, 2024
1 parent 229fc82 commit 543d62d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/Stream/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ public function delete(): self
return $this;
}

public function purge(): self
{
if ($this->exists()) {
$this->client->api("STREAM.PURGE." . $this->getName());
}

return $this;
}

public function exists(): bool
{
return in_array($this->getName(), $this->client->getApi()->getStreamNames());
Expand Down
26 changes: 26 additions & 0 deletions tests/Functional/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ public function testNack()
$this->assertSame(0, $consumer->info()->num_pending);
}

public function testPurge()
{
$client = $this->createClient();
$stream = $client->getApi()->getStream('purge');
$stream->getConfiguration()->setSubjects(['purge'])->setRetentionPolicy(RetentionPolicy::WORK_QUEUE);
$stream->create();

$consumer = $stream->getConsumer('purge');
$consumer->setExpires(5);
$consumer->getConfiguration()
->setSubjectFilter('purge')
->setReplayPolicy(ReplayPolicy::INSTANT)
->setAckPolicy(AckPolicy::EXPLICIT);

$consumer->create();

$stream->publish('purge', 'first');
$stream->publish('purge', 'second');

$this->assertSame(2, $consumer->info()->num_pending);

$stream->purge();

$this->assertSame(0, $consumer->info()->num_pending);
}

public function testConsumerExpiration()
{
$client = $this->createClient(['timeout' => 0.2, 'delay' => 0.1]);
Expand Down

0 comments on commit 543d62d

Please sign in to comment.