aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xREADME.md66
-rwxr-xr-x[-rw-r--r--]example/http_server.go21
-rwxr-xr-x[-rw-r--r--]go.mod0
-rwxr-xr-x[-rw-r--r--]go.sum0
-rwxr-xr-xoptions.go51
-rwxr-xr-xrutina.go66
-rwxr-xr-xrutina_test.go7
7 files changed, 165 insertions, 46 deletions
diff --git a/README.md b/README.md
index 03236eb..2034fe6 100755
--- a/README.md
+++ b/README.md
@@ -1,13 +1,67 @@
# rutina
-Package Rutina (russian "рутина" - ordinary boring everyday work) works like https://godoc.org/golang.org/x/sync/errgroup with small differences:
+Package Rutina (russian "рутина" - ordinary boring everyday work) is routine orchestrator for your application.
-1) propagates context to routines
-2) cancels context when any routine ends with any result (not only when error result)
+It seems like https://godoc.org/golang.org/x/sync/errgroup with some different:
+
+1) propagates context to every routines. So routine can check if context stopped (`ctx.Done()`).
+2) by default cancels context when any routine ends with any result (not only when error result). Can be configured by option `OptionCancelByError`.
+3) already has optional signal handler `ListenOsSignals()`
## When it need?
-Usually, when yout program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber).
+Usually, when your program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber).
+
+## Usage
+
+### New instance
+
+`r := rutina.New()`
+
+or with options (see below):
+
+`r := rutina.New(...Option)` or `r.WithOptions(...Option)`
+
+### Start new routine
+
+```
+r.Go(func (ctx context.Context) error {
+ ...do something...
+})
+```
+
+### Wait routines to complete
+
+```
+err := r.Wait()
+```
+
+Here err = first error in any routine
+
+## Options
+
+### Usage options
+
+`r := rutina.New(option1, option2, ...)`
+or
+```
+r := rutina.New()
+r = r.WithOptions(option1, option2, ...) // Returns new instance of Rutina!
+```
+
+### Logger
+
+`rutina.WithLogger(logger log.Logger) Option` or `rutina.WithStdLogger() Option`
+
+### Custom context
+
+`rutina.WithContext(ctx context.Context) Option`
+
+### Cancel only by errors
+
+`rutina.WithCancelByError() Option`
+
+If this option set, rutina doesnt cancel context if routine completed without error.
## Example
@@ -15,7 +69,7 @@ HTTP server with graceful shutdown (`example/http_server.go`):
```
// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
-r, _ := rutina.New()
+r, _ := rutina.New(rutina.WithStdLogger())
srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -39,7 +93,7 @@ r.Go(func(ctx context.Context) error {
})
// OS signals listener
-r.ListenTermSignals()
+r.ListenOsSignals()
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/example/http_server.go b/example/http_server.go
index a8072bb..ef25567 100644..100755
--- a/example/http_server.go
+++ b/example/http_server.go
@@ -4,19 +4,15 @@ package main
import (
"context"
+ "github.com/neonxp/rutina"
"io"
"log"
"net/http"
- "os"
- "os/signal"
- "syscall"
-
- "github.com/neonxp/rutina"
)
func main() {
- // New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx)
- r, _ := rutina.New()
+ // New instance with builtin context. Alternative: r, ctx := rutina.OptionContext(ctx)
+ r, _ := rutina.New(rutina.WithStdLogger())
srv := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -40,16 +36,7 @@ func main() {
})
// OS signals subscriber
- r.Go(func(ctx context.Context) error {
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
- select {
- case <-sig:
- log.Println("TERM or INT signal received")
- case <-ctx.Done():
- }
- return nil
- })
+ r.ListenOsSignals()
if err := r.Wait(); err != nil {
log.Fatal(err)
diff --git a/go.mod b/go.mod
index d807050..d807050 100644..100755
--- a/go.mod
+++ b/go.mod
diff --git a/go.sum b/go.sum
index e69de29..e69de29 100644..100755
--- a/go.sum
+++ b/go.sum
diff --git a/options.go b/options.go
new file mode 100755
index 0000000..8144dde
--- /dev/null
+++ b/options.go
@@ -0,0 +1,51 @@
+package rutina
+
+import (
+ "context"
+ "log"
+ "os"
+)
+
+type Option interface {
+ apply(*Rutina)
+}
+
+type OptionContext struct {
+ Context context.Context
+}
+
+func WithContext(context context.Context) *OptionContext {
+ return &OptionContext{Context: context}
+}
+
+func (o OptionContext) apply(r *Rutina) {
+ ctx, cancel := context.WithCancel(o.Context)
+ r.ctx = ctx
+ r.Cancel = cancel
+}
+
+type OptionLogger struct {
+ Logger *log.Logger
+}
+
+func WithLogger(logger *log.Logger) *OptionLogger {
+ return &OptionLogger{Logger: logger}
+}
+
+func WithStdLogger() *OptionLogger {
+ return &OptionLogger{Logger: log.New(os.Stdout, "rutina", log.LstdFlags)}
+}
+
+func (o OptionLogger) apply(r *Rutina) {
+ r.logger = o.Logger
+}
+
+type OptionCancelByError struct{}
+
+func WithCancelByError() *OptionCancelByError {
+ return &OptionCancelByError{}
+}
+
+func (OptionCancelByError) apply(r *Rutina) {
+ r.cancelByError = true
+}
diff --git a/rutina.go b/rutina.go
index 0f9a9da..2a28682 100755
--- a/rutina.go
+++ b/rutina.go
@@ -2,58 +2,85 @@ package rutina
import (
"context"
+ "log"
"os"
"os/signal"
"sync"
- "syscall"
+ "sync/atomic"
)
//Rutina is routine manager
type Rutina struct {
- ctx context.Context
- cancel func()
- wg sync.WaitGroup
- o sync.Once
- err error
+ ctx context.Context
+ Cancel func()
+ wg sync.WaitGroup
+ o sync.Once
+ err error
+ logger *log.Logger
+ counter *uint64
+ cancelByError bool
}
// New instance with builtin context
-func New() (*Rutina, context.Context) {
- return WithContext(context.Background())
+func New(opts ...Option) (*Rutina, context.Context) {
+ ctx, cancel := context.WithCancel(context.Background())
+ var counter uint64 = 0
+ r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
+ return r.WithOptions(opts...), ctx
}
-// WithContext is constructor that takes context from outside
-func WithContext(ctx context.Context) (*Rutina, context.Context) {
- ctx, cancel := context.WithCancel(ctx)
-
- return &Rutina{ctx: ctx, cancel: cancel}, ctx
+func (r *Rutina) WithOptions(opts ...Option) *Rutina {
+ nr := *r
+ for _, o := range opts {
+ o.apply(&nr)
+ }
+ return &nr
}
// Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error) {
r.wg.Add(1)
go func() {
+ id := atomic.AddUint64(r.counter, 1)
defer func() {
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d", id)
+ }
r.wg.Done()
- if r.cancel != nil {
- r.cancel()
+ if !r.cancelByError {
+ r.Cancel()
}
}()
+ if r.logger != nil {
+ r.logger.Printf("starting #%d", id)
+ }
if err := doer(r.ctx); err != nil {
+ if r.logger != nil {
+ r.logger.Printf("error at #%d : %v", id, err)
+ }
r.o.Do(func() {
r.err = err
})
+ if r.cancelByError {
+ r.Cancel()
+ }
}
}()
}
// OS signals handler
-func (r *Rutina) ListenTermSignals() {
+func (r *Rutina) ListenOsSignals() {
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
- signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
+ signal.Notify(sig, os.Interrupt, os.Kill)
select {
- case <-sig:
+ case s := <-sig:
+ if r.logger != nil {
+ r.logger.Printf("stopping by OS signal (%v)", s)
+ }
+ if r.cancelByError {
+ r.Cancel()
+ }
case <-ctx.Done():
}
return nil
@@ -63,8 +90,5 @@ func (r *Rutina) ListenTermSignals() {
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
r.wg.Wait()
- if r.cancel != nil {
- r.cancel()
- }
return r.err
}
diff --git a/rutina_test.go b/rutina_test.go
index 361d5eb..077f68e 100755
--- a/rutina_test.go
+++ b/rutina_test.go
@@ -8,7 +8,10 @@ import (
)
func TestSuccess(t *testing.T) {
- r, _ := New()
+ r, _ := New(
+ WithStdLogger(),
+ WithContext(context.Background()),
+ )
counter := 0
f := func(name string, ttl time.Duration) error {
counter++
@@ -37,7 +40,7 @@ func TestSuccess(t *testing.T) {
}
func TestError(t *testing.T) {
- r, _ := New()
+ r, _ := New(WithCancelByError())
f := func(name string, ttl time.Duration) error {
<-time.After(ttl)
t.Log(name)