aboutsummaryrefslogtreecommitdiff
path: root/rutina.go
diff options
context:
space:
mode:
authorAlexander Kiryukhin <a.kiryukhin@mail.ru>2020-01-12 16:40:01 +0300
committerAlexander Kiryukhin <a.kiryukhin@mail.ru>2020-01-12 16:40:01 +0300
commit837cfe3223db11bf7f5edb6ba738d3e328b80ea2 (patch)
tree8207e53a6411ea5cd9f1da96f0de9d957c7301f2 /rutina.go
parent01eeeaf5e136928abe75f95d58f3f9cce11c6fe6 (diff)
Refactored
New clean API Timeouts and restart limits
Diffstat (limited to 'rutina.go')
-rwxr-xr-xrutina.go252
1 files changed, 160 insertions, 92 deletions
diff --git a/rutina.go b/rutina.go
index 34d7723..cd5748e 100755
--- a/rutina.go
+++ b/rutina.go
@@ -2,114 +2,121 @@ package rutina
import (
"context"
- "log"
+ "errors"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
+ "time"
)
+var (
+ ErrRunLimit = errors.New("rutina run limit")
+ ErrTimeoutOrKilled = errors.New("rutina timeouted or killed")
+ ErrProcessNotFound = errors.New("process not found")
+ ErrShutdown = errors.New("shutdown")
+)
+
+type logger func(format string, v ...interface{})
+
+var nopLogger = func(format string, v ...interface{}) {}
+
//Rutina is routine manager
type Rutina struct {
- ctx context.Context // State of application (started/stopped)
- Cancel func() // Cancel func that stops all routines
- wg sync.WaitGroup // WaitGroup that wait all routines to complete
- onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
- onceWait sync.Once // Flag that prevents wait already waited rutina
- err error // First error that shutdowns all routines
- logger *log.Logger // Optional logger
- counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
- errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError
- lifecycleListener LifecycleListener // Optional listener for events
- autoListenSignals []os.Signal // Optional listening os signals, default disabled
+ ctx context.Context // State of application (started/stopped)
+ Cancel func() // Cancel func that stops all routines
+ wg sync.WaitGroup // WaitGroup that wait all routines to complete
+ onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
+ onceWait sync.Once // Flag that prevents wait already waited rutina
+ err error // First error that shutdowns all routines
+ logger logger // Optional logger
+ counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
+ errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError
+ autoListenSignals []os.Signal // Optional listening os signals, default disabled
+ processes map[uint64]*process
+ mu sync.Mutex
}
// New instance with builtin context
-func New(mixins ...Mixin) *Rutina {
- ctx, cancel := context.WithCancel(context.Background())
+func New(opts *Options) *Rutina {
+ if opts == nil {
+ opts = Opt
+ }
+ ctx, cancel := context.WithCancel(opts.ParentContext)
var counter uint64
- r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
- return r.With(mixins...)
-}
-
-// With applies mixins
-func (r *Rutina) With(mixins ...Mixin) *Rutina {
- for _, m := range mixins {
- m.apply(r)
+ if opts.Logger == nil {
+ opts.Logger = nopLogger
}
- if r.autoListenSignals != nil {
- r.ListenOsSignals(r.autoListenSignals...)
+ var signals []os.Signal
+ if opts.ListenOsSignals {
+ signals = []os.Signal{os.Kill, os.Interrupt}
+ }
+ return &Rutina{
+ ctx: ctx,
+ Cancel: cancel,
+ wg: sync.WaitGroup{},
+ onceErr: sync.Once{},
+ onceWait: sync.Once{},
+ err: nil,
+ logger: opts.Logger,
+ counter: &counter,
+ errCh: opts.Errors,
+ autoListenSignals: signals,
+ processes: map[uint64]*process{},
+ mu: sync.Mutex{},
}
- return r
}
// Go routine
-func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
+func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 {
+ if opts == nil {
+ opts = RunOpt
+ }
// Check that context is not canceled yet
if r.ctx.Err() != nil {
- return
- }
- onFail := ShutdownIfError
- for _, o := range opts {
- switch o {
- case ShutdownIfError:
- onFail = ShutdownIfError
- case RestartIfError:
- onFail = RestartIfError
- case DoNothingIfError:
- onFail = DoNothingIfError
- }
+ return 0
}
- onDone := ShutdownIfDone
- for _, o := range opts {
- switch o {
- case ShutdownIfDone:
- onDone = ShutdownIfDone
- case RestartIfDone:
- onDone = RestartIfDone
- case DoNothingIfDone:
- onDone = DoNothingIfDone
- }
+
+ r.mu.Lock()
+ id := atomic.AddUint64(r.counter, 1)
+ process := process{
+ id: id,
+ doer: doer,
+ onDone: opts.OnDone,
+ onError: opts.OnError,
+ restartLimit: opts.MaxCount,
+ restartCount: 0,
+ timeout: opts.Timeout,
}
+ r.processes[id] = &process
+ r.mu.Unlock()
r.wg.Add(1)
go func() {
defer r.wg.Done()
- id := atomic.AddUint64(r.counter, 1)
- r.lifecycleEvent(EventRoutineStart, int(id))
- if err := doer(r.ctx); err != nil {
- r.lifecycleEvent(EventRoutineError, int(id))
- r.lifecycleEvent(EventRoutineStop, int(id))
- // errors history
- if r.errCh != nil {
- r.errCh <- err
- }
- // region routine failed
- switch onFail {
- case ShutdownIfError:
- // Save error only if shutdown all routines
+ if err := process.run(r.ctx, r.errCh, r.logger); err != nil {
+ if err != ErrShutdown {
r.onceErr.Do(func() {
r.err = err
})
- r.Cancel()
- case RestartIfError:
- r.Go(doer, opts...)
}
- // endregion
- } else {
- r.lifecycleEvent(EventRoutineComplete, int(id))
- r.lifecycleEvent(EventRoutineStop, int(id))
- // region routine successfully done
- switch onDone {
- case ShutdownIfDone:
- r.Cancel()
- case RestartIfDone:
- r.Go(doer, opts...)
- }
- // endregion
+ r.Cancel()
}
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ delete(r.processes, process.id)
+ r.logger("completed #%d", process.id)
}()
+ return id
+}
+
+func (r *Rutina) Processes() []uint64 {
+ var procesess []uint64
+ for id, _ := range r.processes {
+ procesess = append(procesess, id)
+ }
+ return procesess
}
// Errors returns chan for all errors, event if DoNothingIfError or RestartIfError set.
@@ -126,10 +133,10 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, signals...)
- r.log("starting OS signals listener")
+ r.logger("starting OS signals listener")
select {
case s := <-sig:
- r.log("stopping by OS signal (%v)", s)
+ r.logger("stopping by OS signal (%v)", s)
r.Cancel()
case <-r.ctx.Done():
}
@@ -138,14 +145,11 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
+ if len(r.autoListenSignals) > 0 {
+ r.ListenOsSignals(r.autoListenSignals...)
+ }
r.onceWait.Do(func() {
r.wg.Wait()
- r.lifecycleEvent(EventAppStop, 0)
- if r.err == nil {
- r.lifecycleEvent(EventAppComplete, 0)
- } else {
- r.lifecycleEvent(EventAppError, 0)
- }
if r.errCh != nil {
close(r.errCh)
}
@@ -153,16 +157,80 @@ func (r *Rutina) Wait() error {
return r.err
}
-func (r *Rutina) lifecycleEvent(ev Event, rid int) {
- r.log("Event = %s Routine ID = %d", ev.String(), rid)
- if r.lifecycleListener != nil {
- r.lifecycleListener(ev, rid)
+// Kill process by id
+func (r *Rutina) Kill(id uint64) error {
+ p, ok := r.processes[id]
+ if !ok {
+ return ErrProcessNotFound
}
+ if p.cancel != nil {
+ p.cancel()
+ }
+ return nil
+}
+
+type process struct {
+ id uint64
+ doer func(ctx context.Context) error
+ cancel func()
+ onDone Policy
+ onError Policy
+ restartLimit *int
+ restartCount int
+ timeout *time.Duration
}
-// Log if can
-func (r *Rutina) log(format string, args ...interface{}) {
- if r.logger != nil {
- r.logger.Printf(format, args...)
+func (p *process) run(pctx context.Context, errCh chan error, logger logger) error {
+ var ctx context.Context
+ if p.timeout != nil {
+ ctx, p.cancel = context.WithTimeout(pctx, *p.timeout)
+ defer p.cancel()
+ } else {
+ ctx, p.cancel = context.WithCancel(pctx)
+ }
+ for {
+ logger("starting process #%d", p.id)
+ p.restartCount++
+ currentAction := p.onDone
+ err := p.doer(ctx)
+ if err != nil {
+ if p.onError == Shutdown {
+ return err
+ }
+ currentAction = p.onError
+ logger("error on process #%d: %s", p.id, err)
+ if errCh != nil {
+ errCh <- err
+ }
+ }
+ switch currentAction {
+ case DoNothing:
+ return nil
+ case Shutdown:
+ return ErrShutdown
+ case Restart:
+ if ctx.Err() != nil {
+ if p.onError == Shutdown {
+ return ErrTimeoutOrKilled
+ } else {
+ if errCh != nil {
+ errCh <- ErrTimeoutOrKilled
+ }
+ return nil
+ }
+ }
+ if p.restartLimit == nil || p.restartCount > *p.restartLimit {
+ logger("run count limit process #%d", p.id)
+ if p.onError == Shutdown {
+ return ErrRunLimit
+ } else {
+ if errCh != nil {
+ errCh <- ErrRunLimit
+ }
+ return nil
+ }
+ }
+ logger("restarting process #%d", p.id)
+ }
}
}