diff options
author | Alexander Kiryukhin <alexander@kiryukhin.su> | 2019-04-04 22:56:53 +0300 |
---|---|---|
committer | Alexander Kiryukhin <alexander@kiryukhin.su> | 2019-04-04 22:56:53 +0300 |
commit | c691d422395cb72283512d8956a255db10b70b44 (patch) | |
tree | c913d176c23495a8aa82ecc98089de10074c0929 /rutina.go | |
parent | 04db7b633980e7c39e582479ea41608907aa45b2 (diff) |
v0.4.3v0.4.3
Added:
- Hooks for lifecycle events
Fixed:
- Close errors channel
- Small fixes
Diffstat (limited to 'rutina.go')
-rwxr-xr-x | rutina.go | 76 |
1 files changed, 50 insertions, 26 deletions
@@ -12,21 +12,23 @@ import ( //Rutina is routine manager type Rutina struct { - ctx context.Context // State of application (started/stopped) - Cancel func() // Cancel func that stops all routines - wg sync.WaitGroup // WaitGroup that wait all routines to complete - o sync.Once // Flag that prevents overwrite first error that shutdowns all routines - err error // First error that shutdowns all routines - logger *log.Logger // Optional logger - counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger - errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail + ctx context.Context // State of application (started/stopped) + Cancel func() // Cancel func that stops all routines + wg sync.WaitGroup // WaitGroup that wait all routines to complete + onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines + onceWait sync.Once // Flag that prevents wait already waited rutina + err error // First error that shutdowns all routines + logger *log.Logger // Optional logger + counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger + errCh chan error // Optional channel for errors when RestartIfFail and DoNothingIfFail + hooks map[Event][]Hook // Lifecycle hooks } // New instance with builtin context func New(mixins ...Mixin) *Rutina { ctx, cancel := context.WithCancel(context.Background()) var counter uint64 - r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil} + r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil, hooks: map[Event][]Hook{}} return r.With(mixins...) } @@ -38,8 +40,16 @@ func (r *Rutina) With(mixins ...Mixin) *Rutina { return r } +func (r *Rutina) RegisterHook(ev Event, hook Hook) { + r.hooks[ev] = append(r.hooks[ev], hook) +} + // Go routine func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { + // Check that context is not canceled yet + if r.ctx.Err() != nil { + return + } onFail := ShutdownIfFail for _, o := range opts { switch o { @@ -66,49 +76,39 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { r.wg.Add(1) go func() { defer r.wg.Done() - // Check that context is not canceled yet - if r.ctx.Err() != nil { - return - } id := atomic.AddUint64(r.counter, 1) - r.log("starting #%d", id) + r.fire(EventRoutineStart, int(id)) if err := doer(r.ctx); err != nil { + r.fire(EventRoutineFail, int(id)) + r.fire(EventRoutineStop, int(id)) // errors history if r.errCh != nil { r.errCh <- err } // region routine failed - r.log("error at #%d : %v", id, err) switch onFail { case ShutdownIfFail: - r.log("stopping #%d", id) // Save error only if shutdown all routines - r.o.Do(func() { + r.onceErr.Do(func() { r.err = err }) r.Cancel() case RestartIfFail: - r.log("restarting #%d", id) r.Go(doer, opts...) - case DoNothingIfFail: - r.log("stopping #%d", id) } // endregion } else { + r.fire(EventRoutineComplete, int(id)) + r.fire(EventRoutineStop, int(id)) // region routine successfully done switch onDone { case ShutdownIfDone: - r.log("stopping #%d with shutdown", id) r.Cancel() case RestartIfDone: - r.log("restarting #%d", id) r.Go(doer, opts...) - case DoNothingIfDone: - r.log("stopping #%d", id) } // endregion } - }() } @@ -138,10 +138,34 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) { // Wait all routines and returns first error or nil if all routines completes without errors func (r *Rutina) Wait() error { - r.wg.Wait() + r.onceWait.Do(func() { + r.wg.Wait() + r.fire(EventAppStop, 0) + if r.err == nil { + r.fire(EventAppComplete, 0) + } else { + r.fire(EventAppFail, 0) + } + if r.errCh != nil { + close(r.errCh) + } + }) return r.err } +func (r *Rutina) fire(ev Event, rid int) { + r.log("Event = %s Routine ID = %d", ev.String(), rid) + if hooks, ok := r.hooks[ev]; ok == true { + for _, h := range hooks { + if err := h(ev, r, rid); err != nil { + if r.errCh != nil { + r.errCh <- err + } + } + } + } +} + // Log if can func (r *Rutina) log(format string, args ...interface{}) { if r.logger != nil { |