aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Kiryukhin <a.kiryukhin@mail.ru>2020-01-12 16:40:01 +0300
committerAlexander Kiryukhin <a.kiryukhin@mail.ru>2020-01-12 16:40:01 +0300
commit837cfe3223db11bf7f5edb6ba738d3e328b80ea2 (patch)
tree8207e53a6411ea5cd9f1da96f0de9d957c7301f2
parent01eeeaf5e136928abe75f95d58f3f9cce11c6fe6 (diff)
Refactored
New clean API Timeouts and restart limits
-rwxr-xr-xREADME.md128
-rw-r--r--event_string.go29
-rw-r--r--events.go22
-rwxr-xr-xexample/http_server.go11
-rw-r--r--example/policies.go51
-rwxr-xr-xmixins.go90
-rw-r--r--options.go85
-rwxr-xr-xrutina.go252
-rwxr-xr-xrutina_test.go25
9 files changed, 308 insertions, 385 deletions
diff --git a/README.md b/README.md
index 5b1cde6..384beed 100755
--- a/README.md
+++ b/README.md
@@ -18,18 +18,22 @@ Usually, when your program consists of several routines (i.e.: http server, metr
### New instance
+With default options:
+
```go
-r := rutina.New()
+r := rutina.New(nil)
```
-or with optional mixins (see below):
+or with custom options:
```go
-r := rutina.New(...Mixins)
-```
-or
-```go
-r.With(...Mixins)
+r := rutina.New(
+ rutina.Opt.
+ SetParentContext(ctx context.Context). // Pass parent context to Rutina (otherwise it uses own new context)
+ SetListenOsSignals(listenOsSignals bool). // Auto listen OS signals and close context on Kill, Term signal
+ SetLogger(l logger). // Pass logger for debug, i.e. `log.Printf`
+ SetErrors(errCh chan error) // Set errors channel for errors from routines in Restart/DoNothing errors policy
+)
```
### Start new routine
@@ -37,45 +41,46 @@ r.With(...Mixins)
```go
r.Go(func (ctx context.Context) error {
...do something...
-}, ...runOptions)
+}, *runOptions)
```
-Available options of run policy:
-
-* `ShutdownIfError` - Shutdown all routines if this routine returns error
-* `RestartIfError` - Restart this routine if this routine returns error
-* `DoNothingIfError` - Do nothing just stop this routine if this routine returns error
-* `ShutdownIfDone` - Shutdown all routines if this routine done without errors
-* `RestartIfDone` - Restart if this routine done without errors
-* `DoNothingIfDone` - Do nothing if this routine done without errors
+#### Run Options
-Default policy:
+```go
+RunOpt.
+ SetOnDone(policy Policy). // Run policy if returns no error
+ SetOnError(policy Policy). // Run policy if returns error
+ SetTimeout(timeout time.Duration). // Timeout to routine (after it context will be closed)
+ SetMaxCount(maxCount int) // Max tries on Restart policy
+```
-`ShutdownIfError` && `ShutdownIfDone`
+#### Run policies
-(just like [errgroup](https://godoc.org/golang.org/x/sync/errgroup))
+* `DoNothing` - do not affect other routines
+* `Restart` - restart current routine
+* `Shutdown` - shutdown all routines
#### Example of run policies
```go
r.Go(func(ctx context.Context) error {
- // If this routine produce no error - it just restarts
+ // If this routine produce no error - it just completes, other routines not affected
// If it returns error - all other routines will shutdown (because context cancels)
-}, rutina.RestartIfDone, rutina.ShutdownIfError)
+}, nil)
r.Go(func(ctx context.Context) error {
- // If this routine produce no error - it just completes
+ // If this routine produce no error - it restarts
// If it returns error - all other routines will shutdown (because context cancels)
-}, rutina.DoNothingIfDone, rutina.ShutdownIfError)
+}, rutina.RunOpt.SetOnDone(rutina.Restart))
r.Go(func(ctx context.Context) error {
// If this routine produce no error - all other routines will shutdown (because context cancels)
// If it returns error - it will be restarted
-}, rutina.RestartIfError)
+}, rutina.RunOpt.SetOnDone(rutina.Shutdown).SetOnError(rutina.Restart))
r.Go(func(ctx context.Context) error {
// If this routine stopped by any case - all other routines will shutdown (because context cancels)
-}, rutina.ShutdownIfDone)
+}, rutina.RunOpt.SetOnDone(rutina.Shutdown))
r.ListenOsSignals() // Shutdown all routines by OS signal
```
@@ -88,84 +93,29 @@ err := r.Wait()
Here err = error that shutdowns all routines (may be will be changed at future)
-### Get errors channel
-
-```go
-err := <- r.Errors()
-```
-
-Disabled by default. Use `r.With(rutina.WithErrChan())` to turn on.
-
-## Lifecycle events
-
-Rutina has own simple lifecycle events:
-
-* `EventRoutineStart` - Fires when starts new routine
-* `EventRoutineStop` - Fires when routine stopped with any result
-* `EventRoutineComplete` - Fires when routine stopped without errors
-* `EventRoutineError` - Fires when routine stopped with error
-* `EventAppStop` - Fires when all routines stopped with any result
-* `EventAppComplete` - Fires when all routines stopped with no errors
-* `EventAppError` - Fires when all routines stopped with error
-
-## Mixins
-
-### Usage
-
-```go
-r := rutina.New(mixin1, mixin2, ...)
-```
-or
-```go
-r := rutina.New()
-r = r.With(mixin1, mixin2, ...) // Returns new instance of Rutina!
-```
-
-### Logger
-
-```go
-r = r.With(rutina.WithStdLogger())
-```
-or
-```go
-r = r.With(rutina.WithLogger(logger log.Logger))
-```
-
-Sets standard or custom logger. By default there is no logger.
-
-### Custom context
+### Kill routines
```go
-r = r.With(rutina.WithContext(ctx context.Context))
-````
-
-Propagates your own context to Rutina. By default it use own context.
-
-### Enable errors channel
-
-```go
-r = r.With(rutina.WithErrChan())
+id := r.Go(func (ctx context.Context) error { ... })
...
-err := <- r.Errors()
+r.Kill(id) // Closes individual context for #id routine that must shutdown it
```
-Turn on errors channel
-
-### Lifecycle listener
+### List of routines
```go
-r = r.With(rutina.WithLifecycleListener(func (event rutina.Event, rid int) { ... }))
+list := r.Processes()
```
-Simple lifecycle listener
+Returns ids of working routines
-### Auto listen OS signals
+### Get errors channel
```go
-r = r.With(rutina.WithListenOsSignals())
+err := <- r.Errors()
```
-Automatically listen OS signals. There is no `r.ListenOsSignals()` needed.
+Disabled by default. Used when passed errors channel to rutina options
## Example
diff --git a/event_string.go b/event_string.go
deleted file mode 100644
index 6501cad..0000000
--- a/event_string.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Code generated by "stringer -type=Event"; DO NOT EDIT.
-
-package rutina
-
-import "strconv"
-
-func _() {
- // An "invalid array index" compiler error signifies that the constant values have changed.
- // Re-run the stringer command to generate them again.
- var x [1]struct{}
- _ = x[EventRoutineStart-0]
- _ = x[EventRoutineStop-1]
- _ = x[EventRoutineComplete-2]
- _ = x[EventRoutineError-3]
- _ = x[EventAppStop-4]
- _ = x[EventAppComplete-5]
- _ = x[EventAppError-6]
-}
-
-const _Event_name = "EventRoutineStartEventRoutineStopEventRoutineCompleteEventRoutineErrorEventAppStopEventAppCompleteEventAppError"
-
-var _Event_index = [...]uint8{0, 17, 33, 53, 70, 82, 98, 111}
-
-func (i Event) String() string {
- if i < 0 || i >= Event(len(_Event_index)-1) {
- return "Event(" + strconv.FormatInt(int64(i), 10) + ")"
- }
- return _Event_name[_Event_index[i]:_Event_index[i+1]]
-}
diff --git a/events.go b/events.go
deleted file mode 100644
index 6159343..0000000
--- a/events.go
+++ /dev/null
@@ -1,22 +0,0 @@
-//go:generate stringer -type=Event
-package rutina
-
-// Event represents lifecycle events
-type Event int
-
-const (
- EventRoutineStart Event = iota
- EventRoutineStop
- EventRoutineComplete
- EventRoutineError
- EventAppStop
- EventAppComplete
- EventAppError
-)
-
-// Hook is function that calls when event fired
-// Params:
-// ev Event - fired event
-// r *Rutina - pointer to rutina
-// rid int - ID of routine if present, 0 - otherwise
-type Hook func(ev Event, r *Rutina, rid int) error
diff --git a/example/http_server.go b/example/http_server.go
index fe1d456..6bb7157 100755
--- a/example/http_server.go
+++ b/example/http_server.go
@@ -13,7 +13,7 @@ import (
func main() {
// New instance with builtin context
- r := rutina.New(rutina.WithStdLogger())
+ r := rutina.New(rutina.Opt.SetListenOsSignals(true))
srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -27,17 +27,14 @@ func main() {
}
log.Println("Server stopped")
return nil
- })
+ }, rutina.RunOpt.SetOnDone(rutina.Shutdown))
- // Gracefully stoping server when context canceled
+ // Gracefully stopping server when context canceled
r.Go(func(ctx context.Context) error {
<-ctx.Done()
log.Println("Stopping server...")
return srv.Shutdown(ctx)
- })
-
- // OS signals subscriber
- r.ListenOsSignals()
+ }, nil)
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/example/policies.go b/example/policies.go
index 6addea7..8bb4643 100644
--- a/example/policies.go
+++ b/example/policies.go
@@ -13,58 +13,41 @@ import (
func main() {
// New instance with builtin context
- r := rutina.New()
-
- r = r.With(rutina.WithErrChan(), rutina.WithStdLogger())
+ r := rutina.New(rutina.Opt.SetLogger(log.Printf).SetListenOsSignals(true))
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
log.Println("Do something 1 second without errors and restart")
return nil
- }, rutina.RestartIfDone, rutina.ShutdownIfError)
+ }, nil)
r.Go(func(ctx context.Context) error {
<-time.After(2 * time.Second)
log.Println("Do something 2 seconds without errors and do nothing")
return nil
- }, rutina.DoNothingIfDone, rutina.ShutdownIfError)
-
- r.Go(func(ctx context.Context) error {
- <-time.After(3 * time.Second)
- log.Println("Do something 3 seconds with error and restart")
- return errors.New("Error #1!")
- }, rutina.RestartIfError)
+ }, nil)
r.Go(func(ctx context.Context) error {
- <-time.After(4 * time.Second)
- log.Println("Do something 4 seconds with error and do nothing")
- return errors.New("Error #2!")
- }, rutina.DoNothingIfError)
-
- r.Go(func(ctx context.Context) error {
- <-time.After(10 * time.Second)
- log.Println("Do something 10 seconds with error and close context")
- return errors.New("Successfully shutdown at proper place")
- }, rutina.ShutdownIfError)
+ select {
+ case <-time.After(time.Second):
+ return errors.New("max 10 times")
+ case <-ctx.Done():
+ return nil
+ }
+ }, rutina.RunOpt.SetOnError(rutina.Restart).SetMaxCount(10))
r.Go(func(ctx context.Context) error {
- for {
- select {
- case <-ctx.Done():
- log.Println("Shutdown chan listener")
- return nil
- case err := <-r.Errors():
- log.Printf("Error in chan: %v", err)
- }
+ select {
+ case <-time.After(time.Second):
+ return errors.New("max 10 seconds")
+ case <-ctx.Done():
+ return nil
}
- })
-
- // OS signals subscriber
- r.ListenOsSignals()
+ }, rutina.RunOpt.SetOnError(rutina.Restart).SetTimeout(10*time.Second))
if err := r.Wait(); err != nil {
log.Fatal(err)
} else {
- log.Println("Routines stopped but not correct")
+ log.Println("Routines stopped")
}
}
diff --git a/mixins.go b/mixins.go
deleted file mode 100755
index aea08ef..0000000
--- a/mixins.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package rutina
-
-import (
- "context"
- "log"
- "os"
- "syscall"
-)
-
-// Mixin interface
-type Mixin interface {
- apply(*Rutina)
-}
-
-// MixinContext propagates user defined context to rutina
-type MixinContext struct {
- Context context.Context
-}
-
-// WithContext propagates user defined context to rutina
-func WithContext(context context.Context) *MixinContext {
- return &MixinContext{Context: context}
-}
-
-func (o MixinContext) apply(r *Rutina) {
- ctx, cancel := context.WithCancel(o.Context)
- r.ctx = ctx
- r.Cancel = cancel
-}
-
-// MixinLogger adds logger to rutina
-type MixinLogger struct {
- Logger *log.Logger
-}
-
-// WithLogger adds custom logger to rutina
-func WithLogger(logger *log.Logger) *MixinLogger {
- return &MixinLogger{Logger: logger}
-}
-
-// WithStdLogger adds standard logger to rutina
-func WithStdLogger() *MixinLogger {
- return &MixinLogger{Logger: log.New(os.Stdout, "[rutina]", log.LstdFlags)}
-}
-
-func (o MixinLogger) apply(r *Rutina) {
- r.logger = o.Logger
-}
-
-// MixinErrChan turns on errors channel on rutina
-type MixinErrChan struct {
-}
-
-// WithErrChan turns on errors channel on rutina
-func WithErrChan() *MixinErrChan {
- return &MixinErrChan{}
-}
-
-func (o MixinErrChan) apply(r *Rutina) {
- r.errCh = make(chan error, 1)
-}
-
-type LifecycleListener func(event Event, routineID int)
-
-type LifecycleMixin struct {
- Listener LifecycleListener
-}
-
-func (l LifecycleMixin) apply(r *Rutina) {
- r.lifecycleListener = l.Listener
-}
-
-func WithLifecycleListener(listener LifecycleListener) *LifecycleMixin {
- return &LifecycleMixin{Listener: listener}
-}
-
-type ListenOsSignalsMixin struct {
- Signals []os.Signal
-}
-
-func (l ListenOsSignalsMixin) apply(r *Rutina) {
- r.autoListenSignals = l.Signals
-}
-
-func WithListenOsSignals(signals ...os.Signal) *ListenOsSignalsMixin {
- if len(signals) == 0 {
- signals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
- }
- return &ListenOsSignalsMixin{Signals: signals}
-}
diff --git a/options.go b/options.go
index be91068..70a9965 100644
--- a/options.go
+++ b/options.go
@@ -1,13 +1,82 @@
package rutina
-// Options sets custom run policies
-type Options int
+import (
+ "context"
+ "time"
+)
+
+type Options struct {
+ ParentContext context.Context
+ ListenOsSignals bool
+ Logger func(format string, v ...interface{})
+ Errors chan error
+}
+
+func (o *Options) SetParentContext(ctx context.Context) *Options {
+ o.ParentContext = ctx
+ return o
+}
+
+func (o *Options) SetListenOsSignals(listenOsSignals bool) *Options {
+ o.ListenOsSignals = listenOsSignals
+ return o
+}
+
+func (o *Options) SetLogger(l logger) *Options {
+ o.Logger = l
+ return o
+}
+
+func (o *Options) SetErrors(errCh chan error) *Options {
+ o.Errors = errCh
+ return o
+}
+
+var Opt = &Options{
+ ParentContext: context.Background(),
+ ListenOsSignals: false,
+ Logger: nil,
+ Errors: nil,
+}
+
+type Policy int
const (
- ShutdownIfError Options = iota // Shutdown all routines if fail
- RestartIfError // Restart this routine if fail
- DoNothingIfError // Do nothing on fail
- ShutdownIfDone // Shutdown all routines if this routine done without errors
- RestartIfDone // Restart if this routine done without errors
- DoNothingIfDone // Do nothing if this routine done without errors
+ DoNothing Policy = iota
+ Shutdown
+ Restart
)
+
+type RunOptions struct {
+ OnDone Policy
+ OnError Policy
+ Timeout *time.Duration
+ MaxCount *int
+}
+
+func (rp *RunOptions) SetOnDone(policy Policy) *RunOptions {
+ rp.OnDone = policy
+ return rp
+}
+
+func (rp *RunOptions) SetOnError(policy Policy) *RunOptions {
+ rp.OnError = policy
+ return rp
+}
+
+func (rp *RunOptions) SetTimeout(timeout time.Duration) *RunOptions {
+ rp.Timeout = &timeout
+ return rp
+}
+
+func (rp *RunOptions) SetMaxCount(maxCount int) *RunOptions {
+ rp.MaxCount = &maxCount
+ return rp
+}
+
+var RunOpt = &RunOptions{
+ OnDone: DoNothing,
+ OnError: Shutdown,
+ Timeout: nil,
+ MaxCount: nil,
+}
diff --git a/rutina.go b/rutina.go
index 34d7723..cd5748e 100755
--- a/rutina.go
+++ b/rutina.go
@@ -2,114 +2,121 @@ package rutina
import (
"context"
- "log"
+ "errors"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
+ "time"
)
+var (
+ ErrRunLimit = errors.New("rutina run limit")
+ ErrTimeoutOrKilled = errors.New("rutina timeouted or killed")
+ ErrProcessNotFound = errors.New("process not found")
+ ErrShutdown = errors.New("shutdown")
+)
+
+type logger func(format string, v ...interface{})
+
+var nopLogger = func(format string, v ...interface{}) {}
+
//Rutina is routine manager
type Rutina struct {
- ctx context.Context // State of application (started/stopped)
- Cancel func() // Cancel func that stops all routines
- wg sync.WaitGroup // WaitGroup that wait all routines to complete
- onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
- onceWait sync.Once // Flag that prevents wait already waited rutina
- err error // First error that shutdowns all routines
- logger *log.Logger // Optional logger
- counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
- errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError
- lifecycleListener LifecycleListener // Optional listener for events
- autoListenSignals []os.Signal // Optional listening os signals, default disabled
+ ctx context.Context // State of application (started/stopped)
+ Cancel func() // Cancel func that stops all routines
+ wg sync.WaitGroup // WaitGroup that wait all routines to complete
+ onceErr sync.Once // Flag that prevents overwrite first error that shutdowns all routines
+ onceWait sync.Once // Flag that prevents wait already waited rutina
+ err error // First error that shutdowns all routines
+ logger logger // Optional logger
+ counter *uint64 // Optional counter that names routines with increment ids for debug purposes at logger
+ errCh chan error // Optional channel for errors when RestartIfError and DoNothingIfError
+ autoListenSignals []os.Signal // Optional listening os signals, default disabled
+ processes map[uint64]*process
+ mu sync.Mutex
}
// New instance with builtin context
-func New(mixins ...Mixin) *Rutina {
- ctx, cancel := context.WithCancel(context.Background())
+func New(opts *Options) *Rutina {
+ if opts == nil {
+ opts = Opt
+ }
+ ctx, cancel := context.WithCancel(opts.ParentContext)
var counter uint64
- r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, errCh: nil}
- return r.With(mixins...)
-}
-
-// With applies mixins
-func (r *Rutina) With(mixins ...Mixin) *Rutina {
- for _, m := range mixins {
- m.apply(r)
+ if opts.Logger == nil {
+ opts.Logger = nopLogger
}
- if r.autoListenSignals != nil {
- r.ListenOsSignals(r.autoListenSignals...)
+ var signals []os.Signal
+ if opts.ListenOsSignals {
+ signals = []os.Signal{os.Kill, os.Interrupt}
+ }
+ return &Rutina{
+ ctx: ctx,
+ Cancel: cancel,
+ wg: sync.WaitGroup{},
+ onceErr: sync.Once{},
+ onceWait: sync.Once{},
+ err: nil,
+ logger: opts.Logger,
+ counter: &counter,
+ errCh: opts.Errors,
+ autoListenSignals: signals,
+ processes: map[uint64]*process{},
+ mu: sync.Mutex{},
}
- return r
}
// Go routine
-func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...Options) {
+func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 {
+ if opts == nil {
+ opts = RunOpt
+ }
// Check that context is not canceled yet
if r.ctx.Err() != nil {
- return
- }
- onFail := ShutdownIfError
- for _, o := range opts {
- switch o {
- case ShutdownIfError:
- onFail = ShutdownIfError
- case RestartIfError:
- onFail = RestartIfError
- case DoNothingIfError:
- onFail = DoNothingIfError
- }
+ return 0
}
- onDone := ShutdownIfDone
- for _, o := range opts {
- switch o {
- case ShutdownIfDone:
- onDone = ShutdownIfDone
- case RestartIfDone:
- onDone = RestartIfDone
- case DoNothingIfDone:
- onDone = DoNothingIfDone
- }
+
+ r.mu.Lock()
+ id := atomic.AddUint64(r.counter, 1)
+ process := process{
+ id: id,
+ doer: doer,
+ onDone: opts.OnDone,
+ onError: opts.OnError,
+ restartLimit: opts.MaxCount,
+ restartCount: 0,
+ timeout: opts.Timeout,
}
+ r.processes[id] = &process
+ r.mu.Unlock()
r.wg.Add(1)
go func() {
defer r.wg.Done()
- id := atomic.AddUint64(r.counter, 1)
- r.lifecycleEvent(EventRoutineStart, int(id))
- if err := doer(r.ctx); err != nil {
- r.lifecycleEvent(EventRoutineError, int(id))
- r.lifecycleEvent(EventRoutineStop, int(id))
- // errors history
- if r.errCh != nil {
- r.errCh <- err
- }
- // region routine failed
- switch onFail {
- case ShutdownIfError:
- // Save error only if shutdown all routines
+ if err := process.run(r.ctx, r.errCh, r.logger); err != nil {
+ if err != ErrShutdown {
r.onceErr.Do(func() {
r.err = err
})
- r.Cancel()
- case RestartIfError:
- r.Go(doer, opts...)
}
- // endregion
- } else {
- r.lifecycleEvent(EventRoutineComplete, int(id))
- r.lifecycleEvent(EventRoutineStop, int(id))
- // region routine successfully done
- switch onDone {
- case ShutdownIfDone:
- r.Cancel()
- case RestartIfDone:
- r.Go(doer, opts...)
- }
- // endregion
+ r.Cancel()
}
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ delete(r.processes, process.id)
+ r.logger("completed #%d", process.id)
}()
+ return id
+}
+
+func (r *Rutina) Processes() []uint64 {
+ var procesess []uint64
+ for id, _ := range r.processes {
+ procesess = append(procesess, id)
+ }
+ return procesess
}
// Errors returns chan for all errors, event if DoNothingIfError or RestartIfError set.
@@ -126,10 +133,10 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, signals...)
- r.log("starting OS signals listener")
+ r.logger("starting OS signals listener")
select {
case s := <-sig:
- r.log("stopping by OS signal (%v)", s)
+ r.logger("stopping by OS signal (%v)", s)
r.Cancel()
case <-r.ctx.Done():
}
@@ -138,14 +145,11 @@ func (r *Rutina) ListenOsSignals(signals ...os.Signal) {
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
+ if len(r.autoListenSignals) > 0 {
+ r.ListenOsSignals(r.autoListenSignals...)
+ }
r.onceWait.Do(func() {
r.wg.Wait()
- r.lifecycleEvent(EventAppStop, 0)
- if r.err == nil {
- r.lifecycleEvent(EventAppComplete, 0)
- } else {
- r.lifecycleEvent(EventAppError, 0)
- }
if r.errCh != nil {
close(r.errCh)
}
@@ -153,16 +157,80 @@ func (r *Rutina) Wait() error {
return r.err
}
-func (r *Rutina) lifecycleEvent(ev Event, rid int) {
- r.log("Event = %s Routine ID = %d", ev.String(), rid)
- if r.lifecycleListener != nil {
- r.lifecycleListener(ev, rid)
+// Kill process by id
+func (r *Rutina) Kill(id uint64) error {
+ p, ok := r.processes[id]
+ if !ok {
+ return ErrProcessNotFound
}
+ if p.cancel != nil {
+ p.cancel()
+ }
+ return nil
+}
+
+type process struct {
+ id uint64
+ doer func(ctx context.Context) error
+ cancel func()
+ onDone Policy
+ onError Policy
+ restartLimit *int
+ restartCount int
+ timeout *time.Duration
}
-// Log if can
-func (r *Rutina) log(format string, args ...interface{}) {
- if r.logger != nil {
- r.logger.Printf(format, args...)
+func (p *process) run(pctx context.Context, errCh chan error, logger logger) error {
+ var ctx context.Context
+ if p.timeout != nil {
+ ctx, p.cancel = context.WithTimeout(pctx, *p.timeout)
+ defer p.cancel()
+ } else {
+ ctx, p.cancel = context.WithCancel(pctx)
+ }
+ for {
+ logger("starting process #%d", p.id)
+ p.restartCount++
+ currentAction := p.onDone
+ err := p.doer(ctx)
+ if err != nil {
+ if p.onError == Shutdown {
+ return err
+ }
+ currentAction = p.onError
+ logger("error on process #%d: %s", p.id, err)
+ if errCh != nil {
+ errCh <- err
+ }
+ }
+ switch currentAction {
+ case DoNothing:
+ return nil
+ case Shutdown:
+ return ErrShutdown
+ case Restart:
+ if ctx.Err() != nil {
+ if p.onError == Shutdown {
+ return ErrTimeoutOrKilled
+ } else {
+ if errCh != nil {
+ errCh <- ErrTimeoutOrKilled
+ }
+ return nil
+ }
+ }
+ if p.restartLimit == nil || p.restartCount > *p.restartLimit {
+ logger("run count limit process #%d", p.id)
+ if p.onError == Shutdown {
+ return ErrRunLimit
+ } else {
+ if errCh != nil {
+ errCh <- ErrRunLimit
+ }
+ return nil
+ }
+ }
+ logger("restarting process #%d", p.id)
+ }
}
}
diff --git a/rutina_test.go b/rutina_test.go
index 0c6d94a..3cbdf40 100755
--- a/rutina_test.go
+++ b/rutina_test.go
@@ -8,10 +8,7 @@ import (
)
func TestSuccess(t *testing.T) {
- r := New(
- WithStdLogger(),
- WithContext(context.Background()),
- )
+ r := New(nil)
counter := 0
f := func(name string, ttl time.Duration) error {
counter++
@@ -22,13 +19,13 @@ func TestSuccess(t *testing.T) {
}
r.Go(func(ctx context.Context) error {
return f("one", 1*time.Second)
- })
+ }, nil)
r.Go(func(ctx context.Context) error {
return f("two", 2*time.Second)
- })
+ }, nil)
r.Go(func(ctx context.Context) error {
return f("three", 3*time.Second)
- })
+ }, nil)
if err := r.Wait(); err != nil {
t.Error("Unexpected error", err)
}
@@ -40,7 +37,7 @@ func TestSuccess(t *testing.T) {
}
func TestError(t *testing.T) {
- r := New()
+ r := New(nil)
f := func(name string, ttl time.Duration) error {
<-time.After(ttl)
t.Log(name)
@@ -48,13 +45,13 @@ func TestError(t *testing.T) {
}
r.Go(func(ctx context.Context) error {
return f("one", 1*time.Second)
- })
+ }, nil)
r.Go(func(ctx context.Context) error {
return f("two", 2*time.Second)
- })
+ }, nil)
r.Go(func(ctx context.Context) error {
return f("three", 3*time.Second)
- })
+ }, nil)
if err := r.Wait(); err != nil {
if err.Error() != "error from one" {
t.Error("Must be error from first routine")
@@ -65,12 +62,12 @@ func TestError(t *testing.T) {
}
func TestContext(t *testing.T) {
- r := New()
+ r := New(nil)
cc := false
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
return nil
- }, ShutdownIfDone)
+ }, RunOpt.SetOnDone(Shutdown))
r.Go(func(ctx context.Context) error {
select {
case <-ctx.Done():
@@ -79,7 +76,7 @@ func TestContext(t *testing.T) {
case <-time.After(3 * time.Second):
return errors.New("Timeout")
}
- }, ShutdownIfDone)
+ }, nil)
if err := r.Wait(); err != nil {
t.Error("Unexpected error", err)
}