aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xREADME.md45
-rwxr-xr-xexample/http_server.go9
-rw-r--r--example/policies.go13
-rw-r--r--options.go110
-rwxr-xr-xrutina.go36
5 files changed, 125 insertions, 88 deletions
diff --git a/README.md b/README.md
index e27aa4d..7b01839 100755
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ Package Rutina (russian "рутина" - ordinary boring everyday work) is routi
It seems like https://godoc.org/golang.org/x/sync/errgroup with some different:
1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`).
-2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled
+2) has flexible run/stop policy. i.e. one routine restarts when it errors (useful on daemons) but if errors another - all routines will be cancelled
3) already has optional signal handler `ListenOsSignals()`
## When it need?
@@ -21,18 +21,17 @@ Usually, when your program consists of several routines (i.e.: http server, metr
With default options:
```go
-r := rutina.New(nil)
+r := rutina.New()
```
or with custom options:
```go
r := rutina.New(
- rutina.Opt.
- SetParentContext(ctx context.Context). // Pass parent context to Rutina (otherwise it uses own new context)
- SetListenOsSignals(listenOsSignals bool). // Auto listen OS signals and close context on Kill, Term signal
- SetLogger(l logger). // Pass logger for debug, i.e. `log.Printf`
- SetErrors(errCh chan error) // Set errors channel for errors from routines in Restart/DoNothing errors policy
+ ParentContext(ctx context.Context), // Pass parent context to Rutina (otherwise it uses own new context)
+ ListenOsSignals(listenOsSignals ...os.Signal), // Auto listen OS signals and close context on Kill, Term signal
+ Logger(l logger), // Pass logger for debug, i.e. `log.Printf`
+ Errors(errCh chan error), // Set errors channel for errors from routines in Restart/DoNothing errors policy
)
```
@@ -41,17 +40,21 @@ r := rutina.New(
```go
r.Go(func (ctx context.Context) error {
...do something...
-}, *runOptions)
+})
```
#### Run Options
```go
-RunOpt.
- SetOnDone(policy Policy). // Run policy if returns no error
- SetOnError(policy Policy). // Run policy if returns error
- SetTimeout(timeout time.Duration). // Timeout to routine (after it context will be closed)
- SetMaxCount(maxCount int) // Max tries on Restart policy
+r.Go(
+ func (ctx context.Context) error {
+ ...do something...
+ },
+ SetOnDone(policy Policy), // Run policy if returns no error (default: Shutdown)
+ SetOnError(policy Policy), // Run policy if returns error (default: Shutdown)
+ SetTimeout(timeout time.Duration), // Timeout to routine (after it context will be closed)
+ SetMaxCount(maxCount int), // Max tries on Restart policy
+)
```
#### Run policies
@@ -64,23 +67,23 @@ RunOpt.
```go
r.Go(func(ctx context.Context) error {
- // If this routine produce no error - it just completes, other routines not affected
+ // If this routine produce no error - all other routines will shutdown (because context cancels)
// If it returns error - all other routines will shutdown (because context cancels)
-}, nil)
+},)
r.Go(func(ctx context.Context) error {
// If this routine produce no error - it restarts
// If it returns error - all other routines will shutdown (because context cancels)
-}, rutina.RunOpt.SetOnDone(rutina.Restart))
+}, SetOnDone(rutina.Restart))
r.Go(func(ctx context.Context) error {
// If this routine produce no error - all other routines will shutdown (because context cancels)
- // If it returns error - it will be restarted
-}, rutina.RunOpt.SetOnDone(rutina.Shutdown).SetOnError(rutina.Restart))
+ // If it returns error - it will be restarted (maximum 10 times)
+}, SetOnError(rutina.Restart), SetMaxCount(10))
r.Go(func(ctx context.Context) error {
- // If this routine stopped by any case - all other routines will shutdown (because context cancels)
-}, rutina.RunOpt.SetOnDone(rutina.Shutdown))
+ // If this routine stopped by any case other routines will work as before.
+}, SetOnDone(rutina.DoNothing))
r.ListenOsSignals() // Shutdown all routines by OS signal
```
@@ -104,7 +107,7 @@ r.Kill(id) // Closes individual context for #id routine that must shutdown it
### List of routines
```go
-list := r.Processes()
+list := r.Processes()
```
Returns ids of working routines
diff --git a/example/http_server.go b/example/http_server.go
index 6bb7157..8b723b9 100755
--- a/example/http_server.go
+++ b/example/http_server.go
@@ -7,13 +7,14 @@ import (
"io"
"log"
"net/http"
+ "os"
- "github.com/neonxp/rutina"
+ "github.com/neonxp/rutina/v3"
)
func main() {
// New instance with builtin context
- r := rutina.New(rutina.Opt.SetListenOsSignals(true))
+ r := rutina.New(rutina.ListenOsSignals(os.Interrupt, os.Kill))
srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -27,14 +28,14 @@ func main() {
}
log.Println("Server stopped")
return nil
- }, rutina.RunOpt.SetOnDone(rutina.Shutdown))
+ })
// Gracefully stopping server when context canceled
r.Go(func(ctx context.Context) error {
<-ctx.Done()
log.Println("Stopping server...")
return srv.Shutdown(ctx)
- }, nil)
+ })
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/example/policies.go b/example/policies.go
index 8bb4643..87081cb 100644
--- a/example/policies.go
+++ b/example/policies.go
@@ -6,26 +6,27 @@ import (
"context"
"errors"
"log"
+ "os"
"time"
- "github.com/neonxp/rutina"
+ "github.com/neonxp/rutina/v3"
)
func main() {
// New instance with builtin context
- r := rutina.New(rutina.Opt.SetLogger(log.Printf).SetListenOsSignals(true))
+ r := rutina.New(rutina.Logger(log.Printf), rutina.ListenOsSignals(os.Interrupt, os.Kill))
r.Go(func(ctx context.Context) error {
<-time.After(1 * time.Second)
log.Println("Do something 1 second without errors and restart")
return nil
- }, nil)
+ })
r.Go(func(ctx context.Context) error {
<-time.After(2 * time.Second)
log.Println("Do something 2 seconds without errors and do nothing")
return nil
- }, nil)
+ })
r.Go(func(ctx context.Context) error {
select {
@@ -34,7 +35,7 @@ func main() {
case <-ctx.Done():
return nil
}
- }, rutina.RunOpt.SetOnError(rutina.Restart).SetMaxCount(10))
+ }, rutina.OnError(rutina.Restart), rutina.MaxCount(10))
r.Go(func(ctx context.Context) error {
select {
@@ -43,7 +44,7 @@ func main() {
case <-ctx.Done():
return nil
}
- }, rutina.RunOpt.SetOnError(rutina.Restart).SetTimeout(10*time.Second))
+ }, rutina.OnError(rutina.Restart), rutina.SetTimeout(10*time.Second))
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/options.go b/options.go
index 70a9965..9aed7df 100644
--- a/options.go
+++ b/options.go
@@ -2,41 +2,62 @@ package rutina
import (
"context"
+ "os"
"time"
)
type Options struct {
ParentContext context.Context
- ListenOsSignals bool
+ ListenOsSignals []os.Signal
Logger func(format string, v ...interface{})
Errors chan error
}
-func (o *Options) SetParentContext(ctx context.Context) *Options {
- o.ParentContext = ctx
- return o
+func ParentContext(ctx context.Context) Options {
+ return Options{
+ ParentContext: ctx,
+ }
}
-func (o *Options) SetListenOsSignals(listenOsSignals bool) *Options {
- o.ListenOsSignals = listenOsSignals
- return o
+func ListenOsSignals(signals ...os.Signal) Options {
+ return Options{
+ ListenOsSignals: signals,
+ }
}
-func (o *Options) SetLogger(l logger) *Options {
- o.Logger = l
- return o
+func Logger(l logger) Options {
+ return Options{
+ Logger: l,
+ }
}
-func (o *Options) SetErrors(errCh chan error) *Options {
- o.Errors = errCh
- return o
+func Errors(errCh chan error) Options {
+ return Options{
+ Errors: errCh,
+ }
}
-var Opt = &Options{
- ParentContext: context.Background(),
- ListenOsSignals: false,
- Logger: nil,
- Errors: nil,
+func composeOptions(opts []Options) Options {
+ res := Options{
+ ParentContext: context.Background(),
+ Logger: nopLogger,
+ ListenOsSignals: []os.Signal{},
+ }
+ for _, o := range opts {
+ if o.ParentContext != nil {
+ res.ParentContext = o.ParentContext
+ }
+ if o.Errors != nil {
+ res.Errors = o.Errors
+ }
+ if o.ListenOsSignals != nil {
+ res.ListenOsSignals = o.ListenOsSignals
+ }
+ if o.Logger != nil {
+ res.Logger = o.Logger
+ }
+ }
+ return res
}
type Policy int
@@ -54,29 +75,48 @@ type RunOptions struct {
MaxCount *int
}
-func (rp *RunOptions) SetOnDone(policy Policy) *RunOptions {
- rp.OnDone = policy
- return rp
+func OnDone(policy Policy) RunOptions {
+ return RunOptions{
+ OnDone: policy,
+ }
}
-func (rp *RunOptions) SetOnError(policy Policy) *RunOptions {
- rp.OnError = policy
- return rp
+func OnError(policy Policy) RunOptions {
+ return RunOptions{
+ OnError: policy,
+ }
}
-func (rp *RunOptions) SetTimeout(timeout time.Duration) *RunOptions {
- rp.Timeout = &timeout
- return rp
+func Timeout(timeout time.Duration) RunOptions {
+ return RunOptions{
+ Timeout: &timeout,
+ }
}
-func (rp *RunOptions) SetMaxCount(maxCount int) *RunOptions {
- rp.MaxCount = &maxCount
- return rp
+func MaxCount(maxCount int) RunOptions {
+ return RunOptions{
+ MaxCount: &maxCount,
+ }
}
-var RunOpt = &RunOptions{
- OnDone: DoNothing,
- OnError: Shutdown,
- Timeout: nil,
- MaxCount: nil,
+func composeRunOptions(opts []RunOptions) RunOptions {
+ res := RunOptions{
+ OnDone: Shutdown,
+ OnError: Shutdown,
+ }
+ for _, o := range opts {
+ if o.OnDone != res.OnDone {
+ res.OnDone = o.OnDone
+ }
+ if o.OnError != res.OnError {
+ res.OnError = o.OnError
+ }
+ if o.MaxCount != nil {
+ res.MaxCount = o.MaxCount
+ }
+ if o.Timeout != nil {
+ res.Timeout = o.Timeout
+ }
+ }
+ return res
}
diff --git a/rutina.go b/rutina.go
index cd5748e..3cf140c 100755
--- a/rutina.go
+++ b/rutina.go
@@ -22,7 +22,7 @@ type logger func(format string, v ...interface{})
var nopLogger = func(format string, v ...interface{}) {}
-//Rutina is routine manager
+// Rutina is routine manager
type Rutina struct {
ctx context.Context // State of application (started/stopped)
Cancel func() // Cancel func that stops all routines
@@ -39,19 +39,13 @@ type Rutina struct {
}
// New instance with builtin context
-func New(opts *Options) *Rutina {
+func New(opts ...Options) *Rutina {
if opts == nil {
- opts = Opt
+ opts = []Options{}
}
- ctx, cancel := context.WithCancel(opts.ParentContext)
+ options := composeOptions(opts)
+ ctx, cancel := context.WithCancel(options.ParentContext)
var counter uint64
- if opts.Logger == nil {
- opts.Logger = nopLogger
- }
- var signals []os.Signal
- if opts.ListenOsSignals {
- signals = []os.Signal{os.Kill, os.Interrupt}
- }
return &Rutina{
ctx: ctx,
Cancel: cancel,
@@ -59,20 +53,18 @@ func New(opts *Options) *Rutina {
onceErr: sync.Once{},
onceWait: sync.Once{},
err: nil,
- logger: opts.Logger,
+ logger: options.Logger,
counter: &counter,
- errCh: opts.Errors,
- autoListenSignals: signals,
+ errCh: options.Errors,
+ autoListenSignals: options.ListenOsSignals,
processes: map[uint64]*process{},
mu: sync.Mutex{},
}
}
// Go routine
-func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint64 {
- if opts == nil {
- opts = RunOpt
- }
+func (r *Rutina) Go(doer func(ctx context.Context) error, opts ...RunOptions) uint64 {
+ options := composeRunOptions(opts)
// Check that context is not canceled yet
if r.ctx.Err() != nil {
return 0
@@ -83,11 +75,11 @@ func (r *Rutina) Go(doer func(ctx context.Context) error, opts *RunOptions) uint
process := process{
id: id,
doer: doer,
- onDone: opts.OnDone,
- onError: opts.OnError,
- restartLimit: opts.MaxCount,
+ onDone: options.OnDone,
+ onError: options.OnError,
+ restartLimit: options.MaxCount,
restartCount: 0,
- timeout: opts.Timeout,
+ timeout: options.Timeout,
}
r.processes[id] = &process
r.mu.Unlock()