diff options
author | Alexander Kiryukhin <alexander@kiryukhin.su> | 2019-04-03 23:55:11 +0300 |
---|---|---|
committer | Alexander Kiryukhin <alexander@kiryukhin.su> | 2019-04-03 23:55:11 +0300 |
commit | 05212e50c9761181b1a2c0c9e8b43ae31fb24017 (patch) | |
tree | c25c9b189046d99a42d0b8bcc29422dc8d74d9cc /rutina.go | |
parent | 1772990500c97a79adce15c73919339d6e1618d7 (diff) |
Added:
- Mixin with errors channel
Changed:
- Default run policy now `ShutdownIfDone` && `ShutdownIfFail`
Fixed:
- Fixed OS signals listener
- Removed some dead code
Diffstat (limited to 'rutina.go')
-rwxr-xr-x | rutina.go | 80 |
1 files changed, 37 insertions, 43 deletions
@@ -7,25 +7,26 @@ import ( "os/signal" "sync" "sync/atomic" + "syscall" ) //Rutina is routine manager type Rutina struct { - ctx context.Context - Cancel func() - wg sync.WaitGroup - o sync.Once - err error - logger *log.Logger - counter *uint64 - cancelByError bool + ctx context.Context // State of application (started/stopped) + Cancel func() // Cancel func that stops all routines + wg sync.WaitGroup // WaitGroup that wait all routines to complete + o sync.Once // Flag that prevents overwrite first error that shutdowns all routines + err error // First error that shutdowns all routines + logger *log.Logger // Optional logger + counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger + errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail } // New instance with builtin context func New(mixins ...Mixin) *Rutina { ctx, cancel := context.WithCancel(context.Background()) var counter uint64 = 0 - r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false} + r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil} return r.With(mixins...) } @@ -50,7 +51,7 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { onFail = DoNothingIfFail } } - onDone := DoNothingIfDone + onDone := ShutdownIfDone for _, o := range opts { switch o { case ShutdownIfDone: @@ -70,52 +71,42 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { return } id := atomic.AddUint64(r.counter, 1) - if r.logger != nil { - r.logger.Printf("starting #%d", id) - } + r.log("starting #%d", id) if err := doer(r.ctx); err != nil { - if r.logger != nil { - r.logger.Printf("error at #%d : %v", id, err) + // errors history + if r.errCh != nil { + r.errCh <- err } + // region routine failed + r.log("error at #%d : %v", id, err) switch onFail { case ShutdownIfFail: - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) // Save error only if shutdown all routines r.o.Do(func() { r.err = err }) r.Cancel() case RestartIfFail: - // TODO maybe store errors on restart? - if r.logger != nil { - r.logger.Printf("restarting #%d", id) - } + r.log("restarting #%d", id) r.Go(doer, opts...) case DoNothingIfFail: - // TODO maybe store errors on nothing to do? - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) } + // endregion } else { + // region routine successfully done switch onDone { case ShutdownIfDone: - if r.logger != nil { - r.logger.Printf("stopping #%d with shutdown", id) - } + r.log("stopping #%d with shutdown", id) r.Cancel() case RestartIfDone: - if r.logger != nil { - r.logger.Printf("restarting #%d", id) - } + r.log("restarting #%d", id) r.Go(doer, opts...) case DoNothingIfDone: - if r.logger != nil { - r.logger.Printf("stopping #%d", id) - } + r.log("stopping #%d", id) } + // endregion } }() @@ -124,23 +115,19 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { // OS signals handler func (r *Rutina) ListenOsSignals(signals ...os.Signal) { if len(signals) == 0 { - signals = []os.Signal{os.Kill, os.Interrupt} + signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } r.Go(func(ctx context.Context) error { sig := make(chan os.Signal, 1) signal.Notify(sig, signals...) + r.log("starting OS signals listener") select { case s := <-sig: - if r.logger != nil { - r.logger.Printf("stopping by OS signal (%v)", s) - } - if r.cancelByError { - r.Cancel() - } + r.log("stopping by OS signal (%v)", s) case <-ctx.Done(): } return nil - }, ShutdownIfDone) + }, ShutdownIfDone, ShutdownIfFail) } // Wait all routines and returns first error or nil if all routines completes without errors @@ -148,3 +135,10 @@ func (r *Rutina) Wait() error { r.wg.Wait() return r.err } + +// Log if can +func (r *Rutina) log(format string, args ...interface{}) { + if r.logger != nil { + r.logger.Printf(format, args...) + } +} |