package fqueue import ( "context" "fmt" "io" "iter" "os" "path/filepath" "time" ) // NextTask возвращает итератор который безопасно вычитывает задачи из очереди. func (b *Bus) NextTask(ctx context.Context) iter.Seq2[[]byte, error] { return func(yield func([]byte, error) bool) { for { select { case <-ctx.Done(): return default: } // Сначала смотрим есть ли в процессинге задачи. Если их просто нет, // то хорошо, можно продолжать. А если уже есть, то надо посмотреть, // может задача-то старая (например, другой обработчик был невовремя // прибит), тогда её надо попробовать вернуть в очередь и потом // обработать. if !b.processOldTasks() { continue } // Читаем директорию с очередью. entries, err := os.ReadDir(b.queuePath) if err != nil || len(entries) == 0 { // Задач в очереди нет. continue } first := entries[0] // Пытаемся захватить задачу и обработать. if err := b.tryAcquire(first, yield); err != nil { return } // Задача успешно обработана. Идём за следующей. } } } func (b *Bus) processOldTasks() bool { processing, err := os.ReadDir(b.processingPath) if err != nil { return false } if len(processing) == 0 { // Ничего не обрабатывается - даём зелёный свет дальнейшей работе. return true } tooOld := time.Now().Add(-maxProcessTime) for _, ent := range processing { info, err := ent.Info() if err != nil { // Возможно, её в параллель уже обработали и удалили. // Нас не интересует. continue } if info.ModTime().Before(tooOld) { // Зависшая задача. Вернем в очередь. oldPath := filepath.Join(b.processingPath, ent.Name()) newPath := filepath.Join(b.queuePath, ent.Name()) if err := os.Rename(oldPath, newPath); err != nil { // Не получилось вернуть. // Значит её уже кто-то обработал так или иначе. continue } } // В процессинге уже есть нестарая задача. Запрещаем родительской // функции брать задачу в обработку, чтобы сохранить корректный порядок. return false } // Если мы оказались в этой точке - значит все задачи в процессинге были // старые и успешно возвращены в очередь. И можно разрешить родительской // функции взять задачу в обработку если есть. return true } func (b *Bus) tryAcquire(first os.DirEntry, yield func([]byte, error) bool) error { oldPath := filepath.Join(b.queuePath, first.Name()) newPath := filepath.Join(b.processingPath, first.Name()) // Попытка захватить задачу атомарно. if err := os.Rename(oldPath, newPath); err == nil { // Задача эксклюзивна наша и уже никуда не убежит - нужно обработать. if task, err := b.processTask(newPath); !yield(task, err) || err != nil { return err } } return nil } func (*Bus) processTask(taskPath string) ([]byte, error) { fp, err := os.Open(taskPath) if err != nil { return nil, fmt.Errorf("failed open task file: %w", err) } defer fp.Close() buf, err := io.ReadAll(fp) if err != nil { return nil, fmt.Errorf("failed read task file: %w", err) } if err := os.Remove(taskPath); err != nil { return nil, fmt.Errorf("failed remove task file after read: %w", err) } return buf, nil }