diff options
-rwxr-xr-x | README.md | 128 | ||||
-rw-r--r-- | event_string.go | 29 | ||||
-rw-r--r-- | events.go | 22 | ||||
-rwxr-xr-x | example/http_server.go | 11 | ||||
-rw-r--r-- | example/policies.go | 51 | ||||
-rwxr-xr-x | mixins.go | 90 | ||||
-rw-r--r-- | options.go | 85 | ||||
-rwxr-xr-x | rutina.go | 252 | ||||
-rwxr-xr-x | rutina_test.go | 25 |
9 files changed, 308 insertions, 385 deletions
@@ -18,18 +18,22 @@ Usually, when your program consists of several routines (i.e.: http server, metr ### New instance +With default options: + ```go -r := rutina.New() +r := rutina.New(nil) ``` -or with optional mixins (see below): +or with custom options: ```go -r := rutina.New(...Mixins) -``` -or -```go -r.With(...Mixins) +r := rutina.New( + rutina.Opt. + SetParentContext(ctx context.Context). // Pass parent context to Rutina (otherwise it uses own new context) + SetListenOsSignals(listenOsSignals bool). // Auto listen OS signals and close context on Kill, Term signal + SetLogger(l logger). // Pass logger for debug, i.e. `log.Printf` + SetErrors(errCh chan error) // Set errors channel for errors from routines in Restart/DoNothing errors policy +) ``` ### Start new routine @@ -37,45 +41,46 @@ r.With(...Mixins) ```go r.Go(func (ctx context.Context) error { ...do something... -}, ...runOptions) +}, *runOptions) ``` -Available options of run policy: - -* `ShutdownIfError` - Shutdown all routines if this routine returns error -* `RestartIfError` - Restart this routine if this routine returns error -* `DoNothingIfError` - Do nothing just stop this routine if this routine returns error -* `ShutdownIfDone` - Shutdown all routines if this routine done without errors -* `RestartIfDone` - Restart if this routine done without errors -* `DoNothingIfDone` - Do nothing if this routine done without errors +#### Run Options -Default policy: +```go +RunOpt. + SetOnDone(policy Policy). // Run policy if returns no error + SetOnError(policy Policy). // Run policy if returns error + SetTimeout(timeout time.Duration). // Timeout to routine (after it context will be closed) + SetMaxCount(maxCount int) // Max tries on Restart policy +``` -`ShutdownIfError` && `ShutdownIfDone` +#### Run policies -(just like [errgroup](https://godoc.org/golang.org/x/sync/errgroup)) +* `DoNothing` - do not affect other routines +* `Restart` - restart current routine +* `Shutdown` - shutdown all routines #### Example of run policies ```go r.Go(func(ctx context.Context) error { - // If this routine produce no error - it just restarts + // If this routine produce no error - it just completes, other routines not affected // If it returns error - all other routines will shutdown (because context cancels) -}, rutina.RestartIfDone, rutina.ShutdownIfError) +}, nil) r.Go(func(ctx context.Context) error { - // If this routine produce no error - it just completes + // If this routine produce no error - it restarts // If it returns error - all other routines will shutdown (because context cancels) -}, rutina.DoNothingIfDone, rutina.ShutdownIfError) +}, rutina.RunOpt.SetOnDone(rutina.Restart)) r.Go(func(ctx context.Context) error { // If this routine produce no error - all other routines will shutdown (because context cancels) // If it returns error - it will be restarted -}, rutina.RestartIfError) +}, rutina.RunOpt.SetOnDone(rutina.Shutdown).SetOnError(rutina.Restart)) r.Go(func(ctx context.Context) error { // If this routine stopped by any case - all other routines will shutdown (because context cancels) -}, rutina.ShutdownIfDone) +}, rutina.RunOpt.SetOnDone(rutina.Shutdown)) r.ListenOsSignals() // Shutdown all routines by OS signal ``` @@ -88,84 +93,29 @@ err := r.Wait() Here err = error that shutdowns all routines (may be will be changed at future) -### Get errors channel - -```go -err := <- r.Errors() -``` - -Disabled by default. Use `r.With(rutina.WithErrChan())` to turn on. - -## Lifecycle events - -Rutina has own simple lifecycle events: - -* `EventRoutineStart` - Fires when starts new routine -* `EventRoutineStop` - Fires when routine stopped with any result -* `EventRoutineComplete` - Fires when routine stopped without errors -* `EventRoutineError` - Fires when routine stopped with error -* `EventAppStop` - Fires when all routines stopped with any result -* `EventAppComplete` - Fires when all routines stopped with no errors -* `EventAppError` - Fires when all routines stopped with error - -## Mixins - -### Usage - -```go -r := rutina.New(mixin1, mixin2, ...) -``` -or -```go -r := rutina.New() -r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina! -``` - -### Logger - -```go -r = r.With(rutina.WithStdLogger()) -``` -or -```go -r = r.With(rutina.WithLogger(logger log.Logger)) -``` - -Sets standard or custom logger. By default there is no logger. - -### Custom context +### Kill routines ```go -r = r.With(rutina.WithContext(ctx context.Context)) -```` - -Propagates your own context to Rutina. By default it use own context. - -### Enable errors channel - -```go -r = r.With(rutina.WithErrChan()) +id := r.Go(func (ctx context.Context) error { ... }) ... -err := <- r.Errors() +r.Kill(id) // Closes individual context for #id routine that must shutdown it ``` -Turn on errors channel - -### Lifecycle listener +### List of routines ```go -r = r.With(rutina.WithLifecycleListener(func (event rutina.Event, rid int) { ... })) +list := r.Processes() ``` -Simple lifecycle listener +Returns ids of working routines -### Auto listen OS signals +### Get errors channel ```go -r = r.With(rutina.WithListenOsSignals()) +err := <- r.Errors() ``` -Automatically listen OS signals. There is no `r.ListenOsSignals()` needed. +Disabled by default. Used when passed errors channel to rutina options ## Example diff --git a/event_string.go b/event_string.go deleted file mode 100644 index 6501cad..0000000 --- a/event_string.go +++ /dev/null @@ -1,29 +0,0 @@ -// Code generated by "stringer -type=Event"; DO NOT EDIT. - -package rutina - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[EventRoutineStart-0] - _ = x[EventRoutineStop-1] - _ = x[EventRoutineComplete-2] - _ = x[EventRoutineError-3] - _ = x[EventAppStop-4] - _ = x[EventAppComplete-5] - _ = x[EventAppError-6] -} - -const _Event_name = "EventRoutineStartEventRoutineStopEventRoutineCompleteEventRoutineErrorEventAppStopEventAppCompleteEventAppError" - -var _Event_index = [...]uint8{0, 17, 33, 53, 70, 82, 98, 111} - -func (i Event) String() string { - if i < 0 || i >= Event(len(_Event_index)-1) { - return "Event(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _Event_name[_Event_index[i]:_Event_index[i+1]] -} diff --git a/events.go b/events.go deleted file mode 100644 index 6159343..0000000 --- a/events.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:generate stringer -type=Event -package rutina - -// Event represents lifecycle events -type Event int - -const ( - EventRoutineStart Event = iota - EventRoutineStop - EventRoutineComplete - EventRoutineError - EventAppStop - EventAppComplete - EventAppError -) - -// Hook is function that calls when event fired -// Params: -// ev Event - fired event -// r *Rutina - pointer to rutina -// rid int - ID of routine if present, 0 - otherwise -type Hook func(ev Event, r *Rutina, rid int) error diff --git a/example/http_server.go b/example/http_server.go index fe1d456..6bb7157 100755 --- a/example/http_server.go +++ b/example/http_server.go @@ -13,7 +13,7 @@ import ( func main() { // New instance with builtin context - r := rutina.New(rutina.WithStdLogger()) + r := rutina.New(rutina.Opt.SetListenOsSignals(true)) srv := &http.Server{Addr: ":8080"} http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -27,17 +27,14 @@ func main() { } log.Println("Server stopped") return nil - }) + }, rutina.RunOpt.SetOnDone(rutina.Shutdown)) - // Gracefully stoping server when context canceled + // Gracefully stopping server when context canceled r.Go(func(ctx context.Context) error { <-ctx.Done() log.Println("Stopping server...") return srv.Shutdown(ctx) - }) - - // OS signals subscriber - r.ListenOsSignals() + }, nil) if err := r.Wait(); err != nil { log.Fatal(err) diff --git a/example/policies.go b/example/policies.go index 6addea7..8bb4643 100644 --- a/example/policies.go +++ b/example/policies.go @@ -13,58 +13,41 @@ import ( func main() { // New instance with builtin context - r := rutina.New() - - r = r.With(rutina.WithErrChan(), rutina.WithStdLogger()) + r := rutina.New(rutina.Opt.SetLogger(log.Printf).SetListenOsSignals(true)) r.Go(func(ctx context.Context) error { <-time.After(1 * time.Second) log.Println("Do something 1 second without errors and restart") return nil - }, rutina.RestartIfDone, rutina.ShutdownIfError) + }, nil) r.Go(func(ctx context.Context) error { <-time.After(2 * time.Second) log.Println("Do something 2 seconds without errors and do nothing") return nil - }, rutina.DoNothingIfDone, rutina.ShutdownIfError) - - r.Go(func(ctx context.Context) error { - <-time.After(3 * time.Second) - log.Println("Do something 3 seconds with error and restart") - return errors.New("Error #1!") - }, rutina.RestartIfError) + }, nil) r.Go(func(ctx context.Context) error { - <-time.After(4 * time.Second) - log.Println("Do something 4 seconds with error and do nothing") - return errors.New("Error #2!") - }, rutina.DoNothingIfError) - - r.Go(func(ctx context.Context) error { - <-time.After(10 * time.Second) - log.Println("Do something 10 seconds with error and close context") - return errors.New("Successfully shutdown at proper place") - }, rutina.ShutdownIfError) + select { + case <-time.After(time.Second): + return errors.New("max 10 times") + case <-ctx.Done(): + return nil + } + }, rutina.RunOpt.SetOnError(rutina.Restart).SetMaxCount(10)) r.Go(func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - log.Println("Shutdown chan listener") - return nil - case err := <-r.Errors(): - log.Printf("Error in chan: %v", err) - } + select { + case <-time.After(time.Second): + return errors.New("max 10 seconds") + case <-ctx.Done(): + return nil } - }) - - // OS signals subscriber - r.ListenOsSignals() + }, rutina.RunOpt.SetOnError(rutina.Restart).SetTimeout(10*time.Second)) if err := r.Wait(); err != nil { log.Fatal(err) } else { - log.Println("Routines stopped but not correct") + log.Println("Routines stopped") } } diff --git a/mixins.go b/mixins.go deleted file mode 100755 index aea08ef..0000000 --- a/mixins.go +++ /dev/null @@ -1,90 +0,0 @@ -package rutina - -import ( - "context" - "log" - "os" - "syscall" -) - -// Mixin interface -type Mixin interface { - apply(*Rutina) -} - -// MixinContext propagates user defined context to rutina -type MixinContext struct { - Context context.Context -} - -// WithContext propagates user defined context to rutina -func WithContext(context context.Context) *MixinContext { - return &MixinContext{Context: context} -} - -func (o MixinContext) apply(r *Rutina) { - ctx, cancel := context.WithCancel(o.Context) - r.ctx = ctx - r.Cancel = cancel -} - -// MixinLogger adds logger to rutina -type MixinLogger struct { - Logger *log.Logger -} - -// WithLogger adds custom logger to rutina -func WithLogger(logger *log.Logger) *MixinLogger { - return &MixinLogger{Logger: logger} -} - -// WithStdLogger adds standard logger to rutina -func WithStdLogger() *MixinLogger { - return &MixinLogger{Logger: log.New(os.Stdout, "[rutina]", log.LstdFlags)} -} - -func (o MixinLogger) apply(r *Rutina) { - r.logger = o.Logger -} - -// MixinErrChan turns on errors channel on rutina -type MixinErrChan struct { -} - -// WithErrChan turns on errors channel on rutina -func WithErrChan() *MixinErrChan { - return &MixinErrChan{} -} - -func (o MixinErrChan) apply(r *Rutina) { - r.errCh = make(chan error, 1) -} - -type LifecycleListener func(event Event, routineID int) - -type LifecycleMixin struct { - Listener LifecycleListener -} - -func (l LifecycleMixin) apply(r *Rutina) { - r.lifecycleListener = l.Listener -} - -func WithLifecycleListener(listener LifecycleListener) *LifecycleMixin { - return &LifecycleMixin{Listener: listener} -} - -type ListenOsSignalsMixin struct { - Signals []os.Signal -} - -func (l ListenOsSignalsMixin) apply(r *Rutina) { - r.autoListenSignals = l.Signals -} - -func WithListenOsSignals(signals ...os.Signal) *ListenOsSignalsMixin { - if len(signals) == 0 { - signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM} - } - return &ListenOsSignalsMixin{Signals: signals} -} @@ -1,13 +1,82 @@ package rutina -// Options sets custom run policies -type Options int +import ( + "context" + "time" +) + +type Options struct { + ParentContext context.Context + ListenOsSignals bool + Logger func(format string, v ...interface{}) + Errors chan error +} + +func (o *Options) SetParentContext(ctx context.Context) *Options { + o.ParentContext = ctx + return o +} + +func (o *Options) SetListenOsSignals(listenOsSignals bool) *Options { + o.ListenOsSignals = listenOsSignals + return o +} + +func (o *Options) SetLogger(l logger) *Options { + o.Logger = l + return o +} + +func (o *Options) SetErrors(errCh chan error) *Options { + o.Errors = errCh + return o +} + +var Opt = &Options{ + ParentContext: context.Background(), + ListenOsSignals: false, + Logger: nil, + Errors: nil, +} + +type Policy int const ( - ShutdownIfError Options = iota // Shutdown all routines if fail - RestartIfError // Restart this routine if fail - DoNothingIfError // Do nothing on fail - ShutdownIfDone // Shutdown all routines if this routine done without errors - RestartIfDone // Restart if this routine done without errors - DoNothingIfDone // Do nothing if this routine done without errors + DoNothing Policy = iota + Shutdown + Restart ) + +type RunOptions struct { + OnDone Policy + OnError Policy + Timeout *time.Duration + MaxCount *int +} + +func (rp *RunOptions) SetOnDone(policy Policy) *RunOptions { + rp.OnDone = policy + return rp +} + +func (rp *RunOptions) SetOnError(policy Policy) *RunOptions { + rp.OnError = policy + return rp +} + +func (rp *RunOptions) SetTimeout(timeout time.Duration) *RunOptions { + rp.Timeout = &timeout + return rp +} + +func (rp *RunOptions) SetMaxCount(maxCount int) *RunOptions { + rp.MaxCount = &maxCount + return rp +} + +var RunOpt = &RunOptions{ + OnDone: DoNothing, + OnError: Shutdown, + Timeout: nil, + MaxCount: nil, +} @@ -2,114 +2,121 @@ package rutina import ( "context" - "log" + "errors" "os" "os/signal" "sync" "sync/atomic" "syscall" + "time" ) +var ( + ErrRunLimit = errors.New("rutina run limit") + ErrTimeoutOrKilled = errors.New("rutina timeouted or killed") + ErrProcessNotFound = errors.New("process not found") + ErrShutdown = errors.New("shutdown") +) + +type logger func(format string, v ...interface{}) + +var nopLogger = func(format string, v ...interface{}) {} + //Rutina is routine manager type Rutina struct { - ctx context.Context // State of application (started/stopped) - Cancel func() // Cancel func that stops all routines - wg sync.WaitGroup // WaitGroup that wait all routines to complete - onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines - onceWait sync.Once // Flag that prevents wait already waited rutina - err error // First error that shutdowns all routines - logger *log.Logger // Optional logger - counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger - errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError - lifecycleListener LifecycleListener // Optional listener for events - autoListenSignals []os.Signal // Optional listening os signals, default disabled + ctx context.Context // State of application (started/stopped) + Cancel func() // Cancel func that stops all routines + wg sync.WaitGroup // WaitGroup that wait all routines to complete + onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines + onceWait sync.Once // Flag that prevents wait already waited rutina + err error // First error that shutdowns all routines + logger logger // Optional logger + counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger + errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError + autoListenSignals []os.Signal // Optional listening os signals, default disabled + processes map[uint64]*process + mu sync.Mutex } // New instance with builtin context -func New(mixins ...Mixin) *Rutina { - ctx, cancel := context.WithCancel(context.Background()) +func New(opts *Options) *Rutina { + if opts == nil { + opts = Opt + } + ctx, cancel := context.WithCancel(opts.ParentContext) var counter uint64 - r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil} - return r.With(mixins...) -} - -// With applies mixins -func (r *Rutina) With(mixins ...Mixin) *Rutina { - for _, m := range mixins { - m.apply(r) + if opts.Logger == nil { + opts.Logger = nopLogger } - if r.autoListenSignals != nil { - r.ListenOsSignals(r.autoListenSignals...) + var signals []os.Signal + if opts.ListenOsSignals { + signals = []os.Signal{os.Kill, os.Interrupt} + } + return &Rutina{ + ctx: ctx, + Cancel: cancel, + wg: sync.WaitGroup{}, + onceErr: sync.Once{}, + onceWait: sync.Once{}, + err: nil, + logger: opts.Logger, + counter: &counter, + errCh: opts.Errors, + autoListenSignals: signals, + processes: map[uint64]*process{}, + mu: sync.Mutex{}, } - return r } // Go routine -func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) { +func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 { + if opts == nil { + opts = RunOpt + } // Check that context is not canceled yet if r.ctx.Err() != nil { - return - } - onFail := ShutdownIfError - for _, o := range opts { - switch o { - case ShutdownIfError: - onFail = ShutdownIfError - case RestartIfError: - onFail = RestartIfError - case DoNothingIfError: - onFail = DoNothingIfError - } + return 0 } - onDone := ShutdownIfDone - for _, o := range opts { - switch o { - case ShutdownIfDone: - onDone = ShutdownIfDone - case RestartIfDone: - onDone = RestartIfDone - case DoNothingIfDone: - onDone = DoNothingIfDone - } + + r.mu.Lock() + id := atomic.AddUint64(r.counter, 1) + process := process{ + id: id, + doer: doer, + onDone: opts.OnDone, + onError: opts.OnError, + restartLimit: opts.MaxCount, + restartCount: 0, + timeout: opts.Timeout, } + r.processes[id] = &process + r.mu.Unlock() r.wg.Add(1) go func() { defer r.wg.Done() - id := atomic.AddUint64(r.counter, 1) - r.lifecycleEvent(EventRoutineStart, int(id)) - if err := doer(r.ctx); err != nil { - r.lifecycleEvent(EventRoutineError, int(id)) - r.lifecycleEvent(EventRoutineStop, int(id)) - // errors history - if r.errCh != nil { - r.errCh <- err - } - // region routine failed - switch onFail { - case ShutdownIfError: - // Save error only if shutdown all routines + if err := process.run(r.ctx, r.errCh, r.logger); err != nil { + if err != ErrShutdown { r.onceErr.Do(func() { r.err = err }) - r.Cancel() - case RestartIfError: - r.Go(doer, opts...) } - // endregion - } else { - r.lifecycleEvent(EventRoutineComplete, int(id)) - r.lifecycleEvent(EventRoutineStop, int(id)) - // region routine successfully done - switch onDone { - case ShutdownIfDone: - r.Cancel() - case RestartIfDone: - r.Go(doer, opts...) - } - // endregion + r.Cancel() } + r.mu.Lock() + defer r.mu.Unlock() + delete(r.processes, process.id) + r.logger("completed #%d", process.id) }() + return id +} + +func (r *Rutina) Processes() []uint64 { + var procesess []uint64 + for id, _ := range r.processes { + procesess = append(procesess, id) + } + return procesess } // Errors returns chan for all errors, event if DoNothingIfError or RestartIfError set. @@ -126,10 +133,10 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) { go func() { sig := make(chan os.Signal, 1) signal.Notify(sig, signals...) - r.log("starting OS signals listener") + r.logger("starting OS signals listener") select { case s := <-sig: - r.log("stopping by OS signal (%v)", s) + r.logger("stopping by OS signal (%v)", s) r.Cancel() case <-r.ctx.Done(): } @@ -138,14 +145,11 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) { // Wait all routines and returns first error or nil if all routines completes without errors func (r *Rutina) Wait() error { + if len(r.autoListenSignals) > 0 { + r.ListenOsSignals(r.autoListenSignals...) + } r.onceWait.Do(func() { r.wg.Wait() - r.lifecycleEvent(EventAppStop, 0) - if r.err == nil { - r.lifecycleEvent(EventAppComplete, 0) - } else { - r.lifecycleEvent(EventAppError, 0) - } if r.errCh != nil { close(r.errCh) } @@ -153,16 +157,80 @@ func (r *Rutina) Wait() error { return r.err } -func (r *Rutina) lifecycleEvent(ev Event, rid int) { - r.log("Event = %s Routine ID = %d", ev.String(), rid) - if r.lifecycleListener != nil { - r.lifecycleListener(ev, rid) +// Kill process by id +func (r *Rutina) Kill(id uint64) error { + p, ok := r.processes[id] + if !ok { + return ErrProcessNotFound } + if p.cancel != nil { + p.cancel() + } + return nil +} + +type process struct { + id uint64 + doer func(ctx context.Context) error + cancel func() + onDone Policy + onError Policy + restartLimit *int + restartCount int + timeout *time.Duration } -// Log if can -func (r *Rutina) log(format string, args ...interface{}) { - if r.logger != nil { - r.logger.Printf(format, args...) +func (p *process) run(pctx context.Context, errCh chan error, logger logger) error { + var ctx context.Context + if p.timeout != nil { + ctx, p.cancel = context.WithTimeout(pctx, *p.timeout) + defer p.cancel() + } else { + ctx, p.cancel = context.WithCancel(pctx) + } + for { + logger("starting process #%d", p.id) + p.restartCount++ + currentAction := p.onDone + err := p.doer(ctx) + if err != nil { + if p.onError == Shutdown { + return err + } + currentAction = p.onError + logger("error on process #%d: %s", p.id, err) + if errCh != nil { + errCh <- err + } + } + switch currentAction { + case DoNothing: + return nil + case Shutdown: + return ErrShutdown + case Restart: + if ctx.Err() != nil { + if p.onError == Shutdown { + return ErrTimeoutOrKilled + } else { + if errCh != nil { + errCh <- ErrTimeoutOrKilled + } + return nil + } + } + if p.restartLimit == nil || p.restartCount > *p.restartLimit { + logger("run count limit process #%d", p.id) + if p.onError == Shutdown { + return ErrRunLimit + } else { + if errCh != nil { + errCh <- ErrRunLimit + } + return nil + } + } + logger("restarting process #%d", p.id) + } } } diff --git a/rutina_test.go b/rutina_test.go index 0c6d94a..3cbdf40 100755 --- a/rutina_test.go +++ b/rutina_test.go @@ -8,10 +8,7 @@ import ( ) func TestSuccess(t *testing.T) { - r := New( - WithStdLogger(), - WithContext(context.Background()), - ) + r := New(nil) counter := 0 f := func(name string, ttl time.Duration) error { counter++ @@ -22,13 +19,13 @@ func TestSuccess(t *testing.T) { } r.Go(func(ctx context.Context) error { return f("one", 1*time.Second) - }) + }, nil) r.Go(func(ctx context.Context) error { return f("two", 2*time.Second) - }) + }, nil) r.Go(func(ctx context.Context) error { return f("three", 3*time.Second) - }) + }, nil) if err := r.Wait(); err != nil { t.Error("Unexpected error", err) } @@ -40,7 +37,7 @@ func TestSuccess(t *testing.T) { } func TestError(t *testing.T) { - r := New() + r := New(nil) f := func(name string, ttl time.Duration) error { <-time.After(ttl) t.Log(name) @@ -48,13 +45,13 @@ func TestError(t *testing.T) { } r.Go(func(ctx context.Context) error { return f("one", 1*time.Second) - }) + }, nil) r.Go(func(ctx context.Context) error { return f("two", 2*time.Second) - }) + }, nil) r.Go(func(ctx context.Context) error { return f("three", 3*time.Second) - }) + }, nil) if err := r.Wait(); err != nil { if err.Error() != "error from one" { t.Error("Must be error from first routine") @@ -65,12 +62,12 @@ func TestError(t *testing.T) { } func TestContext(t *testing.T) { - r := New() + r := New(nil) cc := false r.Go(func(ctx context.Context) error { <-time.After(1 * time.Second) return nil - }, ShutdownIfDone) + }, RunOpt.SetOnDone(Shutdown)) r.Go(func(ctx context.Context) error { select { case <-ctx.Done(): @@ -79,7 +76,7 @@ func TestContext(t *testing.T) { case <-time.After(3 * time.Second): return errors.New("Timeout") } - }, ShutdownIfDone) + }, nil) if err := r.Wait(); err != nil { t.Error("Unexpected error", err) } |