diff options
-rwxr-xr-x | README.md | 66 | ||||
-rwxr-xr-x[-rw-r--r--] | example/http_server.go | 21 | ||||
-rwxr-xr-x[-rw-r--r--] | go.mod | 0 | ||||
-rwxr-xr-x[-rw-r--r--] | go.sum | 0 | ||||
-rwxr-xr-x | options.go | 51 | ||||
-rwxr-xr-x | rutina.go | 66 | ||||
-rwxr-xr-x | rutina_test.go | 7 |
7 files changed, 165 insertions, 46 deletions
@@ -1,13 +1,67 @@ # rutina -Package Rutina (russian "рутина" - ordinary boring everyday work) works like https://godoc.org/golang.org/x/sync/errgroup with small differences: +Package Rutina (russian "рутина" - ordinary boring everyday work) is routine orchestrator for your application. -1) propagates context to routines -2) cancels context when any routine ends with any result (not only when error result) +It seems like https://godoc.org/golang.org/x/sync/errgroup with some different: + +1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`). +2) by default cancels context when any routine ends with any result (not only when error result). Can be configured by option `OptionCancelByError`. +3) already has optional signal handler `ListenOsSignals()` ## When it need? -Usually, when yout program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber). +Usually, when your program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber). + +## Usage + +### New instance + +`r := rutina.New()` + +or with options (see below): + +`r := rutina.New(...Option)` or `r.WithOptions(...Option)` + +### Start new routine + +``` +r.Go(func (ctx context.Context) error { + ...do something... +}) +``` + +### Wait routines to complete + +``` +err := r.Wait() +``` + +Here err = first error in any routine + +## Options + +### Usage options + +`r := rutina.New(option1, option2, ...)` +or +``` +r := rutina.New() +r = r.WithOptions(option1, option2, ...) // Returns new instance of Rutina! +``` + +### Logger + +`rutina.WithLogger(logger log.Logger) Option` or `rutina.WithStdLogger() Option` + +### Custom context + +`rutina.WithContext(ctx context.Context) Option` + +### Cancel only by errors + +`rutina.WithCancelByError() Option` + +If this option set, rutina doesnt cancel context if routine completed without error. ## Example @@ -15,7 +69,7 @@ HTTP server with graceful shutdown (`example/http_server.go`): ``` // New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) -r, _ := rutina.New() +r, _ := rutina.New(rutina.WithStdLogger()) srv := &http.Server{Addr: ":8080"} http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -39,7 +93,7 @@ r.Go(func(ctx context.Context) error { }) // OS signals listener -r.ListenTermSignals() +r.ListenOsSignals() if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/example/http_server.go b/example/http_server.go index a8072bb..ef25567 100644..100755 --- a/example/http_server.go +++ b/example/http_server.go @@ -4,19 +4,15 @@ package main import ( "context" + "github.com/neonxp/rutina" "io" "log" "net/http" - "os" - "os/signal" - "syscall" - - "github.com/neonxp/rutina" ) func main() { - // New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) - r, _ := rutina.New() + // New instance with builtin context. Alternative: r, ctx := rutina.OptionContext(ctx) + r, _ := rutina.New(rutina.WithStdLogger()) srv := &http.Server{Addr: ":8080"} http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -40,16 +36,7 @@ func main() { }) // OS signals subscriber - r.Go(func(ctx context.Context) error { - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) - select { - case <-sig: - log.Println("TERM or INT signal received") - case <-ctx.Done(): - } - return nil - }) + r.ListenOsSignals() if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/options.go b/options.go new file mode 100755 index 0000000..8144dde --- /dev/null +++ b/options.go @@ -0,0 +1,51 @@ +package rutina + +import ( + "context" + "log" + "os" +) + +type Option interface { + apply(*Rutina) +} + +type OptionContext struct { + Context context.Context +} + +func WithContext(context context.Context) *OptionContext { + return &OptionContext{Context: context} +} + +func (o OptionContext) apply(r *Rutina) { + ctx, cancel := context.WithCancel(o.Context) + r.ctx = ctx + r.Cancel = cancel +} + +type OptionLogger struct { + Logger *log.Logger +} + +func WithLogger(logger *log.Logger) *OptionLogger { + return &OptionLogger{Logger: logger} +} + +func WithStdLogger() *OptionLogger { + return &OptionLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)} +} + +func (o OptionLogger) apply(r *Rutina) { + r.logger = o.Logger +} + +type OptionCancelByError struct{} + +func WithCancelByError() *OptionCancelByError { + return &OptionCancelByError{} +} + +func (OptionCancelByError) apply(r *Rutina) { + r.cancelByError = true +} @@ -2,58 +2,85 @@ package rutina import ( "context" + "log" "os" "os/signal" "sync" - "syscall" + "sync/atomic" ) //Rutina is routine manager type Rutina struct { - ctx context.Context - cancel func() - wg sync.WaitGroup - o sync.Once - err error + ctx context.Context + Cancel func() + wg sync.WaitGroup + o sync.Once + err error + logger *log.Logger + counter *uint64 + cancelByError bool } // New instance with builtin context -func New() (*Rutina, context.Context) { - return WithContext(context.Background()) +func New(opts ...Option) (*Rutina, context.Context) { + ctx, cancel := context.WithCancel(context.Background()) + var counter uint64 = 0 + r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false} + return r.WithOptions(opts...), ctx } -// WithContext is constructor that takes context from outside -func WithContext(ctx context.Context) (*Rutina, context.Context) { - ctx, cancel := context.WithCancel(ctx) - - return &Rutina{ctx: ctx, cancel: cancel}, ctx +func (r *Rutina) WithOptions(opts ...Option) *Rutina { + nr := *r + for _, o := range opts { + o.apply(&nr) + } + return &nr } // Go routine func (r *Rutina) Go(doer func(ctx context.Context) error) { r.wg.Add(1) go func() { + id := atomic.AddUint64(r.counter, 1) defer func() { + if r.logger != nil { + r.logger.Printf("stopping #%d", id) + } r.wg.Done() - if r.cancel != nil { - r.cancel() + if !r.cancelByError { + r.Cancel() } }() + if r.logger != nil { + r.logger.Printf("starting #%d", id) + } if err := doer(r.ctx); err != nil { + if r.logger != nil { + r.logger.Printf("error at #%d : %v", id, err) + } r.o.Do(func() { r.err = err }) + if r.cancelByError { + r.Cancel() + } } }() } // OS signals handler -func (r *Rutina) ListenTermSignals() { +func (r *Rutina) ListenOsSignals() { r.Go(func(ctx context.Context) error { sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + signal.Notify(sig, os.Interrupt, os.Kill) select { - case <-sig: + case s := <-sig: + if r.logger != nil { + r.logger.Printf("stopping by OS signal (%v)", s) + } + if r.cancelByError { + r.Cancel() + } case <-ctx.Done(): } return nil @@ -63,8 +90,5 @@ func (r *Rutina) ListenTermSignals() { // Wait all routines and returns first error or nil if all routines completes without errors func (r *Rutina) Wait() error { r.wg.Wait() - if r.cancel != nil { - r.cancel() - } return r.err } diff --git a/rutina_test.go b/rutina_test.go index 361d5eb..077f68e 100755 --- a/rutina_test.go +++ b/rutina_test.go @@ -8,7 +8,10 @@ import ( ) func TestSuccess(t *testing.T) { - r, _ := New() + r, _ := New( + WithStdLogger(), + WithContext(context.Background()), + ) counter := 0 f := func(name string, ttl time.Duration) error { counter++ @@ -37,7 +40,7 @@ func TestSuccess(t *testing.T) { } func TestError(t *testing.T) { - r, _ := New() + r, _ := New(WithCancelByError()) f := func(name string, ttl time.Duration) error { <-time.After(ttl) t.Log(name) |