diff options
| author | 2026-02-05 14:35:00 +0300 | |
|---|---|---|
| committer | 2026-02-05 14:35:00 +0300 | |
| commit | 641a95063946bf7f4b8fdd1e938a917bc3f75cb2 (patch) | |
| tree | d19a6b92845de37e66b57b305fbcaf9f4816ffe9 /publisher.go | |
| download | fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.gz fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.bz2 fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.tar.xz fqueue-641a95063946bf7f4b8fdd1e938a917bc3f75cb2.zip | |
Diffstat (limited to '')
| -rw-r--r-- | publisher.go | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..611e4d1 --- /dev/null +++ b/publisher.go @@ -0,0 +1,84 @@ +package fqueue + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "sync" +) + +// Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике. +func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) { + seq, err = b.nextSeq(ctx, b.seqFile) + if err != nil { + return 0, errors.Join(err, ErrCantGetExLock) + } + + fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) + if err != nil { + return seq, fmt.Errorf("failed create task file: %w", err) + } + defer func() { + err2 := errors.Join(err, fp.Close()) + err = err2 + }() + + if _, err := fp.Write(task); err != nil { + return 0, fmt.Errorf("failed write task file: %w", err) + } + + return seq, nil +} + +func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) { + fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600) + if err != nil { + // Не получилось взять блокировку + return 0, err + } + ch := make(chan struct{}) + closer := sync.OnceFunc(func() { + fd.Close() + }) + defer close(ch) + go func() { + select { + case <-ch: + return + case <-ctx.Done(): + closer() + } + }() + defer closer() + + if err := lock(int(fd.Fd()), true); err != nil { + return 0, err + } + + defer unlock(int(fd.Fd())) + + buf := make([]byte, 8) + n, err := fd.Read(buf) + if err == nil && n == 8 { + // Успешно прочитали число + seq = binary.BigEndian.Uint64(buf) + } + if err != nil && !errors.Is(err, io.EOF) { + return 0, fmt.Errorf("failed read seq file: %w", err) + } + seq++ + if _, err := fd.Seek(0, 0); err != nil { + return seq, err + } + binary.BigEndian.PutUint64(buf, seq) + if _, err := fd.Write(buf); err != nil { + return 0, fmt.Errorf("failed write seq file: %w", err) + } + + return seq, nil +} |
