diff options
| author | 2026-02-05 14:35:00 +0300 | |
|---|---|---|
| committer | 2026-02-05 14:35:00 +0300 | |
| commit | 641a95063946bf7f4b8fdd1e938a917bc3f75cb2 (patch) | |
| tree | d19a6b92845de37e66b57b305fbcaf9f4816ffe9 /README.md | |
| download | fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.gz fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.bz2 fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.xz fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.zip | |
Diffstat (limited to '')
| -rw-r--r-- | README.md | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 0000000..24db586 --- /dev/null +++ b/README.md @@ -0,0 +1,119 @@ +# Обзор проекта fqueue + +## Что делает проект + +`fqueue` – это простая файловая очередь, реализованная на Go. Она позволяет +публиковать задачи в виде бинарных данных, которые сохраняются в виде отдельных +файлов в каталоге `[имя очереди]/queue`. Затем задачи последовательно читаются и +обрабатываются через итератор `NextTask`, который перемещает файлы в каталог +`[имя очереди]/processing` для гарантии эксклюзивного доступа. + +## Быстрый пример + +```go +import ( + "bytes" + "context" + "encoding/gob" + "log" + "os" + "os/signal" + + "go.neonxp.ru/fqueue" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + + // Создание экземпляра шины + bus, err := fqueue.New("tasks") + if err != nil { + panic(err) + } + + // Отправка двух задач (Encode() возвращает просто []byte в данном случае) + { + tid, err := bus.Publish(ctx, Task{ID: 1, Val: "first message"}.Encode()) + if err != nil { + panic(err) + } + log.Println("publish task", tid) + } + { + tid, err := bus.Publish(ctx, Task{ID: 2, Val: "second message"}.Encode()) + if err != nil { + panic(err) + } + log.Println("publish task", tid) + } + + // Цикл чтения задач итератором. + // Возвращается задача []byte и err error если произошла ошибка получения задачи. + for t, err := range bus.NextTask(ctx) { + if err != nil { + panic(err) + } + tt := Task{} + if err := gob.NewDecoder(bytes.NewBuffer(t)).Decode(&tt); err != nil { + panic(err) + } + log.Println("got", tt) + } +} + +// Ниже просто сервисный код, это не интересно +func init() { + gob.Register(Task{}) +} + +type Task struct { + ID int + Val string +} + +func (t Task) Encode() []byte { + buf := bytes.Buffer{} + if err := gob.NewEncoder(&buf).Encode(t); err != nil { + panic(err) + } + + return buf.Bytes() +} + +``` + +## Как работает публикация + +1. **Получение следующего номера** – `nextSeq` открывает (или создаёт) файл + счётчика, берёт эксклюзивную блокировку и атомарно увеличивает значение. +2. **Создание файла задачи** – `os.OpenFile` вызывается с флагами + `O_CREATE|O_EXCL`. Файл создаётся с правами `0o600` (чтение/запись только для + владельца). +3. **Запись данных** – данные задачи записываются в файл, после чего дескриптор + закрывается. + +## Как работает чтение + +1. **Обработка «застрявших» задач** – если в `tasks/processing` находятся файлы + старше `maxProcessTime` (1 секунда), они возвращаются в очередь. +2. **Выбор первой задачи** – читается содержимое каталога `tasks/queue`, берётся + первый элемент. +3. **Атомарное перемещение** – `os.Rename` перемещает файл в `tasks/processing`. + Если перемещение удалось, задача считается эксклюзивной. +4. **Чтение и удаление** – файл открывается, читается полностью, затем + удаляется. + +## Лицензия + +Этот проект лицензирован в соответствии с GNU General Public License версии 3 +(GPLv3). Подробности смотрите в файле [LICENSE](LICENSE). + +``` + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2026 Alexander NeonXP Kiryukhin <i@neonxp.ru> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. +``` |
