1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# Обзор проекта fqueue
## Что делает проект
`fqueue` – это простая файловая очередь, реализованная на Go. Она позволяет
публиковать задачи в виде бинарных данных, которые сохраняются в виде отдельных
файлов в каталоге `[имя очереди]/queue`. Затем задачи последовательно читаются и
обрабатываются через итератор `NextTask`, который перемещает файлы в каталог
`[имя очереди]/processing` для гарантии эксклюзивного доступа.
## Быстрый пример
```go
import (
"bytes"
"context"
"encoding/gob"
"log"
"os"
"os/signal"
"go.neonxp.ru/fqueue"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
// Создание экземпляра шины
bus, err := fqueue.New("tasks")
if err != nil {
panic(err)
}
// Отправка двух задач (Encode() возвращает просто []byte в данном случае)
{
tid, err := bus.Publish(ctx, Task{ID: 1, Val: "first message"}.Encode())
if err != nil {
panic(err)
}
log.Println("publish task", tid)
}
{
tid, err := bus.Publish(ctx, Task{ID: 2, Val: "second message"}.Encode())
if err != nil {
panic(err)
}
log.Println("publish task", tid)
}
// Цикл чтения задач итератором.
// Возвращается задача []byte и err error если произошла ошибка получения задачи.
for t, err := range bus.NextTask(ctx) {
if err != nil {
panic(err)
}
tt := Task{}
if err := gob.NewDecoder(bytes.NewBuffer(t)).Decode(&tt); err != nil {
panic(err)
}
log.Println("got", tt)
}
}
// Ниже просто сервисный код, это не интересно
func init() {
gob.Register(Task{})
}
type Task struct {
ID int
Val string
}
func (t Task) Encode() []byte {
buf := bytes.Buffer{}
if err := gob.NewEncoder(&buf).Encode(t); err != nil {
panic(err)
}
return buf.Bytes()
}
```
## Как работает публикация
1. **Получение следующего номера** – `nextSeq` открывает (или создаёт) файл
счётчика, берёт эксклюзивную блокировку и атомарно увеличивает значение.
2. **Создание файла задачи** – `os.OpenFile` вызывается с флагами
`O_CREATE|O_EXCL`. Файл создаётся с правами `0o600` (чтение/запись только для
владельца).
3. **Запись данных** – данные задачи записываются в файл, после чего дескриптор
закрывается.
## Как работает чтение
1. **Обработка «застрявших» задач** – если в `tasks/processing` находятся файлы
старше `maxProcessTime` (1 секунда), они возвращаются в очередь.
2. **Выбор первой задачи** – читается содержимое каталога `tasks/queue`, берётся
первый элемент.
3. **Атомарное перемещение** – `os.Rename` перемещает файл в `tasks/processing`.
Если перемещение удалось, задача считается эксклюзивной.
4. **Чтение и удаление** – файл открывается, читается полностью, затем
удаляется.
## Лицензия
Этот проект лицензирован в соответствии с GNU General Public License версии 3
(GPLv3). Подробности смотрите в файле [LICENSE](LICENSE).
```
GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2026 Alexander NeonXP Kiryukhin <i@neonxp.ru>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
```
|