Обзор проекта fqueue
Что делает проект
fqueue – это простая файловая очередь, реализованная на Go. Она позволяет
публиковать задачи в виде бинарных данных, которые сохраняются в виде отдельных
файлов в каталоге [имя очереди]/queue. Затем задачи последовательно читаются и
обрабатываются через итератор NextTask, который перемещает файлы в каталог
[имя очереди]/processing для гарантии эксклюзивного доступа.
Быстрый пример
import (
"bytes"
"context"
"encoding/gob"
"log"
"os"
"os/signal"
"go.neonxp.ru/fqueue"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
// Создание экземпляра шины
bus, err := fqueue.New("tasks")
if err != nil {
panic(err)
}
// Отправка двух задач (Encode() возвращает просто []byte в данном случае)
{
tid, err := bus.Publish(ctx, Task{ID: 1, Val: "first message"}.Encode())
if err != nil {
panic(err)
}
log.Println("publish task", tid)
}
{
tid, err := bus.Publish(ctx, Task{ID: 2, Val: "second message"}.Encode())
if err != nil {
panic(err)
}
log.Println("publish task", tid)
}
// Цикл чтения задач итератором.
// Возвращается задача []byte и err error если произошла ошибка получения задачи.
for t, err := range bus.NextTask(ctx) {
if err != nil {
panic(err)
}
tt := Task{}
if err := gob.NewDecoder(bytes.NewBuffer(t)).Decode(&tt); err != nil {
panic(err)
}
log.Println("got", tt)
}
}
// Ниже просто сервисный код, это не интересно
func init() {
gob.Register(Task{})
}
type Task struct {
ID int
Val string
}
func (t Task) Encode() []byte {
buf := bytes.Buffer{}
if err := gob.NewEncoder(&buf).Encode(t); err != nil {
panic(err)
}
return buf.Bytes()
}
Как работает публикация
- Получение следующего номера –
nextSeqоткрывает (или создаёт) файл счётчика, берёт эксклюзивную блокировку и атомарно увеличивает значение. - Создание файла задачи –
os.OpenFileвызывается с флагамиO_CREATE|O_EXCL. Файл создаётся с правами0o600(чтение/запись только для владельца). - Запись данных – данные задачи записываются в файл, после чего дескриптор закрывается.
Как работает чтение
- Обработка «застрявших» задач – если в
tasks/processingнаходятся файлы старшеmaxProcessTime(1 секунда), они возвращаются в очередь. - Выбор первой задачи – читается содержимое каталога
tasks/queue, берётся первый элемент. - Атомарное перемещение –
os.Renameперемещает файл вtasks/processing. Если перемещение удалось, задача считается эксклюзивной. - Чтение и удаление – файл открывается, читается полностью, затем удаляется.
Лицензия
Этот проект лицензирован в соответствии с GNU General Public License версии 3 (GPLv3). Подробности смотрите в файле LICENSE.
GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2026 Alexander NeonXP Kiryukhin <i@neonxp.ru>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
