aboutsummaryrefslogtreecommitdiff
path: root/publisher.go
diff options
context:
space:
mode:
author2026-02-05 14:35:00 +0300
committer2026-02-05 14:35:00 +0300
commit641a95063946bf7f4b8fdd1e938a917bc3f75cb2 (patch)
treed19a6b92845de37e66b57b305fbcaf9f4816ffe9 /publisher.go
downloadfqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.gz
fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.bz2
fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.xz
fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.zip
initialHEADmaster
Diffstat (limited to '')
-rw-r--r--publisher.go84
1 files changed, 84 insertions, 0 deletions
diff --git a/publisher.go b/publisher.go
new file mode 100644
index 0000000..611e4d1
--- /dev/null
+++ b/publisher.go
@@ -0,0 +1,84 @@
+package fqueue
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "strconv"
+ "sync"
+)
+
+// Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике.
+func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) {
+ seq, err = b.nextSeq(ctx, b.seqFile)
+ if err != nil {
+ return 0, errors.Join(err, ErrCantGetExLock)
+ }
+
+ fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
+ if err != nil {
+ return seq, fmt.Errorf("failed create task file: %w", err)
+ }
+ defer func() {
+ err2 := errors.Join(err, fp.Close())
+ err = err2
+ }()
+
+ if _, err := fp.Write(task); err != nil {
+ return 0, fmt.Errorf("failed write task file: %w", err)
+ }
+
+ return seq, nil
+}
+
+func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) {
+ fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600)
+ if err != nil {
+ // Не получилось взять блокировку
+ return 0, err
+ }
+ ch := make(chan struct{})
+ closer := sync.OnceFunc(func() {
+ fd.Close()
+ })
+ defer close(ch)
+ go func() {
+ select {
+ case <-ch:
+ return
+ case <-ctx.Done():
+ closer()
+ }
+ }()
+ defer closer()
+
+ if err := lock(int(fd.Fd()), true); err != nil {
+ return 0, err
+ }
+
+ defer unlock(int(fd.Fd()))
+
+ buf := make([]byte, 8)
+ n, err := fd.Read(buf)
+ if err == nil && n == 8 {
+ // Успешно прочитали число
+ seq = binary.BigEndian.Uint64(buf)
+ }
+ if err != nil && !errors.Is(err, io.EOF) {
+ return 0, fmt.Errorf("failed read seq file: %w", err)
+ }
+ seq++
+ if _, err := fd.Seek(0, 0); err != nil {
+ return seq, err
+ }
+ binary.BigEndian.PutUint64(buf, seq)
+ if _, err := fd.Write(buf); err != nil {
+ return 0, fmt.Errorf("failed write seq file: %w", err)
+ }
+
+ return seq, nil
+}