aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Kiryukhin <alexander@kiryukhin.su>2019-03-27 02:44:38 +0300
committerAlexander Kiryukhin <alexander@kiryukhin.su>2019-03-27 02:44:38 +0300
commitd45d913c9e0d088f339c007f14d2fcaa7b9c8c74 (patch)
tree486d43d7359b5842f6f34b814764389fa27640d2
parent11a32ca219905f7c42698b55b6af9920626b7b51 (diff)
Flexible run policiesv0.3.0
Options refactoring
-rwxr-xr-xREADME.md131
-rwxr-xr-xexample/http_server.go2
-rw-r--r--example/policies.go55
-rw-r--r--[-rwxr-xr-x]go.mod6
-rwxr-xr-xmixins.go41
-rw-r--r--[-rwxr-xr-x]options.go57
-rwxr-xr-xrutina.go93
-rwxr-xr-xrutina_test.go10
8 files changed, 266 insertions, 129 deletions
diff --git a/README.md b/README.md
index 0733929..0ada43f 100755
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@ Package Rutina (russian "рутина" - ordinary boring everyday work) is routi
It seems like https://godoc.org/golang.org/x/sync/errgroup with some different:
1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`).
-2) by default cancels context when any routine ends with any result (not only when error result). Can be configured by option `OptionCancelByError`.
+2) has flexible run/stop policy. i.e. one routine restarts when it fails (useful on daemons) but if fails another - all routines will be cancelled
3) already has optional signal handler `ListenOsSignals()`
## When it need?
@@ -16,88 +16,109 @@ Usually, when your program consists of several routines (i.e.: http server, metr
### New instance
-`r := rutina.New()`
+```go
+r := rutina.New()
+```
-or with options (see below):
+or with optional mixins (see below):
-`r := rutina.New(...Option)` or `r.WithOptions(...Option)`
+```go
+r := rutina.New(...Mixins)
+```
+or
+```go
+r.With(...Mixins)
+```
### Start new routine
-```
+```go
r.Go(func (ctx context.Context) error {
...do something...
-})
+}, ...runOptions)
```
-### Wait routines to complete
+Available options of run policy:
+
+* `ShutdownIfFail` - Shutdown all routines if this routine fails
+* `RestartIfFail` - Restart this routine if it fail
+* `DoNothingIfFail` - Do nothing just stop this routine if it fail
+* `ShutdownIfDone` - Shutdown all routines if this routine done without errors
+* `RestartIfDone` - Restart if this routine done without errors
+* `DoNothingIfDone` - Do nothing if this routine done without errors
+
+Default policy:
+
+`ShutdownIfFail` && `DoNothingIfDone`
+
+#### Example of run policies
+
+```go
+ r.Go(func(ctx context.Context) error {
+ // If this routine produce no error - it just restarts
+ // If it returns error - all other routines will shutdown (because context cancels)
+ }, rutina.RestartIfDone, rutina.ShutdownIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ // If this routine produce no error - it just completes
+ // If it returns error - all other routines will shutdown (because context cancels)
+ }, rutina.DoNothingIfDone, rutina.ShutdownIfFail)
+ r.Go(func(ctx context.Context) error {
+ // If this routine produce no error - all other routines will shutdown (because context cancels)
+ // If it returns error - it will be restarted
+ }, rutina.RestartIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ // If this routine stopped by any case - all other routines will shutdown (because context cancels)
+ }, rutina.ShutdownIfDone)
+
+ r.ListenOsSignals() // Shutdown all routines by OS signal
```
+
+### Wait routines to complete
+
+```go
err := r.Wait()
```
-Here err = first error in any routine
+Here err = error that shutdowns all routines (may be will be changed at future)
-## Options
+## Mixins
-### Usage options
+### Usage
-`r := rutina.New(option1, option2, ...)`
-or
+```go
+r := rutina.New(mixin1, mixin2, ...)
```
+or
+```go
r := rutina.New()
-r = r.WithOptions(option1, option2, ...) // Returns new instance of Rutina!
+r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina!
```
### Logger
-`rutina.WithLogger(logger log.Logger) Option` or `rutina.WithStdLogger() Option`
-
-### Custom context
+```go
+r.With(rutina.WithStdLogger())
+```
+or
+```go
+r.With(rutina.WithLogger(logger log.Logger))
+```
-`rutina.WithContext(ctx context.Context) Option`
+Sets standard or custom logger. By default there is no logger.
-### Cancel only by errors
+### Custom context
-`rutina.WithCancelByError() Option`
+```go
+r.With(rutina.WithContext(ctx context.Context))
+````
-If this option set, rutina doesnt cancel context if routine completed without error.
+Propagates your own context to Rutina. By default it use own context.
## Example
-HTTP server with graceful shutdown (`example/http_server.go`):
+HTTP server with graceful shutdown [`example/http_server.go`](https://github.com/NeonXP/rutina/blob/master/example/http_server.go)
-```
-// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
-r := rutina.New(rutina.WithStdLogger())
-
-srv := &http.Server{Addr: ":8080"}
-http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
- io.WriteString(w, "hello world\n")
-})
-
-// Starting http server and listen connections
-r.Go(func(ctx context.Context) error {
- if err := srv.ListenAndServe(); err != nil {
- return err
- }
- log.Println("Server stopped")
- return nil
-})
-
-// Gracefully stoping server when context canceled
-r.Go(func(ctx context.Context) error {
- <-ctx.Done()
- log.Println("Stopping server...")
- return srv.Shutdown(ctx)
-})
-
-// OS signals listener
-r.ListenOsSignals()
-
-if err := r.Wait(); err != nil {
- log.Fatal(err)
-}
-
-log.Println("All routines successfully stopped")
-``` \ No newline at end of file
+Different run policies [`example/policies.go`](https://github.com/NeonXP/rutina/blob/master/example/policies.go)
diff --git a/example/http_server.go b/example/http_server.go
index 86255e6..fe1d456 100755
--- a/example/http_server.go
+++ b/example/http_server.go
@@ -12,7 +12,7 @@ import (
)
func main() {
- // New instance with builtin context. Alternative: r, ctx := rutina.OptionContext(ctx)
+ // New instance with builtin context
r := rutina.New(rutina.WithStdLogger())
srv := &http.Server{Addr: ":8080"}
diff --git a/example/policies.go b/example/policies.go
new file mode 100644
index 0000000..5f09b8a
--- /dev/null
+++ b/example/policies.go
@@ -0,0 +1,55 @@
+// +build ignore
+
+package main
+
+import (
+ "context"
+ "errors"
+ "github.com/neonxp/rutina"
+ "log"
+ "time"
+)
+
+func main() {
+ // New instance with builtin context
+ r := rutina.New()
+
+ r.Go(func(ctx context.Context) error {
+ <-time.After(1 * time.Second)
+ log.Println("Do something 1 second without errors and restart")
+ return nil
+ }, rutina.RestartIfDone, rutina.ShutdownIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ <-time.After(2 * time.Second)
+ log.Println("Do something 2 seconds without errors and do nothing")
+ return nil
+ }, rutina.ShutdownIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ <-time.After(3 * time.Second)
+ log.Println("Do something 3 seconds with error and restart")
+ return errors.New("Error!")
+ }, rutina.RestartIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ <-time.After(4 * time.Second)
+ log.Println("Do something 4 seconds with error and do nothing")
+ return errors.New("Error!")
+ }, rutina.DoNothingIfFail)
+
+ r.Go(func(ctx context.Context) error {
+ <-time.After(10 * time.Second)
+ log.Println("Do something 10 seconds with error and close context")
+ return errors.New("Successfully shutdown at proper place")
+ }, rutina.ShutdownIfFail)
+
+ // OS signals subscriber
+ r.ListenOsSignals()
+
+ if err := r.Wait(); err != nil {
+ log.Fatal(err)
+ } else {
+ log.Println("Routines stopped but not correct")
+ }
+}
diff --git a/go.mod b/go.mod
index d807050..2460588 100755..100644
--- a/go.mod
+++ b/go.mod
@@ -1 +1,7 @@
module github.com/NeonXP/rutina
+
+go 1.12
+
+replace github.com/neonxp/rutina => ./
+
+require github.com/neonxp/rutina v0.0.0-00010101000000-000000000000 // indirect
diff --git a/mixins.go b/mixins.go
new file mode 100755
index 0000000..65309e4
--- /dev/null
+++ b/mixins.go
@@ -0,0 +1,41 @@
+package rutina
+
+import (
+ "context"
+ "log"
+ "os"
+)
+
+type Mixin interface {
+ apply(*Rutina)
+}
+
+type MixinContext struct {
+ Context context.Context
+}
+
+func WithContext(context context.Context) *MixinContext {
+ return &MixinContext{Context: context}
+}
+
+func (o MixinContext) apply(r *Rutina) {
+ ctx, cancel := context.WithCancel(o.Context)
+ r.ctx = ctx
+ r.Cancel = cancel
+}
+
+type MixinLogger struct {
+ Logger *log.Logger
+}
+
+func WithLogger(logger *log.Logger) *MixinLogger {
+ return &MixinLogger{Logger: logger}
+}
+
+func WithStdLogger() *MixinLogger {
+ return &MixinLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)}
+}
+
+func (o MixinLogger) apply(r *Rutina) {
+ r.logger = o.Logger
+}
diff --git a/options.go b/options.go
index 8144dde..d1030ba 100755..100644
--- a/options.go
+++ b/options.go
@@ -1,51 +1,12 @@
package rutina
-import (
- "context"
- "log"
- "os"
+type Options int
+
+const (
+ ShutdownIfFail Options = iota // Shutdown all routines if fail
+ RestartIfFail // Restart this routine if fail
+ DoNothingIfFail // Do nothing on fail
+ ShutdownIfDone // Shutdown all routines if this routine done without errors
+ RestartIfDone // Restart if this routine done without errors
+ DoNothingIfDone // Do nothing if this routine done without errors
)
-
-type Option interface {
- apply(*Rutina)
-}
-
-type OptionContext struct {
- Context context.Context
-}
-
-func WithContext(context context.Context) *OptionContext {
- return &OptionContext{Context: context}
-}
-
-func (o OptionContext) apply(r *Rutina) {
- ctx, cancel := context.WithCancel(o.Context)
- r.ctx = ctx
- r.Cancel = cancel
-}
-
-type OptionLogger struct {
- Logger *log.Logger
-}
-
-func WithLogger(logger *log.Logger) *OptionLogger {
- return &OptionLogger{Logger: logger}
-}
-
-func WithStdLogger() *OptionLogger {
- return &OptionLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)}
-}
-
-func (o OptionLogger) apply(r *Rutina) {
- r.logger = o.Logger
-}
-
-type OptionCancelByError struct{}
-
-func WithCancelByError() *OptionCancelByError {
- return &OptionCancelByError{}
-}
-
-func (OptionCancelByError) apply(r *Rutina) {
- r.cancelByError = true
-}
diff --git a/rutina.go b/rutina.go
index 8a8cccc..0bdc7e8 100755
--- a/rutina.go
+++ b/rutina.go
@@ -22,35 +22,54 @@ type Rutina struct {
}
// New instance with builtin context
-func New(opts ...Option) *Rutina {
+func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
- return r.WithOptions(opts...)
+ return r.With(mixins...)
}
-func (r *Rutina) WithOptions(opts ...Option) *Rutina {
+func (r *Rutina) With(mixins ...Mixin) *Rutina {
nr := *r
- for _, o := range opts {
- o.apply(&nr)
+ for _, m := range mixins {
+ m.apply(&nr)
}
return &nr
}
// Go routine
-func (r *Rutina) Go(doer func(ctx context.Context) error) {
+func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
+ onFail := ShutdownIfFail
+ for _, o := range opts {
+ switch o {
+ case ShutdownIfFail:
+ onFail = ShutdownIfFail
+ case RestartIfFail:
+ onFail = RestartIfFail
+ case DoNothingIfFail:
+ onFail = DoNothingIfFail
+ }
+ }
+ onDone := DoNothingIfDone
+ for _, o := range opts {
+ switch o {
+ case ShutdownIfDone:
+ onDone = ShutdownIfDone
+ case RestartIfDone:
+ onDone = RestartIfDone
+ case DoNothingIfDone:
+ onDone = DoNothingIfDone
+ }
+ }
+
r.wg.Add(1)
go func() {
+ defer r.wg.Done()
+ // Check that context is not canceled yet
+ if r.ctx.Err() != nil {
+ return
+ }
id := atomic.AddUint64(r.counter, 1)
- defer func() {
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
- r.wg.Done()
- if !r.cancelByError {
- r.Cancel()
- }
- }()
if r.logger != nil {
r.logger.Printf("starting #%d", id)
}
@@ -58,13 +77,47 @@ func (r *Rutina) Go(doer func(ctx context.Context) error) {
if r.logger != nil {
r.logger.Printf("error at #%d : %v", id, err)
}
- r.o.Do(func() {
- r.err = err
- })
- if r.cancelByError {
+ switch onFail {
+ case ShutdownIfFail:
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d", id)
+ }
+ // Save error only if shutdown all routines
+ r.o.Do(func() {
+ r.err = err
+ })
+ r.Cancel()
+ case RestartIfFail:
+ // TODO maybe store errors on restart?
+ if r.logger != nil {
+ r.logger.Printf("restarting #%d", id)
+ }
+ r.Go(doer, opts...)
+ case DoNothingIfFail:
+ // TODO maybe store errors on nothing to do?
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d", id)
+ }
+ }
+ } else {
+ switch onDone {
+ case ShutdownIfDone:
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d with shutdown", id)
+ }
r.Cancel()
+ case RestartIfDone:
+ if r.logger != nil {
+ r.logger.Printf("restarting #%d", id)
+ }
+ r.Go(doer, opts...)
+ case DoNothingIfDone:
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d", id)
+ }
}
}
+
}()
}
@@ -84,7 +137,7 @@ func (r *Rutina) ListenOsSignals() {
case <-ctx.Done():
}
return nil
- })
+ }, ShutdownIfDone)
}
// Wait all routines and returns first error or nil if all routines completes without errors
diff --git a/rutina_test.go b/rutina_test.go
index 077f68e..0c6d94a 100755
--- a/rutina_test.go
+++ b/rutina_test.go
@@ -8,7 +8,7 @@ import (
)
func TestSuccess(t *testing.T) {
- r, _ := New(
+ r := New(
WithStdLogger(),
WithContext(context.Background()),
)
@@ -40,7 +40,7 @@ func TestSuccess(t *testing.T) {
}
func TestError(t *testing.T) {
- r, _ := New(WithCancelByError())
+ r := New()
f := func(name string, ttl time.Duration) error {
<-time.After(ttl)
t.Log(name)
@@ -65,12 +65,12 @@ func TestError(t *testing.T) {
}
func TestContext(t *testing.T) {
- r, _ := New()
+ r := New()
cc := false
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
return nil
- })
+ }, ShutdownIfDone)
r.Go(func(ctx context.Context) error {
select {
case <-ctx.Done():
@@ -79,7 +79,7 @@ func TestContext(t *testing.T) {
case <-time.After(3 * time.Second):
return errors.New("Timeout")
}
- })
+ }, ShutdownIfDone)
if err := r.Wait(); err != nil {
t.Error("Unexpected error", err)
}