From 641a95063946bf7f4b8fdd1e938a917bc3f75cb2 Mon Sep 17 00:00:00 2001 From: Alexander Neonxp Kiryukhin Date: Thu, 5 Feb 2026 14:35:00 +0300 Subject: initial --- publisher.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 publisher.go (limited to 'publisher.go') diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..611e4d1 --- /dev/null +++ b/publisher.go @@ -0,0 +1,84 @@ +package fqueue + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "sync" +) + +// Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике. +func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) { + seq, err = b.nextSeq(ctx, b.seqFile) + if err != nil { + return 0, errors.Join(err, ErrCantGetExLock) + } + + fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { + return seq, fmt.Errorf("failed create task file: %w", err) + } + defer func() { + err2 := errors.Join(err, fp.Close()) + err = err2 + }() + + if _, err := fp.Write(task); err != nil { + return 0, fmt.Errorf("failed write task file: %w", err) + } + + return seq, nil +} + +func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) { + fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600) + if err != nil { + // Не получилось взять блокировку + return 0, err + } + ch := make(chan struct{}) + closer := sync.OnceFunc(func() { + fd.Close() + }) + defer close(ch) + go func() { + select { + case <-ch: + return + case <-ctx.Done(): + closer() + } + }() + defer closer() + + if err := lock(int(fd.Fd()), true); err != nil { + return 0, err + } + + defer unlock(int(fd.Fd())) + + buf := make([]byte, 8) + n, err := fd.Read(buf) + if err == nil && n == 8 { + // Успешно прочитали число + seq = binary.BigEndian.Uint64(buf) + } + if err != nil && !errors.Is(err, io.EOF) { + return 0, fmt.Errorf("failed read seq file: %w", err) + } + seq++ + if _, err := fd.Seek(0, 0); err != nil { + return seq, err + } + binary.BigEndian.PutUint64(buf, seq) + if _, err := fd.Write(buf); err != nil { + return 0, fmt.Errorf("failed write seq file: %w", err) + } + + return seq, nil +} -- cgit v1.2.3