aboutsummaryrefslogtreecommitdiff
path: root/rutina.go
diff options
context:
space:
mode:
authorAlexander Kiryukhin <alexander@kiryukhin.su>2019-04-03 23:55:11 +0300
committerAlexander Kiryukhin <alexander@kiryukhin.su>2019-04-03 23:55:11 +0300
commit05212e50c9761181b1a2c0c9e8b43ae31fb24017 (patch)
treec25c9b189046d99a42d0b8bcc29422dc8d74d9cc /rutina.go
parent1772990500c97a79adce15c73919339d6e1618d7 (diff)
Added:
- Mixin with errors channel Changed: - Default run policy now `ShutdownIfDone` && `ShutdownIfFail` Fixed: - Fixed OS signals listener - Removed some dead code
Diffstat (limited to 'rutina.go')
-rwxr-xr-xrutina.go80
1 files changed, 37 insertions, 43 deletions
diff --git a/rutina.go b/rutina.go
index 703e9fc..256bce0 100755
--- a/rutina.go
+++ b/rutina.go
@@ -7,25 +7,26 @@ import (
"os/signal"
"sync"
"sync/atomic"
+ "syscall"
)
//Rutina is routine manager
type Rutina struct {
- ctx context.Context
- Cancel func()
- wg sync.WaitGroup
- o sync.Once
- err error
- logger *log.Logger
- counter *uint64
- cancelByError bool
+ ctx context.Context // State of application (started/stopped)
+ Cancel func() // Cancel func that stops all routines
+ wg sync.WaitGroup // WaitGroup that wait all routines to complete
+ o sync.Once // Flag that prevents overwrite first error that shutdowns all routines
+ err error // First error that shutdowns all routines
+ logger *log.Logger // Optional logger
+ counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
+ errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail
}
// New instance with builtin context
func New(mixins ...Mixin) *Rutina {
ctx, cancel := context.WithCancel(context.Background())
var counter uint64 = 0
- r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
+ r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
return r.With(mixins...)
}
@@ -50,7 +51,7 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
onFail = DoNothingIfFail
}
}
- onDone := DoNothingIfDone
+ onDone := ShutdownIfDone
for _, o := range opts {
switch o {
case ShutdownIfDone:
@@ -70,52 +71,42 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
return
}
id := atomic.AddUint64(r.counter, 1)
- if r.logger != nil {
- r.logger.Printf("starting #%d", id)
- }
+ r.log("starting #%d", id)
if err := doer(r.ctx); err != nil {
- if r.logger != nil {
- r.logger.Printf("error at #%d : %v", id, err)
+ // errors history
+ if r.errCh != nil {
+ r.errCh <- err
}
+ // region routine failed
+ r.log("error at #%d : %v", id, err)
switch onFail {
case ShutdownIfFail:
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
// Save error only if shutdown all routines
r.o.Do(func() {
r.err = err
})
r.Cancel()
case RestartIfFail:
- // TODO maybe store errors on restart?
- if r.logger != nil {
- r.logger.Printf("restarting #%d", id)
- }
+ r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfFail:
- // TODO maybe store errors on nothing to do?
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
}
+ // endregion
} else {
+ // region routine successfully done
switch onDone {
case ShutdownIfDone:
- if r.logger != nil {
- r.logger.Printf("stopping #%d with shutdown", id)
- }
+ r.log("stopping #%d with shutdown", id)
r.Cancel()
case RestartIfDone:
- if r.logger != nil {
- r.logger.Printf("restarting #%d", id)
- }
+ r.log("restarting #%d", id)
r.Go(doer, opts...)
case DoNothingIfDone:
- if r.logger != nil {
- r.logger.Printf("stopping #%d", id)
- }
+ r.log("stopping #%d", id)
}
+ // endregion
}
}()
@@ -124,23 +115,19 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
// OS signals handler
func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
if len(signals) == 0 {
- signals = []os.Signal{os.Kill, os.Interrupt}
+ signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
signal.Notify(sig, signals...)
+ r.log("starting OS signals listener")
select {
case s := <-sig:
- if r.logger != nil {
- r.logger.Printf("stopping by OS signal (%v)", s)
- }
- if r.cancelByError {
- r.Cancel()
- }
+ r.log("stopping by OS signal (%v)", s)
case <-ctx.Done():
}
return nil
- }, ShutdownIfDone)
+ }, ShutdownIfDone, ShutdownIfFail)
}
// Wait all routines and returns first error or nil if all routines completes without errors
@@ -148,3 +135,10 @@ func (r *Rutina) Wait() error {
r.wg.Wait()
return r.err
}
+
+// Log if can
+func (r *Rutina) log(format string, args ...interface{}) {
+ if r.logger != nil {
+ r.logger.Printf(format, args...)
+ }
+}