diff options
-rwxr-xr-x[-rw-r--r--] | .gitignore | 2 | ||||
-rwxr-xr-x[-rw-r--r--] | LICENSE | 0 | ||||
-rwxr-xr-x[-rw-r--r--] | README.md | 59 | ||||
-rw-r--r-- | example/http_server.go | 56 | ||||
-rwxr-xr-x | rutina.go | 54 | ||||
-rwxr-xr-x | rutina_test.go | 89 |
6 files changed, 259 insertions, 1 deletions
diff --git a/.gitignore b/.gitignore index f1c181e..b3efc39 100644..100755 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out + +.idea
\ No newline at end of file diff --git a/README.md b/README.md index c8c7853..c7beb77 100644..100755 --- a/README.md +++ b/README.md @@ -1 +1,58 @@ -# rutina
\ No newline at end of file +# rutina + +Package Rutina (russian "рутина" - ordinary boring everyday work) works like https://godoc.org/golang.org/x/sync/errgroup with small differences: + +1) propagates context to routines +2) cancels context when any routine ends with any result (not only when error result) + +## When it need? + +Usually, when yout program consists of several routines (i.e.: http server, metrics server and os signals subscriber) and you want to stop all routines when one of them ends (i.e.: by TERM os signal in signal subscriber). + +## Example + +HTTP server with graceful shutdown (`example/http_server.go`): + +``` +// New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) +r := rutina.New() + +srv := &http.Server{Addr: ":8080"} +http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "hello world\n") +}) + +// Starting http server and listen connections +r.Go(func(ctx context.Context) error { + if err := srv.ListenAndServe(); err != nil { + return err + } + log.Println("Server stopped") + return nil +}) + +// Gracefully stoping server when context canceled +r.Go(func(ctx context.Context) error { + <-ctx.Done() + log.Println("Stopping server...") + return srv.Shutdown(ctx) +}) + +// OS signals subscriber +r.Go(func(ctx context.Context) error { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + select { + case <-sig: + log.Println("TERM or INT signal received") + case <-ctx.Done(): + } + return nil +}) + +if err := r.Wait(); err != nil { + log.Fatal(err) +} + +log.Println("All routines successfully stopped") +```
\ No newline at end of file diff --git a/example/http_server.go b/example/http_server.go new file mode 100644 index 0000000..b1d0181 --- /dev/null +++ b/example/http_server.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "io" + "log" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/neonxp/rutina" +) + +func main() { + // New instance with builtin context. Alternative: r, ctx := rutina.WithContext(ctx) + r, _ := rutina.New() + + srv := &http.Server{Addr: ":8080"} + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "hello world\n") + }) + + // Starting http server and listen connections + r.Go(func(ctx context.Context) error { + if err := srv.ListenAndServe(); err != nil { + return err + } + log.Println("Server stopped") + return nil + }) + + // Gracefully stoping server when context canceled + r.Go(func(ctx context.Context) error { + <-ctx.Done() + log.Println("Stopping server...") + return srv.Shutdown(ctx) + }) + + // OS signals subscriber + r.Go(func(ctx context.Context) error { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + select { + case <-sig: + log.Println("TERM or INT signal received") + case <-ctx.Done(): + } + return nil + }) + + if err := r.Wait(); err != nil { + log.Fatal(err) + } + log.Println("All routines successfully stopped") +} diff --git a/rutina.go b/rutina.go new file mode 100755 index 0000000..68bf048 --- /dev/null +++ b/rutina.go @@ -0,0 +1,54 @@ +package rutina + +import ( + "context" + "sync" +) + +//Rutina is routine manager +type Rutina struct { + ctx context.Context + cancel func() + wg sync.WaitGroup + o sync.Once + err error +} + +// New instance with builtin context +func New() (*Rutina, context.Context) { + return WithContext(context.Background()) +} + +// WithContext is constructor that takes context from outside +func WithContext(ctx context.Context) (*Rutina, context.Context) { + ctx, cancel := context.WithCancel(ctx) + + return &Rutina{ctx: ctx, cancel: cancel}, ctx +} + +// Go routine +func (r *Rutina) Go(doer func(ctx context.Context) error) { + r.wg.Add(1) + go func() { + defer func() { + r.wg.Done() + if r.cancel != nil { + r.cancel() + } + }() + if err := doer(r.ctx); err != nil { + r.o.Do(func() { + r.err = err + }) + } + }() +} + +// Wait all routines and returns first error or nil if all routines completes without errors +func (r *Rutina) Wait() error { + r.wg.Wait() + if r.cancel != nil { + r.cancel() + } + return r.err +} diff --git a/rutina_test.go b/rutina_test.go new file mode 100755 index 0000000..cff73c4 --- /dev/null +++ b/rutina_test.go @@ -0,0 +1,89 @@ +package rutina + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" +) + +func TestSuccess(t *testing.T) { + r, _ := New() + counter := 0 + f := func(name string, ttl time.Duration) error { + counter++ + <-time.After(ttl) + counter-- + t.Log(name) + return nil + } + r.Go(func(ctx context.Context) error { + return f("one", 1*time.Second) + }) + r.Go(func(ctx context.Context) error { + return f("two", 2*time.Second) + }) + r.Go(func(ctx context.Context) error { + return f("three", 3*time.Second) + }) + if err := r.Wait(); err != nil { + t.Error("Unexpected error", err) + } + if counter == 0 { + t.Log("All routines done") + } else { + t.Error("Not all routines stopped") + } +} + +func TestError(t *testing.T) { + r, _ := New() + f := func(name string, ttl time.Duration) error { + <-time.After(ttl) + t.Log(name) + return errors.New("error from " + name) + } + r.Go(func(ctx context.Context) error { + return f("one", 1*time.Second) + }) + r.Go(func(ctx context.Context) error { + return f("two", 2*time.Second) + }) + r.Go(func(ctx context.Context) error { + return f("three", 3*time.Second) + }) + if err := r.Wait(); err != nil { + if err.Error() != "error from one" { + t.Error("Must be error from first routine") + } + t.Log(err) + } + t.Log("All routines done") +} + +func TestContext(t *testing.T) { + r, _ := New() + cc := false + r.Go(func(ctx context.Context) error { + <-time.After(1 * time.Second) + return nil + }) + r.Go(func(ctx context.Context) error { + select { + case <-ctx.Done(): + cc = true + return nil + case <-time.After(3 * time.Second): + return errors.New("Timeout") + } + }) + if err := r.Wait(); err != nil { + t.Error("Unexpected error", err) + } + if cc { + t.Log("Second routine succesfuly complete by context done") + } else { + t.Error("Routine not completed by context") + } +} |