diff options
Diffstat (limited to '')
| -rw-r--r-- | listener.go | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..c1206ca --- /dev/null +++ b/listener.go @@ -0,0 +1,116 @@ +package fqueue + +import ( + "context" + "fmt" + "io" + "iter" + "os" + "path/filepath" + "time" +) + +// NextTask возвращает итератор который безопасно вычитывает задачи из очереди. +func (b *Bus) NextTask(ctx context.Context) iter.Seq2[[]byte, error] { + return func(yield func([]byte, error) bool) { + for { + select { + case <-ctx.Done(): + return + default: + } + + // Сначала смотрим есть ли в процессинге задачи. Если их просто нет, + // то хорошо, можно продолжать. А если уже есть, то надо посмотреть, + // может задача-то старая (например, другой обработчик был невовремя + // прибит), тогда её надо попробовать вернуть в очередь и потом + // обработать. + if !b.processOldTasks() { + continue + } + + // Читаем директорию с очередью. + entries, err := os.ReadDir(b.queuePath) + if err != nil || len(entries) == 0 { + // Задач в очереди нет. + continue + } + + first := entries[0] + // Пытаемся захватить задачу и обработать. + if err := b.tryAcquire(first, yield); err != nil { + return + } + // Задача успешно обработана. Идём за следующей. + } + } +} + +func (b *Bus) processOldTasks() bool { + processing, err := os.ReadDir(b.processingPath) + if err != nil { + return false + } + + if len(processing) == 0 { + // Ничего не обрабатывается - даём зелёный свет дальнейшей работе. + return true + } + + tooOld := time.Now().Add(-maxProcessTime) + for _, ent := range processing { + info, err := ent.Info() + if err != nil { + // Возможно, её в параллель уже обработали и удалили. + // Нас не интересует. + continue + } + if info.ModTime().Before(tooOld) { + // Зависшая задача. Вернем в очередь. + oldPath := filepath.Join(b.processingPath, ent.Name()) + newPath := filepath.Join(b.queuePath, ent.Name()) + if err := os.Rename(oldPath, newPath); err != nil { + // Не получилось вернуть. + // Значит её уже кто-то обработал так или иначе. + continue + } + } + // В процессинге уже есть нестарая задача. Запрещаем родительской + // функции брать задачу в обработку, чтобы сохранить корректный порядок. + return false + } + // Если мы оказались в этой точке - значит все задачи в процессинге были + // старые и успешно возвращены в очередь. И можно разрешить родительской + // функции взять задачу в обработку если есть. + return true +} + +func (b *Bus) tryAcquire(first os.DirEntry, yield func([]byte, error) bool) error { + oldPath := filepath.Join(b.queuePath, first.Name()) + newPath := filepath.Join(b.processingPath, first.Name()) + // Попытка захватить задачу атомарно. + if err := os.Rename(oldPath, newPath); err == nil { + // Задача эксклюзивна наша и уже никуда не убежит - нужно обработать. + if task, err := b.processTask(newPath); !yield(task, err) || err != nil { + return err + } + } + return nil +} + +func (*Bus) processTask(taskPath string) ([]byte, error) { + fp, err := os.Open(taskPath) + if err != nil { + return nil, fmt.Errorf("failed open task file: %w", err) + } + defer fp.Close() + buf, err := io.ReadAll(fp) + if err != nil { + return nil, fmt.Errorf("failed read task file: %w", err) + } + if err := os.Remove(taskPath); err != nil { + return nil, fmt.Errorf("failed remove task file after read: %w", err) + } + + return buf, nil +} |
