diff options
-rwxr-xr-x | README.md | 131 | ||||
-rwxr-xr-x | example/http_server.go | 2 | ||||
-rw-r--r-- | example/policies.go | 55 | ||||
-rw-r--r--[-rwxr-xr-x] | go.mod | 6 | ||||
-rwxr-xr-x | mixins.go | 41 | ||||
-rw-r--r--[-rwxr-xr-x] | options.go | 57 | ||||
-rwxr-xr-x | rutina.go | 93 | ||||
-rwxr-xr-x | rutina_test.go | 10 |
8 files changed, 266 insertions, 129 deletions
@@ -5,7 +5,7 @@ Package Rutina (russian "рутина" - ordinary boring everyday work) is routi It seems like https://godoc.org/golang.org/x/sync/errgroup with some different: 1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`). -2) by default cancels context when any routine ends with any result (not only when error result). Can be configured by option `OptionCancelByError`. +2) has flexible run/stop policy. i.e. one routine restarts when it fails (useful on daemons) but if fails another - all routines will be cancelled 3) already has optional signal handler `ListenOsSignals()` ## When it need? @@ -16,88 +16,109 @@ Usually, when your program consists of several routines (i.e.: http server, metr ### New instance -`r := rutina.New()` +```go +r := rutina.New() +``` -or with options (see below): +or with optional mixins (see below): -`r := rutina.New(...Option)` or `r.WithOptions(...Option)` +```go +r := rutina.New(...Mixins) +``` +or +```go +r.With(...Mixins) +``` ### Start new routine -``` +```go r.Go(func (ctx context.Context) error { ...do something... -}) +}, ...runOptions) ``` -### Wait routines to complete +Available options of run policy: + +* `ShutdownIfFail` - Shutdown all routines if this routine fails +* `RestartIfFail` - Restart this routine if it fail +* `DoNothingIfFail` - Do nothing just stop this routine if it fail +* `ShutdownIfDone` - Shutdown all routines if this routine done without errors +* `RestartIfDone` - Restart if this routine done without errors +* `DoNothingIfDone` - Do nothing if this routine done without errors + +Default policy: + +`ShutdownIfFail` && `DoNothingIfDone` + +#### Example of run policies + +```go + r.Go(func(ctx context.Context) error { + // If this routine produce no error - it just restarts + // If it returns error - all other routines will shutdown (because context cancels) + }, rutina.RestartIfDone, rutina.ShutdownIfFail) + + r.Go(func(ctx context.Context) error { + // If this routine produce no error - it just completes + // If it returns error - all other routines will shutdown (because context cancels) + }, rutina.DoNothingIfDone, rutina.ShutdownIfFail) + r.Go(func(ctx context.Context) error { + // If this routine produce no error - all other routines will shutdown (because context cancels) + // If it returns error - it will be restarted + }, rutina.RestartIfFail) + + r.Go(func(ctx context.Context) error { + // If this routine stopped by any case - all other routines will shutdown (because context cancels) + }, rutina.ShutdownIfDone) + + r.ListenOsSignals() // Shutdown all routines by OS signal ``` + +### Wait routines to complete + +```go err := r.Wait() ``` -Here err = first error in any routine +Here err = error that shutdowns all routines (may be will be changed at future) -## Options +## Mixins -### Usage options +### Usage -`r := rutina.New(option1, option2, ...)` -or +```go +r := rutina.New(mixin1, mixin2, ...) ``` +or +```go r := rutina.New() -r = r.WithOptions(option1, option2, ...) // Returns new instance of Rutina! +r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina! ``` ### Logger -`rutina.WithLogger(logger log.Logger) Option` or `rutina.WithStdLogger() Option` - -### Custom context +```go +r.With(rutina.WithStdLogger()) +``` +or +```go +r.With(rutina.WithLogger(logger log.Logger)) +``` -`rutina.WithContext(ctx context.Context) Option` +Sets standard or custom logger. By default there is no logger. -### Cancel only by errors +### Custom context -`rutina.WithCancelByError() Option` +```go +r.With(rutina.WithContext(ctx context.Context)) +```` -If this option set, rutina doesnt cancel context if routine completed without error. +Propagates your own context to Rutina. By default it use own context. ## Example -HTTP server with graceful shutdown (`example/http_server.go`): +HTTP server with graceful shutdown [`example/http_server.go`](https://github.com/NeonXP/rutina/blob/master/example/http_server.go) -``` -// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) -r := rutina.New(rutina.WithStdLogger()) - -srv := &http.Server{Addr: ":8080"} -http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - io.WriteString(w, "hello world\n") -}) - -// Starting http server and listen connections -r.Go(func(ctx context.Context) error { - if err := srv.ListenAndServe(); err != nil { - return err - } - log.Println("Server stopped") - return nil -}) - -// Gracefully stoping server when context canceled -r.Go(func(ctx context.Context) error { - <-ctx.Done() - log.Println("Stopping server...") - return srv.Shutdown(ctx) -}) - -// OS signals listener -r.ListenOsSignals() - -if err := r.Wait(); err != nil { - log.Fatal(err) -} - -log.Println("All routines successfully stopped") -```
\ No newline at end of file +Different run policies [`example/policies.go`](https://github.com/NeonXP/rutina/blob/master/example/policies.go) diff --git a/example/http_server.go b/example/http_server.go index 86255e6..fe1d456 100755 --- a/example/http_server.go +++ b/example/http_server.go @@ -12,7 +12,7 @@ import ( ) func main() { - // New instance with builtin context. Alternative: r, ctx := rutina.OptionContext(ctx) + // New instance with builtin context r := rutina.New(rutina.WithStdLogger()) srv := &http.Server{Addr: ":8080"} diff --git a/example/policies.go b/example/policies.go new file mode 100644 index 0000000..5f09b8a --- /dev/null +++ b/example/policies.go @@ -0,0 +1,55 @@ +// +build ignore + +package main + +import ( + "context" + "errors" + "github.com/neonxp/rutina" + "log" + "time" +) + +func main() { + // New instance with builtin context + r := rutina.New() + + r.Go(func(ctx context.Context) error { + <-time.After(1 * time.Second) + log.Println("Do something 1 second without errors and restart") + return nil + }, rutina.RestartIfDone, rutina.ShutdownIfFail) + + r.Go(func(ctx context.Context) error { + <-time.After(2 * time.Second) + log.Println("Do something 2 seconds without errors and do nothing") + return nil + }, rutina.ShutdownIfFail) + + r.Go(func(ctx context.Context) error { + <-time.After(3 * time.Second) + log.Println("Do something 3 seconds with error and restart") + return errors.New("Error!") + }, rutina.RestartIfFail) + + r.Go(func(ctx context.Context) error { + <-time.After(4 * time.Second) + log.Println("Do something 4 seconds with error and do nothing") + return errors.New("Error!") + }, rutina.DoNothingIfFail) + + r.Go(func(ctx context.Context) error { + <-time.After(10 * time.Second) + log.Println("Do something 10 seconds with error and close context") + return errors.New("Successfully shutdown at proper place") + }, rutina.ShutdownIfFail) + + // OS signals subscriber + r.ListenOsSignals() + + if err := r.Wait(); err != nil { + log.Fatal(err) + } else { + log.Println("Routines stopped but not correct") + } +} @@ -1 +1,7 @@ module github.com/NeonXP/rutina + +go 1.12 + +replace github.com/neonxp/rutina => ./ + +require github.com/neonxp/rutina v0.0.0-00010101000000-000000000000 // indirect diff --git a/mixins.go b/mixins.go new file mode 100755 index 0000000..65309e4 --- /dev/null +++ b/mixins.go @@ -0,0 +1,41 @@ +package rutina + +import ( + "context" + "log" + "os" +) + +type Mixin interface { + apply(*Rutina) +} + +type MixinContext struct { + Context context.Context +} + +func WithContext(context context.Context) *MixinContext { + return &MixinContext{Context: context} +} + +func (o MixinContext) apply(r *Rutina) { + ctx, cancel := context.WithCancel(o.Context) + r.ctx = ctx + r.Cancel = cancel +} + +type MixinLogger struct { + Logger *log.Logger +} + +func WithLogger(logger *log.Logger) *MixinLogger { + return &MixinLogger{Logger: logger} +} + +func WithStdLogger() *MixinLogger { + return &MixinLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)} +} + +func (o MixinLogger) apply(r *Rutina) { + r.logger = o.Logger +} diff --git a/options.go b/options.go index 8144dde..d1030ba 100755..100644 --- a/options.go +++ b/options.go @@ -1,51 +1,12 @@ package rutina -import ( - "context" - "log" - "os" +type Options int + +const ( + ShutdownIfFail Options = iota // Shutdown all routines if fail + RestartIfFail // Restart this routine if fail + DoNothingIfFail // Do nothing on fail + ShutdownIfDone // Shutdown all routines if this routine done without errors + RestartIfDone // Restart if this routine done without errors + DoNothingIfDone // Do nothing if this routine done without errors ) - -type Option interface { - apply(*Rutina) -} - -type OptionContext struct { - Context context.Context -} - -func WithContext(context context.Context) *OptionContext { - return &OptionContext{Context: context} -} - -func (o OptionContext) apply(r *Rutina) { - ctx, cancel := context.WithCancel(o.Context) - r.ctx = ctx - r.Cancel = cancel -} - -type OptionLogger struct { - Logger *log.Logger -} - -func WithLogger(logger *log.Logger) *OptionLogger { - return &OptionLogger{Logger: logger} -} - -func WithStdLogger() *OptionLogger { - return &OptionLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)} -} - -func (o OptionLogger) apply(r *Rutina) { - r.logger = o.Logger -} - -type OptionCancelByError struct{} - -func WithCancelByError() *OptionCancelByError { - return &OptionCancelByError{} -} - -func (OptionCancelByError) apply(r *Rutina) { - r.cancelByError = true -} @@ -22,35 +22,54 @@ type Rutina struct { } // New instance with builtin context -func New(opts ...Option) *Rutina { +func New(mixins ...Mixin) *Rutina { ctx, cancel := context.WithCancel(context.Background()) var counter uint64 = 0 r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false} - return r.WithOptions(opts...) + return r.With(mixins...) } -func (r *Rutina) WithOptions(opts ...Option) *Rutina { +func (r *Rutina) With(mixins ...Mixin) *Rutina { nr := *r - for _, o := range opts { - o.apply(&nr) + for _, m := range mixins { + m.apply(&nr) } return &nr } // Go routine -func (r *Rutina) Go(doer func(ctx context.Context) error) { +func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { + onFail := ShutdownIfFail + for _, o := range opts { + switch o { + case ShutdownIfFail: + onFail = ShutdownIfFail + case RestartIfFail: + onFail = RestartIfFail + case DoNothingIfFail: + onFail = DoNothingIfFail + } + } + onDone := DoNothingIfDone + for _, o := range opts { + switch o { + case ShutdownIfDone: + onDone = ShutdownIfDone + case RestartIfDone: + onDone = RestartIfDone + case DoNothingIfDone: + onDone = DoNothingIfDone + } + } + r.wg.Add(1) go func() { + defer r.wg.Done() + // Check that context is not canceled yet + if r.ctx.Err() != nil { + return + } id := atomic.AddUint64(r.counter, 1) - defer func() { - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } - r.wg.Done() - if !r.cancelByError { - r.Cancel() - } - }() if r.logger != nil { r.logger.Printf("starting #%d", id) } @@ -58,13 +77,47 @@ func (r *Rutina) Go(doer func(ctx context.Context) error) { if r.logger != nil { r.logger.Printf("error at #%d : %v", id, err) } - r.o.Do(func() { - r.err = err - }) - if r.cancelByError { + switch onFail { + case ShutdownIfFail: + if r.logger != nil { + r.logger.Printf("stopping #%d", id) + } + // Save error only if shutdown all routines + r.o.Do(func() { + r.err = err + }) + r.Cancel() + case RestartIfFail: + // TODO maybe store errors on restart? + if r.logger != nil { + r.logger.Printf("restarting #%d", id) + } + r.Go(doer, opts...) + case DoNothingIfFail: + // TODO maybe store errors on nothing to do? + if r.logger != nil { + r.logger.Printf("stopping #%d", id) + } + } + } else { + switch onDone { + case ShutdownIfDone: + if r.logger != nil { + r.logger.Printf("stopping #%d with shutdown", id) + } r.Cancel() + case RestartIfDone: + if r.logger != nil { + r.logger.Printf("restarting #%d", id) + } + r.Go(doer, opts...) + case DoNothingIfDone: + if r.logger != nil { + r.logger.Printf("stopping #%d", id) + } } } + }() } @@ -84,7 +137,7 @@ func (r *Rutina) ListenOsSignals() { case <-ctx.Done(): } return nil - }) + }, ShutdownIfDone) } // Wait all routines and returns first error or nil if all routines completes without errors diff --git a/rutina_test.go b/rutina_test.go index 077f68e..0c6d94a 100755 --- a/rutina_test.go +++ b/rutina_test.go @@ -8,7 +8,7 @@ import ( ) func TestSuccess(t *testing.T) { - r, _ := New( + r := New( WithStdLogger(), WithContext(context.Background()), ) @@ -40,7 +40,7 @@ func TestSuccess(t *testing.T) { } func TestError(t *testing.T) { - r, _ := New(WithCancelByError()) + r := New() f := func(name string, ttl time.Duration) error { <-time.After(ttl) t.Log(name) @@ -65,12 +65,12 @@ func TestError(t *testing.T) { } func TestContext(t *testing.T) { - r, _ := New() + r := New() cc := false r.Go(func(ctx context.Context) error { <-time.After(1 * time.Second) return nil - }) + }, ShutdownIfDone) r.Go(func(ctx context.Context) error { select { case <-ctx.Done(): @@ -79,7 +79,7 @@ func TestContext(t *testing.T) { case <-time.After(3 * time.Second): return errors.New("Timeout") } - }) + }, ShutdownIfDone) if err := r.Wait(); err != nil { t.Error("Unexpected error", err) } |