aboutsummaryrefslogtreecommitdiff
path: root/listener.go
diff options
context:
space:
mode:
author2026-02-05 14:35:00 +0300
committer2026-02-05 14:35:00 +0300
commit641a95063946bf7f4b8fdd1e938a917bc3f75cb2 (patch)
treed19a6b92845de37e66b57b305fbcaf9f4816ffe9 /listener.go
downloadfqueue-master.tar.gz
fqueue-master.tar.bz2
fqueue-master.tar.xz
fqueue-master.zip
initialHEADmaster
Diffstat (limited to 'listener.go')
-rw-r--r--listener.go116
1 files changed, 116 insertions, 0 deletions
diff --git a/listener.go b/listener.go
new file mode 100644
index 0000000..c1206ca
--- /dev/null
+++ b/listener.go
@@ -0,0 +1,116 @@
+package fqueue
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "iter"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+// NextTask возвращает итератор который безопасно вычитывает задачи из очереди.
+func (b *Bus) NextTask(ctx context.Context) iter.Seq2[[]byte, error] {
+ return func(yield func([]byte, error) bool) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ // Сначала смотрим есть ли в процессинге задачи. Если их просто нет,
+ // то хорошо, можно продолжать. А если уже есть, то надо посмотреть,
+ // может задача-то старая (например, другой обработчик был невовремя
+ // прибит), тогда её надо попробовать вернуть в очередь и потом
+ // обработать.
+ if !b.processOldTasks() {
+ continue
+ }
+
+ // Читаем директорию с очередью.
+ entries, err := os.ReadDir(b.queuePath)
+ if err != nil || len(entries) == 0 {
+ // Задач в очереди нет.
+ continue
+ }
+
+ first := entries[0]
+ // Пытаемся захватить задачу и обработать.
+ if err := b.tryAcquire(first, yield); err != nil {
+ return
+ }
+ // Задача успешно обработана. Идём за следующей.
+ }
+ }
+}
+
+func (b *Bus) processOldTasks() bool {
+ processing, err := os.ReadDir(b.processingPath)
+ if err != nil {
+ return false
+ }
+
+ if len(processing) == 0 {
+ // Ничего не обрабатывается - даём зелёный свет дальнейшей работе.
+ return true
+ }
+
+ tooOld := time.Now().Add(-maxProcessTime)
+ for _, ent := range processing {
+ info, err := ent.Info()
+ if err != nil {
+ // Возможно, её в параллель уже обработали и удалили.
+ // Нас не интересует.
+ continue
+ }
+ if info.ModTime().Before(tooOld) {
+ // Зависшая задача. Вернем в очередь.
+ oldPath := filepath.Join(b.processingPath, ent.Name())
+ newPath := filepath.Join(b.queuePath, ent.Name())
+ if err := os.Rename(oldPath, newPath); err != nil {
+ // Не получилось вернуть.
+ // Значит её уже кто-то обработал так или иначе.
+ continue
+ }
+ }
+ // В процессинге уже есть нестарая задача. Запрещаем родительской
+ // функции брать задачу в обработку, чтобы сохранить корректный порядок.
+ return false
+ }
+ // Если мы оказались в этой точке - значит все задачи в процессинге были
+ // старые и успешно возвращены в очередь. И можно разрешить родительской
+ // функции взять задачу в обработку если есть.
+ return true
+}
+
+func (b *Bus) tryAcquire(first os.DirEntry, yield func([]byte, error) bool) error {
+ oldPath := filepath.Join(b.queuePath, first.Name())
+ newPath := filepath.Join(b.processingPath, first.Name())
+ // Попытка захватить задачу атомарно.
+ if err := os.Rename(oldPath, newPath); err == nil {
+ // Задача эксклюзивна наша и уже никуда не убежит - нужно обработать.
+ if task, err := b.processTask(newPath); !yield(task, err) || err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (*Bus) processTask(taskPath string) ([]byte, error) {
+ fp, err := os.Open(taskPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed open task file: %w", err)
+ }
+ defer fp.Close()
+ buf, err := io.ReadAll(fp)
+ if err != nil {
+ return nil, fmt.Errorf("failed read task file: %w", err)
+ }
+ if err := os.Remove(taskPath); err != nil {
+ return nil, fmt.Errorf("failed remove task file after read: %w", err)
+ }
+
+ return buf, nil
+}