package fqueue import ( "context" "encoding/binary" "errors" "fmt" "io" "os" "path/filepath" "strconv" "sync" ) // Publish записывает задачу в очередь. Использует исключительную блокировку на счетчике. func (b *Bus) Publish(ctx context.Context, task []byte) (seq uint64, err error) { seq, err = b.nextSeq(ctx, b.seqFile) if err != nil { return 0, errors.Join(err, ErrCantGetExLock) } fp, err := os.OpenFile(filepath.Join(b.queuePath, strconv.Itoa(int(seq))), os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) if err != nil { return seq, fmt.Errorf("failed create task file: %w", err) } defer func() { err2 := errors.Join(err, fp.Close()) err = err2 }() if _, err := fp.Write(task); err != nil { return 0, fmt.Errorf("failed write task file: %w", err) } return seq, nil } func (*Bus) nextSeq(ctx context.Context, filename string) (seq uint64, err error) { fd, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0o600) if err != nil { // Не получилось взять блокировку return 0, err } ch := make(chan struct{}) closer := sync.OnceFunc(func() { fd.Close() }) defer close(ch) go func() { select { case <-ch: return case <-ctx.Done(): closer() } }() defer closer() if err := lock(int(fd.Fd()), true); err != nil { return 0, err } defer unlock(int(fd.Fd())) buf := make([]byte, 8) n, err := fd.Read(buf) if err == nil && n == 8 { // Успешно прочитали число seq = binary.BigEndian.Uint64(buf) } if err != nil && !errors.Is(err, io.EOF) { return 0, fmt.Errorf("failed read seq file: %w", err) } seq++ if _, err := fd.Seek(0, 0); err != nil { return seq, err } binary.BigEndian.PutUint64(buf, seq) if _, err := fd.Write(buf); err != nil { return 0, fmt.Errorf("failed write seq file: %w", err) } return seq, nil }