aboutsummaryrefslogtreecommitdiff
path: root/publisher.go
blob: 611e4d15b6d7001bb28557318e35de651d61f1fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package fqueue

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"sync"
)

// Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике.
func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) {
	seq, err = b.nextSeq(ctx, b.seqFile)
	if err != nil {
		return 0, errors.Join(err, ErrCantGetExLock)
	}

	fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
	if err != nil {
		return seq, fmt.Errorf("failed create task file: %w", err)
	}
	defer func() {
		err2 := errors.Join(err, fp.Close())
		err = err2
	}()

	if _, err := fp.Write(task); err != nil {
		return 0, fmt.Errorf("failed write task file: %w", err)
	}

	return seq, nil
}

func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) {
	fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600)
	if err != nil {
		// Не получилось взять блокировку
		return 0, err
	}
	ch := make(chan struct{})
	closer := sync.OnceFunc(func() {
		fd.Close()
	})
	defer close(ch)
	go func() {
		select {
		case <-ch:
			return
		case <-ctx.Done():
			closer()
		}
	}()
	defer closer()

	if err := lock(int(fd.Fd()), true); err != nil {
		return 0, err
	}

	defer unlock(int(fd.Fd()))

	buf := make([]byte, 8)
	n, err := fd.Read(buf)
	if err == nil && n == 8 {
		// Успешно прочитали число
		seq = binary.BigEndian.Uint64(buf)
	}
	if err != nil && !errors.Is(err, io.EOF) {
		return 0, fmt.Errorf("failed read seq file: %w", err)
	}
	seq++
	if _, err := fd.Seek(0, 0); err != nil {
		return seq, err
	}
	binary.BigEndian.PutUint64(buf, seq)
	if _, err := fd.Write(buf); err != nil {
		return 0, fmt.Errorf("failed write seq file: %w", err)
	}

	return seq, nil
}