Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for max_bytes when creating queues #122

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ajc/queue_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type queueCommand struct {
memory bool
replicas int
discardOld bool
maxBytes int64
maxBytesSet bool
}

func configureQueueCommand(app *fisk.Application) {
Expand All @@ -43,6 +45,7 @@ func configureQueueCommand(app *fisk.Application) {
add.Flag("memory", "Store the Queue in memory").BoolVar(&c.memory)
add.Flag("replicas", "Number of storage replicas to configure").Default("1").IntVar(&c.replicas)
add.Flag("discard-old", "When full, discard old entries").BoolVar(&c.discardOld)
add.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes)

queues.Command("list", "List Queues").Alias("ls").Action(c.lsAction)

Expand All @@ -65,6 +68,7 @@ func configureQueueCommand(app *fisk.Application) {
cfg.Flag("run-time", "Maximum run-time to allow per task").Default("-1s").DurationVar(&c.maxTime)
cfg.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default("-2").IntVar(&c.maxConcurrent)
cfg.Flag("replicas", "Number of storage replicas to configure").Default("-1").IntVar(&c.replicas)
cfg.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes)
}

func (c *queueCommand) addAction(_ *fisk.ParseContext) error {
Expand All @@ -88,6 +92,10 @@ func (c *queueCommand) addAction(_ *fisk.ParseContext) error {
MaxConcurrent: c.maxConcurrent,
}

if c.maxBytesSet {
queue.MaxBytes = c.maxBytes
}

err = admin.PrepareQueue(queue, c.replicas, c.memory)
if err != nil {
return err
Expand Down Expand Up @@ -146,6 +154,10 @@ func (c *queueCommand) configureAction(_ *fisk.ParseContext) error {
ccfg.MaxAckPending = c.maxConcurrent
}

if c.maxBytesSet {
scfg.MaxBytes = c.maxBytes
}

mgr, _, err := admin.TasksStore()
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type taskCommand struct {
ed25519Seed string
ed25519PubKey string
optionalSigs bool
maxBytes int64
maxBytesSet bool

limit int
json bool
Expand Down Expand Up @@ -85,6 +87,7 @@ func configureTaskCommand(app *fisk.Application) {
init.Flag("memory", "Use memory as a storage backend").BoolVar(&c.memory)
init.Flag("retention", "Sets how long Tasks are kept in the Task Store").DurationVar(&c.retention)
init.Flag("replicas", "How many replicas to keep in a JetStream cluster").Default("1").IntVar(&c.replicas)
init.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").IsSetByUser(&c.maxBytesSet).Int64Var(&c.maxBytes)

config := tasks.Command("configure", "Configures the Task storage").Alias("config").Alias("cfg").Action(c.configAction)
config.Arg("retention", "Sets how long Tasks are kept in the Task Store").Required().DurationVar(&c.retention)
Expand Down Expand Up @@ -172,7 +175,7 @@ func (c *taskCommand) initAction(_ *fisk.ParseContext) error {
return err
}

err = admin.PrepareTasks(c.memory, c.replicas, c.retention)
err = admin.PrepareTasks(c.memory, c.replicas, c.retention, c.maxBytes, c.maxBytesSet)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func showQueue(q *asyncjobs.QueueInfo) {
fmt.Printf(" Memory Based: %t\n", q.Stream.Config.Storage == api.MemoryStorage)
fmt.Printf(" Replicas: %d\n", q.Stream.Config.Replicas)
fmt.Printf(" Archive Period: %s\n", humanizeDuration(q.Stream.Config.MaxAge))
fmt.Printf(" Max Bytes: %d\n", q.Stream.Config.MaxBytes)
fmt.Printf(" Max Task Tries: %d\n", q.Consumer.Config.MaxDeliver)
fmt.Printf(" Max Run Time: %s\n", humanizeDuration(q.Consumer.Config.AckWait))
fmt.Printf(" Max Concurrent: %d\n", q.Consumer.Config.MaxAckPending)
Expand Down
10 changes: 6 additions & 4 deletions asyncjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
DefaultMaxTries = 10
// DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting
DefaultQueueMaxConcurrent = 100
// DefaultMaxBytes when not configured for a queue defaults to 10Mb
DefaultMaxBytes = 10485760
)

// StorageAdmin is helpers to support the CLI mainly, this leaks a bunch of details about JetStream
Expand All @@ -34,8 +36,8 @@ type StorageAdmin interface {
DeleteQueue(name string) error
PrepareQueue(q *Queue, replicas int, memory bool) error
ConfigurationInfo() (*nats.KeyValueBucketStatus, error)
PrepareConfigurationStore(memory bool, replicas int) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error
DeleteTaskByID(id string) error
TasksInfo() (*TasksInfo, error)
Tasks(ctx context.Context, limit int32) (chan *Task, error)
Expand Down Expand Up @@ -68,8 +70,8 @@ type Storage interface {
TerminateItem(ctx context.Context, item *ProcessItem) error
PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
PrepareQueue(q *Queue, replicas int, memory bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int) error
PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error
PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ func (c *Client) startPrometheus() {
}

func (c *Client) setupStreams() error {
err := c.storage.PrepareTasks(c.opts.memoryStore, c.opts.replicas, c.opts.taskRetention)
err := c.storage.PrepareTasks(c.opts.memoryStore, c.opts.replicas, c.opts.taskRetention, c.opts.maxBytes, c.opts.maxBytesSet)
if err != nil {
return err
}

return c.storage.PrepareConfigurationStore(c.opts.memoryStore, c.opts.replicas)
return c.storage.PrepareConfigurationStore(c.opts.memoryStore, c.opts.replicas, c.opts.maxBytes, c.opts.maxBytesSet)
}

func nowPointer() *time.Time {
Expand Down
10 changes: 10 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ClientOpts struct {
publicKey ed25519.PublicKey
publicKeyFile string
optionalTaskSignatures bool
maxBytes int64
maxBytesSet bool

nc *nats.Conn
}
Expand Down Expand Up @@ -313,3 +315,11 @@ func TaskSignaturesOptional() ClientOpt {
return nil
}
}

func MaxBytes(maxBytes int64) ClientOpt {
return func(opts *ClientOpts) error {
opts.maxBytes = maxBytes
opts.maxBytesSet = true
return nil
}
}
3 changes: 3 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Queue struct {
MaxConcurrent int `json:"max_concurrent"`
// NoCreate will not try to create a queue, will bind to an existing one or fail
NoCreate bool
// MaxBytes is the maximum amount of bytes that can be stored in the queue
MaxBytes int64 `json:"max_bytes"`

mu sync.Mutex
storage Storage
Expand Down Expand Up @@ -62,6 +64,7 @@ func newDefaultQueue() *Queue {
MaxRunTime: time.Minute,
MaxTries: 100,
MaxConcurrent: DefaultQueueMaxConcurrent,
MaxBytes: DefaultMaxBytes,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only set this when set by the user, not by default.

MaxAge: 0,
DiscardOld: false,
mu: sync.Mutex{},
Expand Down
31 changes: 25 additions & 6 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ func (s *jetStreamStorage) createQueue(q *Queue, replicas int, memory bool) erro
} else {
opts = append(opts, jsm.DiscardNew())
}
if q.MaxBytes > -1 {
opts = append(opts, jsm.MaxBytes(q.MaxBytes))
}

var err error

Expand Down Expand Up @@ -726,7 +729,7 @@ func (s *jetStreamStorage) SaveScheduledTask(st *ScheduledTask, update bool) err
return nil
}

func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int) error {
func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int, maxBytes int64, maxBytesSet bool) error {
var err error

if replicas == 0 {
Expand All @@ -745,12 +748,18 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int)

kv, err := js.KeyValue(ConfigBucketName)
if err == nats.ErrBucketNotFound {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
cfg := &nats.KeyValueConfig{
Bucket: ConfigBucketName,
Description: "Choria Async Jobs Configuration",
Storage: storage,
Replicas: replicas,
})
}

if maxBytes > -1 {
cfg.MaxBytes = maxBytes
}

kv, err = js.CreateKeyValue(cfg)
}
if err != nil {
return err
Expand All @@ -760,13 +769,19 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int)

kv, err = js.KeyValue(LeaderElectionBucketName)
if err == nats.ErrBucketNotFound {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
cfg := &nats.KeyValueConfig{
Bucket: LeaderElectionBucketName,
Description: "Choria Async Jobs Leader Elections",
Storage: storage,
Replicas: replicas,
TTL: 10 * time.Second,
})
}

if maxBytesSet {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a mix of approaches here, in one place you check for > -1 (line 758) and here you check if its set. I dont think the isSet here really add value

I think now that we never rely on default down here etc we can probably just act on > -1 as you did above. Eitherway we should be consistent whichever we pick

cfg.MaxBytes = maxBytes
}

kv, err = js.CreateKeyValue(cfg)
}
if err != nil {
return err
Expand All @@ -778,7 +793,7 @@ func (s *jetStreamStorage) PrepareConfigurationStore(memory bool, replicas int)

}

func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention time.Duration) error {
func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention time.Duration, maxBytes int64, maxBytesSet bool) error {
var err error

if replicas == 0 {
Expand All @@ -800,6 +815,10 @@ func (s *jetStreamStorage) PrepareTasks(memory bool, replicas int, retention tim

opts = append(opts, jsm.MaxAge(retention))

if maxBytesSet {
opts = append(opts, jsm.MaxBytes(maxBytes))
}

s.tasks = &taskStorage{mgr: s.mgr}
s.tasks.stream, err = s.mgr.LoadOrNewStream(TasksStreamName, opts...)
if err != nil {
Expand Down
Loading
Loading