1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package fqueue
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
)
// Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике.
func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) {
seq, err = b.nextSeq(ctx, b.seqFile)
if err != nil {
return 0, errors.Join(err, ErrCantGetExLock)
}
fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
if err != nil {
return seq, fmt.Errorf("failed create task file: %w", err)
}
defer func() {
err2 := errors.Join(err, fp.Close())
err = err2
}()
if _, err := fp.Write(task); err != nil {
return 0, fmt.Errorf("failed write task file: %w", err)
}
return seq, nil
}
func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) {
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600)
if err != nil {
// Не получилось взять блокировку
return 0, err
}
ch := make(chan struct{})
closer := sync.OnceFunc(func() {
fd.Close()
})
defer close(ch)
go func() {
select {
case <-ch:
return
case <-ctx.Done():
closer()
}
}()
defer closer()
if err := lock(int(fd.Fd()), true); err != nil {
return 0, err
}
defer unlock(int(fd.Fd()))
buf := make([]byte, 8)
n, err := fd.Read(buf)
if err == nil && n == 8 {
// Успешно прочитали число
seq = binary.BigEndian.Uint64(buf)
}
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("failed read seq file: %w", err)
}
seq++
if _, err := fd.Seek(0, 0); err != nil {
return seq, err
}
binary.BigEndian.PutUint64(buf, seq)
if _, err := fd.Write(buf); err != nil {
return 0, fmt.Errorf("failed write seq file: %w", err)
}
return seq, nil
}
|