Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for max_bytes when creating queues #122

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ajc/queue_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func configureQueueCommand(app *fisk.Application) {
add.Flag("memory", "Store the Queue in memory").BoolVar(&c.memory)
add.Flag("replicas", "Number of storage replicas to configure").Default("1").IntVar(&c.replicas)
add.Flag("discard-old", "When full, discard old entries").BoolVar(&c.discardOld)
add.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").Int64Var(&c.maxBytes)
add.Flag("max-bytes", "Maximum bytes that can be stored in the queue, defaults to 10Mb").Default(fmt.Sprintf("%d", asyncjobs.DefaultMaxBytes)).Int64Var(&c.maxBytes)

queues.Command("list", "List Queues").Alias("ls").Action(c.lsAction)

Expand All @@ -67,7 +67,7 @@ func configureQueueCommand(app *fisk.Application) {
cfg.Flag("run-time", "Maximum run-time to allow per task").Default("-1s").DurationVar(&c.maxTime)
cfg.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default("-2").IntVar(&c.maxConcurrent)
cfg.Flag("replicas", "Number of storage replicas to configure").Default("-1").IntVar(&c.replicas)
cfg.Flag("max-bytes", "Maximum bytes that can be stored in the queue, -1 for unlimited").Default("-1").Int64Var(&c.maxBytes)
cfg.Flag("max-bytes", "Maximum bytes that can be stored in the queue, defaults to 10mb").Default(fmt.Sprintf("%d", asyncjobs.DefaultMaxBytes)).Int64Var(&c.maxBytes)
}

func (c *queueCommand) addAction(_ *fisk.ParseContext) error {
Expand Down
114 changes: 58 additions & 56 deletions asyncjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,83 +5,85 @@
package asyncjobs

import (
"context"
"regexp"
"time"
"context"
"regexp"
"time"

"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
)

const (
// ShortedScheduledDeadline is the shortest deadline a scheduled task may have
ShortedScheduledDeadline = 30 * time.Second
// DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get
DefaultJobRunTime = time.Hour
// DefaultMaxTries when not configured for a task this is the default tries it will get
DefaultMaxTries = 10
// DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting
DefaultQueueMaxConcurrent = 100
// ShortedScheduledDeadline is the shortest deadline a scheduled task may have
ShortedScheduledDeadline = 30 * time.Second
// DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get
DefaultJobRunTime = time.Hour
// DefaultMaxTries when not configured for a task this is the default tries it will get
DefaultMaxTries = 10
// DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting
DefaultQueueMaxConcurrent = 100
// DefaultMaxBytes when not configured for a queue defaults to 10Mb
DefaultMaxBytes = 10485760
)

// StorageAdmin is helpers to support the CLI mainly, this leaks a bunch of details about JetStream
// but that's ok, we're not really intending to change the storage or support more
type StorageAdmin interface {
Queues() ([]*QueueInfo, error)
QueueNames() ([]string, error)
QueueInfo(name string) (*QueueInfo, error)
PurgeQueue(name string) error
DeleteQueue(name string) error
PrepareQueue(q *Queue, replicas int, memory bool) error
ConfigurationInfo() (*nats.KeyValueBucketStatus, error)
PrepareConfigurationStore(memory bool, replicas int) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
DeleteTaskByID(id string) error
TasksInfo() (*TasksInfo, error)
Tasks(ctx context.Context, limit int32) (chan *Task, error)
TasksStore() (*jsm.Manager, *jsm.Stream, error)
ElectionStorage() (nats.KeyValue, error)
Queues() ([]*QueueInfo, error)
QueueNames() ([]string, error)
QueueInfo(name string) (*QueueInfo, error)
PurgeQueue(name string) error
DeleteQueue(name string) error
PrepareQueue(q *Queue, replicas int, memory bool) error
ConfigurationInfo() (*nats.KeyValueBucketStatus, error)
PrepareConfigurationStore(memory bool, replicas int) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
DeleteTaskByID(id string) error
TasksInfo() (*TasksInfo, error)
Tasks(ctx context.Context, limit int32) (chan *Task, error)
TasksStore() (*jsm.Manager, *jsm.Stream, error)
ElectionStorage() (nats.KeyValue, error)
}

type ScheduledTaskStorage interface {
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
ElectionStorage() (nats.KeyValue, error)
PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
ElectionStorage() (nats.KeyValue, error)
PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
}

// Storage implements the backend access
type Storage interface {
SaveTaskState(ctx context.Context, task *Task, notify bool) error
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
RetryTaskByID(ctx context.Context, queue *Queue, id string) error
LoadTaskByID(id string) (*Task, error)
DeleteTaskByID(id string) error
PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
AckItem(ctx context.Context, item *ProcessItem) error
NakBlockedItem(ctx context.Context, item *ProcessItem) error
NakItem(ctx context.Context, item *ProcessItem) error
TerminateItem(ctx context.Context, item *ProcessItem) error
PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
PrepareQueue(q *Queue, replicas int, memory bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int) error
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
SaveTaskState(ctx context.Context, task *Task, notify bool) error
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
RetryTaskByID(ctx context.Context, queue *Queue, id string) error
LoadTaskByID(id string) (*Task, error)
DeleteTaskByID(id string) error
PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
AckItem(ctx context.Context, item *ProcessItem) error
NakBlockedItem(ctx context.Context, item *ProcessItem) error
NakItem(ctx context.Context, item *ProcessItem) error
TerminateItem(ctx context.Context, item *ProcessItem) error
PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
PrepareQueue(q *Queue, replicas int, memory bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int) error
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
}

var (
validNameMatcher = regexp.MustCompile(`^[a-zA-Z0-9_:-]+$`)
validNameMatcher = regexp.MustCompile(`^[a-zA-Z0-9_:-]+$`)
)

// IsValidName is a generic strict name validator for what we want people to put in name - task names etc, things that turn into subjects
func IsValidName(name string) bool {
return validNameMatcher.MatchString(name)
return validNameMatcher.MatchString(name)
}
89 changes: 45 additions & 44 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,68 @@
package asyncjobs

import (
"context"
"sync"
"time"
"context"
"sync"
"time"

"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/api"
)

// Queue represents a work queue
type Queue struct {
// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9
Name string `json:"name"`
// MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire
MaxAge time.Duration `json:"max_age"`
// MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied.
MaxEntries int `json:"max_entries"`
// DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected
DiscardOld bool `json:"discard_old"`
// MaxTries is the maximum amount of times a entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Default to DefaultMaxTries
MaxTries int `json:"max_tries"`
// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime
MaxRunTime time.Duration `json:"max_runtime"`
// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent
MaxConcurrent int `json:"max_concurrent"`
// NoCreate will not try to create a queue, will bind to an existing one or fail
NoCreate bool
// MaxBytes is the maximum amount of bytes that can be stored in the queue
MaxBytes int64 `json:"max_bytes"`
// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9
Name string `json:"name"`
// MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire
MaxAge time.Duration `json:"max_age"`
// MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied.
MaxEntries int `json:"max_entries"`
// DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected
DiscardOld bool `json:"discard_old"`
// MaxTries is the maximum amount of times a entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Default to DefaultMaxTries
MaxTries int `json:"max_tries"`
// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime
MaxRunTime time.Duration `json:"max_runtime"`
// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent
MaxConcurrent int `json:"max_concurrent"`
// NoCreate will not try to create a queue, will bind to an existing one or fail
NoCreate bool
// MaxBytes is the maximum amount of bytes that can be stored in the queue
MaxBytes int64 `json:"max_bytes"`

mu sync.Mutex
storage Storage
mu sync.Mutex
storage Storage
}

// QueueInfo holds information about a queue state
type QueueInfo struct {
// Name is the name of the queue
Name string `json:"name"`
// Time is the information was gathered
Time time.Time `json:"time"`
// Stream is the active JetStream Stream Information
Stream *api.StreamInfo `json:"stream_info"`
// Consumer is the worker stream information
Consumer *api.ConsumerInfo `json:"consumer_info"`
// Name is the name of the queue
Name string `json:"name"`
// Time is the information was gathered
Time time.Time `json:"time"`
// Stream is the active JetStream Stream Information
Stream *api.StreamInfo `json:"stream_info"`
// Consumer is the worker stream information
Consumer *api.ConsumerInfo `json:"consumer_info"`
}

func (q *Queue) retryTaskByID(ctx context.Context, id string) error {
return q.storage.RetryTaskByID(ctx, q, id)
return q.storage.RetryTaskByID(ctx, q, id)
}

func (q *Queue) enqueueTask(ctx context.Context, task *Task) error {
task.Queue = q.Name
return q.storage.EnqueueTask(ctx, q, task)
task.Queue = q.Name
return q.storage.EnqueueTask(ctx, q, task)
}

func newDefaultQueue() *Queue {
return &Queue{
Name: "DEFAULT",
MaxRunTime: time.Minute,
MaxTries: 100,
MaxConcurrent: DefaultQueueMaxConcurrent,
MaxAge: 0,
DiscardOld: false,
mu: sync.Mutex{},
}
return &Queue{
Name: "DEFAULT",
MaxRunTime: time.Minute,
MaxTries: 100,
MaxConcurrent: DefaultQueueMaxConcurrent,
MaxBytes: DefaultMaxBytes,
MaxAge: 0,
DiscardOld: false,
mu: sync.Mutex{},
}
}
Loading