aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Kiryukhin <alexander@kiryukhin.su>2019-04-03 23:55:11 +0300
committerAlexander Kiryukhin <alexander@kiryukhin.su>2019-04-03 23:55:11 +0300
commit05212e50c9761181b1a2c0c9e8b43ae31fb24017 (patch)
treec25c9b189046d99a42d0b8bcc29422dc8d74d9cc
parent1772990500c97a79adce15c73919339d6e1618d7 (diff)
Added:
- Mixin with errors channel Changed: - Default run policy now `ShutdownIfDone` && `ShutdownIfFail` Fixed: - Fixed OS signals listener - Removed some dead code
-rwxr-xr-xREADME.md19
-rw-r--r--example/policies.go22
-rw-r--r--go.mod2
-rwxr-xr-xmixins.go12
-rwxr-xr-xrutina.go80
5 files changed, 85 insertions, 50 deletions
diff --git a/README.md b/README.md
index 060d797..6d922eb 100755
--- a/README.md
+++ b/README.md
@@ -49,7 +49,9 @@ Available options of run policy:
Default policy:
-`ShutdownIfFail` && `DoNothingIfDone`
+`ShutdownIfFail` && `ShutdownIfDone`
+
+(just like [errgroup](https://godoc.org/golang.org/x/sync/errgroup))
#### Example of run policies
@@ -100,11 +102,11 @@ r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina!
### Logger
```go
-r.With(rutina.WithStdLogger())
+r = r.With(rutina.WithStdLogger())
```
or
```go
-r.With(rutina.WithLogger(logger log.Logger))
+r = r.With(rutina.WithLogger(logger log.Logger))
```
Sets standard or custom logger. By default there is no logger.
@@ -112,11 +114,20 @@ Sets standard or custom logger. By default there is no logger.
### Custom context
```go
-r.With(rutina.WithContext(ctx context.Context))
+r = r.With(rutina.WithContext(ctx context.Context))
````
Propagates your own context to Rutina. By default it use own context.
+### Errors channel
+
+```go
+errChan := make(chan error)
+r = r.With(rutina.WithErrChan(errChan))
+```
+
+This channel will receive all errors from all routines with any `...Fail` run policy.
+
## Example
HTTP server with graceful shutdown [`example/http_server.go`](https://github.com/NeonXP/rutina/blob/master/example/http_server.go)
diff --git a/example/policies.go b/example/policies.go
index 5f09b8a..50a4e07 100644
--- a/example/policies.go
+++ b/example/policies.go
@@ -14,6 +14,10 @@ func main() {
// New instance with builtin context
r := rutina.New()
+ errsChan := make(chan error, 1)
+
+ r = r.With(rutina.WithErrChan(errsChan))
+
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
log.Println("Do something 1 second without errors and restart")
@@ -24,18 +28,18 @@ func main() {
<-time.After(2 * time.Second)
log.Println("Do something 2 seconds without errors and do nothing")
return nil
- }, rutina.ShutdownIfFail)
+ }, rutina.DoNothingIfDone, rutina.ShutdownIfFail)
r.Go(func(ctx context.Context) error {
<-time.After(3 * time.Second)
log.Println("Do something 3 seconds with error and restart")
- return errors.New("Error!")
+ return errors.New("Error #1!")
}, rutina.RestartIfFail)
r.Go(func(ctx context.Context) error {
<-time.After(4 * time.Second)
log.Println("Do something 4 seconds with error and do nothing")
- return errors.New("Error!")
+ return errors.New("Error #2!")
}, rutina.DoNothingIfFail)
r.Go(func(ctx context.Context) error {
@@ -44,6 +48,18 @@ func main() {
return errors.New("Successfully shutdown at proper place")
}, rutina.ShutdownIfFail)
+ r.Go(func(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ log.Println("Shutdown chan listener")
+ return nil
+ case err := <-errsChan:
+ log.Printf("Error in chan: %v", err)
+ }
+ }
+ })
+
// OS signals subscriber
r.ListenOsSignals()
diff --git a/go.mod b/go.mod
index e827eb1..825fff3 100644
--- a/go.mod
+++ b/go.mod
@@ -1 +1,3 @@
module github.com/neonxp/rutina
+
+go 1.12
diff --git a/mixins.go b/mixins.go
index 65309e4..a037561 100755
--- a/mixins.go
+++ b/mixins.go
@@ -39,3 +39,15 @@ func WithStdLogger() *MixinLogger {
func (o MixinLogger) apply(r *Rutina) {
r.logger = o.Logger
}
+
+type MixinErrChan struct {
+ errCh chan error
+}
+
+func WithErrChan(errCh chan error) *MixinErrChan {
+ return &MixinErrChan{errCh: errCh}
+}
+
+func (o MixinErrChan) apply(r *Rutina) {
+ r.errCh = o.errCh
+}
diff --git a/rutina.go b/rutina.go
index 703e9fc..256bce0 100755
--- a/rutina.go
+++ b/rutina.go
@@ -7,25 +7,26 @@ import (
"os/signal"
"sync"
"sync/atomic"
+ "syscall"
)
//Rutina is routine manager
type Rutina struct {
- ctx context.Context
- Cancel func()
- wg sync.WaitGroup
- o sync.Once
- err error
- logger *log.Logger
- counter *uint64
- cancelByError bool
+ ctx context.Context // State of application (started/stopped)
+ Cancel func() // Cancel func that stops all routines
+ wg sync.WaitGroup // WaitGroup that wait all routines to complete
+ o sync.Once // Flag that prevents overwrite first error that shutdowns all routines
+ err error // First error that shutdowns all routines
+ logger *log.Logger // Optional logger
+ counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
+ errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
}
// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
- r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
+ r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
return r.With(mixins...)
}
@@ -50,7 +51,7 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
onFail = DoNothingIfFail
}
}
- onDone := DoNothingIfDone
+ onDone := ShutdownIfDone
for _, o := range opts {
switch o {
case ShutdownIfDone:
@@ -70,52 +71,42 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
return
}
id := atomic.AddUint64(r.counter, 1)
- if r.logger != nil {
- r.logger.Printf("starting #%d", id)
- }
+ r.log("starting #%d", id)
if err := doer(r.ctx); err != nil {
- if r.logger != nil {
- r.logger.Printf("error at #%d : %v", id, err)
+ // errors history
+ if r.errCh != nil {
+ r.errCh <- err
}
+ // region routine failed
+ r.log("error at #%d : %v", id, err)
switch onFail {
case ShutdownIfFail:
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
// Save error only if shutdown all routines
r.o.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
- // TODO maybe store errors on restart?
- if r.logger != nil {
- r.logger.Printf("restarting #%d", id)
- }
+ r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfFail:
- // TODO maybe store errors on nothing to do?
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
}
+ // endregion
} else {
+ // region routine successfully done
switch onDone {
case ShutdownIfDone:
- if r.logger != nil {
- r.logger.Printf("stopping #%d with shutdown", id)
- }
+ r.log("stopping #%d with shutdown", id)
r.Cancel()
case RestartIfDone:
- if r.logger != nil {
- r.logger.Printf("restarting #%d", id)
- }
+ r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfDone:
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
}
+ // endregion
}
}()
@@ -124,23 +115,19 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
// OS signals handler
func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
if len(signals) == 0 {
- signals = []os.Signal{os.Kill, os.Interrupt}
+ signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, signals...)
+ r.log("starting OS signals listener")
select {
case s := <-sig:
- if r.logger != nil {
- r.logger.Printf("stopping by OS signal (%v)", s)
- }
- if r.cancelByError {
- r.Cancel()
- }
+ r.log("stopping by OS signal (%v)", s)
case <-ctx.Done():
}
return nil
- }, ShutdownIfDone)
+ }, ShutdownIfDone, ShutdownIfFail)
}
// Wait all routines and returns first error or nil if all routines completes without errors
@@ -148,3 +135,10 @@ func (r *Rutina) Wait() error {
r.wg.Wait()
return r.err
}
+
+// Log if can
+func (r *Rutina) log(format string, args ...interface{}) {
+ if r.logger != nil {
+ r.logger.Printf(format, args...)
+ }
+}