aboutsummaryrefslogtreecommitdiff

Обзор проекта fqueue

Что делает проект

fqueue – это простая файловая очередь, реализованная на Go. Она позволяет публиковать задачи в виде бинарных данных, которые сохраняются в виде отдельных файлов в каталоге [имя очереди]/queue. Затем задачи последовательно читаются и обрабатываются через итератор NextTask, который перемещает файлы в каталог [имя очереди]/processing для гарантии эксклюзивного доступа.

Быстрый пример

import (
    "bytes"
    "context"
    "encoding/gob"
    "log"
    "os"
    "os/signal"

    "go.neonxp.ru/fqueue"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
    defer cancel()

    // Создание экземпляра шины
    bus, err := fqueue.New("tasks")
    if err != nil {
        panic(err)
    }

    // Отправка двух задач (Encode() возвращает просто []byte в данном случае)
    {
        tid, err := bus.Publish(ctx, Task{ID: 1, Val: "first message"}.Encode())
        if err != nil {
            panic(err)
        }
        log.Println("publish task", tid)
    }
    {
        tid, err := bus.Publish(ctx, Task{ID: 2, Val: "second message"}.Encode())
        if err != nil {
            panic(err)
        }
        log.Println("publish task", tid)
    }

    // Цикл чтения задач итератором.
    // Возвращается задача []byte и err error если произошла ошибка получения задачи.
    for t, err := range bus.NextTask(ctx) {
        if err != nil {
            panic(err)
        }
        tt := Task{}
        if err := gob.NewDecoder(bytes.NewBuffer(t)).Decode(&tt); err != nil {
            panic(err)
        }
        log.Println("got", tt)
    }
}

// Ниже просто сервисный код, это не интересно
func init() {
    gob.Register(Task{})
}

type Task struct {
    ID  int
    Val string
}

func (t Task) Encode() []byte {
    buf := bytes.Buffer{}
    if err := gob.NewEncoder(&buf).Encode(t); err != nil {
        panic(err)
    }

    return buf.Bytes()
}

Как работает публикация

  1. Получение следующего номераnextSeq открывает (или создаёт) файл счётчика, берёт эксклюзивную блокировку и атомарно увеличивает значение.
  2. Создание файла задачиos.OpenFile вызывается с флагами O_CREATE|O_EXCL. Файл создаётся с правами 0o600 (чтение/запись только для владельца).
  3. Запись данных – данные задачи записываются в файл, после чего дескриптор закрывается.

Как работает чтение

  1. Обработка «застрявших» задач – если в tasks/processing находятся файлы старше maxProcessTime (1 секунда), они возвращаются в очередь.
  2. Выбор первой задачи – читается содержимое каталога tasks/queue, берётся первый элемент.
  3. Атомарное перемещениеos.Rename перемещает файл в tasks/processing. Если перемещение удалось, задача считается эксклюзивной.
  4. Чтение и удаление – файл открывается, читается полностью, затем удаляется.

Лицензия

Этот проект лицензирован в соответствии с GNU General Public License версии 3 (GPLv3). Подробности смотрите в файле LICENSE.

                    GNU GENERAL PUBLIC LICENSE
                       Version 3, 29 June 2007

 Copyright (C) 2026 Alexander NeonXP Kiryukhin <i@neonxp.ru>
 Everyone is permitted to copy and distribute verbatim copies
 of this license document, but changing it is not allowed.