diff options
-rwxr-xr-x | README.md | 19 | ||||
-rw-r--r-- | example/policies.go | 22 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rwxr-xr-x | mixins.go | 12 | ||||
-rwxr-xr-x | rutina.go | 80 |
5 files changed, 85 insertions, 50 deletions
@@ -49,7 +49,9 @@ Available options of run policy: Default policy: -`ShutdownIfFail` && `DoNothingIfDone` +`ShutdownIfFail` && `ShutdownIfDone` + +(just like [errgroup](https://godoc.org/golang.org/x/sync/errgroup)) #### Example of run policies @@ -100,11 +102,11 @@ r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina! ### Logger ```go -r.With(rutina.WithStdLogger()) +r = r.With(rutina.WithStdLogger()) ``` or ```go -r.With(rutina.WithLogger(logger log.Logger)) +r = r.With(rutina.WithLogger(logger log.Logger)) ``` Sets standard or custom logger. By default there is no logger. @@ -112,11 +114,20 @@ Sets standard or custom logger. By default there is no logger. ### Custom context ```go -r.With(rutina.WithContext(ctx context.Context)) +r = r.With(rutina.WithContext(ctx context.Context)) ```` Propagates your own context to Rutina. By default it use own context. +### Errors channel + +```go +errChan := make(chan error) +r = r.With(rutina.WithErrChan(errChan)) +``` + +This channel will receive all errors from all routines with any `...Fail` run policy. + ## Example HTTP server with graceful shutdown [`example/http_server.go`](https://github.com/NeonXP/rutina/blob/master/example/http_server.go) diff --git a/example/policies.go b/example/policies.go index 5f09b8a..50a4e07 100644 --- a/example/policies.go +++ b/example/policies.go @@ -14,6 +14,10 @@ func main() { // New instance with builtin context r := rutina.New() + errsChan := make(chan error, 1) + + r = r.With(rutina.WithErrChan(errsChan)) + r.Go(func(ctx context.Context) error { <-time.After(1 * time.Second) log.Println("Do something 1 second without errors and restart") @@ -24,18 +28,18 @@ func main() { <-time.After(2 * time.Second) log.Println("Do something 2 seconds without errors and do nothing") return nil - }, rutina.ShutdownIfFail) + }, rutina.DoNothingIfDone, rutina.ShutdownIfFail) r.Go(func(ctx context.Context) error { <-time.After(3 * time.Second) log.Println("Do something 3 seconds with error and restart") - return errors.New("Error!") + return errors.New("Error #1!") }, rutina.RestartIfFail) r.Go(func(ctx context.Context) error { <-time.After(4 * time.Second) log.Println("Do something 4 seconds with error and do nothing") - return errors.New("Error!") + return errors.New("Error #2!") }, rutina.DoNothingIfFail) r.Go(func(ctx context.Context) error { @@ -44,6 +48,18 @@ func main() { return errors.New("Successfully shutdown at proper place") }, rutina.ShutdownIfFail) + r.Go(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + log.Println("Shutdown chan listener") + return nil + case err := <-errsChan: + log.Printf("Error in chan: %v", err) + } + } + }) + // OS signals subscriber r.ListenOsSignals() @@ -1 +1,3 @@ module github.com/neonxp/rutina + +go 1.12 @@ -39,3 +39,15 @@ func WithStdLogger() *MixinLogger { func (o MixinLogger) apply(r *Rutina) { r.logger = o.Logger } + +type MixinErrChan struct { + errCh chan error +} + +func WithErrChan(errCh chan error) *MixinErrChan { + return &MixinErrChan{errCh: errCh} +} + +func (o MixinErrChan) apply(r *Rutina) { + r.errCh = o.errCh +} @@ -7,25 +7,26 @@ import ( "os/signal" "sync" "sync/atomic" + "syscall" ) //Rutina is routine manager type Rutina struct { - ctx context.Context - Cancel func() - wg sync.WaitGroup - o sync.Once - err error - logger *log.Logger - counter *uint64 - cancelByError bool + ctx context.Context // State of application (started/stopped) + Cancel func() // Cancel func that stops all routines + wg sync.WaitGroup // WaitGroup that wait all routines to complete + o sync.Once // Flag that prevents overwrite first error that shutdowns all routines + err error // First error that shutdowns all routines + logger *log.Logger // Optional logger + counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger + errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail } // New instance with builtin context func New(mixins ...Mixin) *Rutina { ctx, cancel := context.WithCancel(context.Background()) var counter uint64 = 0 - r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false} + r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil} return r.With(mixins...) } @@ -50,7 +51,7 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { onFail = DoNothingIfFail } } - onDone := DoNothingIfDone + onDone := ShutdownIfDone for _, o := range opts { switch o { case ShutdownIfDone: @@ -70,52 +71,42 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { return } id := atomic.AddUint64(r.counter, 1) - if r.logger != nil { - r.logger.Printf("starting #%d", id) - } + r.log("starting #%d", id) if err := doer(r.ctx); err != nil { - if r.logger != nil { - r.logger.Printf("error at #%d : %v", id, err) + // errors history + if r.errCh != nil { + r.errCh <- err } + // region routine failed + r.log("error at #%d : %v", id, err) switch onFail { case ShutdownIfFail: - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) // Save error only if shutdown all routines r.o.Do(func() { r.err = err }) r.Cancel() case RestartIfFail: - // TODO maybe store errors on restart? - if r.logger != nil { - r.logger.Printf("restarting #%d", id) - } + r.log("restarting #%d", id) r.Go(doer, opts...) case DoNothingIfFail: - // TODO maybe store errors on nothing to do? - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) } + // endregion } else { + // region routine successfully done switch onDone { case ShutdownIfDone: - if r.logger != nil { - r.logger.Printf("stopping #%d with shutdown", id) - } + r.log("stopping #%d with shutdown", id) r.Cancel() case RestartIfDone: - if r.logger != nil { - r.logger.Printf("restarting #%d", id) - } + r.log("restarting #%d", id) r.Go(doer, opts...) case DoNothingIfDone: - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) } + // endregion } }() @@ -124,23 +115,19 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { // OS signals handler func (r *Rutina) ListenOsSignals(signals ...os.Signal) { if len(signals) == 0 { - signals = []os.Signal{os.Kill, os.Interrupt} + signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } r.Go(func(ctx context.Context) error { sig := make(chan os.Signal, 1) signal.Notify(sig, signals...) + r.log("starting OS signals listener") select { case s := <-sig: - if r.logger != nil { - r.logger.Printf("stopping by OS signal (%v)", s) - } - if r.cancelByError { - r.Cancel() - } + r.log("stopping by OS signal (%v)", s) case <-ctx.Done(): } return nil - }, ShutdownIfDone) + }, ShutdownIfDone, ShutdownIfFail) } // Wait all routines and returns first error or nil if all routines completes without errors @@ -148,3 +135,10 @@ func (r *Rutina) Wait() error { r.wg.Wait() return r.err } + +// Log if can +func (r *Rutina) log(format string, args ...interface{}) { + if r.logger != nil { + r.logger.Printf(format, args...) + } +} |