aboutsummaryrefslogtreecommitdiff
path: root/README.md
blob: 24db5861906c7b64bc56e894f9a5124b7f0714e2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Обзор проекта fqueue

## Что делает проект

`fqueue` – это простая файловая очередь, реализованная на Go. Она позволяет
публиковать задачи в виде бинарных данных, которые сохраняются в виде отдельных
файлов в каталоге `[имя очереди]/queue`. Затем задачи последовательно читаются и
обрабатываются через итератор `NextTask`, который перемещает файлы в каталог
`[имя очереди]/processing` для гарантии эксклюзивного доступа.

## Быстрый пример

```go
import (
	"bytes"
	"context"
	"encoding/gob"
	"log"
	"os"
	"os/signal"

	"go.neonxp.ru/fqueue"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer cancel()

	// Создание экземпляра шины
	bus, err := fqueue.New("tasks")
	if err != nil {
		panic(err)
	}

	// Отправка двух задач (Encode() возвращает просто []byte в данном случае)
	{
		tid, err := bus.Publish(ctx, Task{ID: 1, Val: "first message"}.Encode())
		if err != nil {
			panic(err)
		}
		log.Println("publish task", tid)
	}
	{
		tid, err := bus.Publish(ctx, Task{ID: 2, Val: "second message"}.Encode())
		if err != nil {
			panic(err)
		}
		log.Println("publish task", tid)
	}

	// Цикл чтения задач итератором.
	// Возвращается задача []byte и err error если произошла ошибка получения задачи.
	for t, err := range bus.NextTask(ctx) {
		if err != nil {
			panic(err)
		}
		tt := Task{}
		if err := gob.NewDecoder(bytes.NewBuffer(t)).Decode(&tt); err != nil {
			panic(err)
		}
		log.Println("got", tt)
	}
}

// Ниже просто сервисный код, это не интересно
func init() {
	gob.Register(Task{})
}

type Task struct {
	ID  int
	Val string
}

func (t Task) Encode() []byte {
	buf := bytes.Buffer{}
	if err := gob.NewEncoder(&buf).Encode(t); err != nil {
		panic(err)
	}

	return buf.Bytes()
}

```

## Как работает публикация

1. **Получение следующего номера** – `nextSeq` открывает (или создаёт) файл
   счётчика, берёт эксклюзивную блокировку и атомарно увеличивает значение.
2. **Создание файла задачи** – `os.OpenFile` вызывается с флагами
   `O_CREATE|O_EXCL`. Файл создаётся с правами `0o600` (чтение/запись только для
   владельца).
3. **Запись данных** – данные задачи записываются в файл, после чего дескриптор
   закрывается.

## Как работает чтение

1. **Обработка «застрявших» задач** – если в `tasks/processing` находятся файлы
   старше `maxProcessTime` (1 секунда), они возвращаются в очередь.
2. **Выбор первой задачи** – читается содержимое каталога `tasks/queue`, берётся
   первый элемент.
3. **Атомарное перемещение** – `os.Rename` перемещает файл в `tasks/processing`.
   Если перемещение удалось, задача считается эксклюзивной.
4. **Чтение и удаление** – файл открывается, читается полностью, затем
   удаляется.

## Лицензия

Этот проект лицензирован в соответствии с GNU General Public License версии 3
(GPLv3). Подробности смотрите в файле [LICENSE](LICENSE).

```
                    GNU GENERAL PUBLIC LICENSE
                       Version 3, 29 June 2007

 Copyright (C) 2026 Alexander NeonXP Kiryukhin <i@neonxp.ru>
 Everyone is permitted to copy and distribute verbatim copies
 of this license document, but changing it is not allowed.
```