aboutsummaryrefslogtreecommitdiff
path: root/rutina.go
diff options
context:
space:
mode:
authorAlexander Kiryukhin <alexander@kiryukhin.su>2019-01-17 07:54:39 +0300
committerAlexander Kiryukhin <alexander@kiryukhin.su>2019-01-17 07:54:39 +0300
commitc5776ba6a3d8303650208ee8abc8f63cadf5127a (patch)
treecf92f3d64a46f7167b9857688e5344554b2a8d69 /rutina.go
parentd64ca3bd06c6a2c702f3604d95658566907a9beb (diff)
Many changesv0.2.0
Diffstat (limited to 'rutina.go')
-rwxr-xr-xrutina.go66
1 files changed, 45 insertions, 21 deletions
diff --git a/rutina.go b/rutina.go
index 0f9a9da..2a28682 100755
--- a/rutina.go
+++ b/rutina.go
@@ -2,58 +2,85 @@ package rutina
import (
"context"
+ "log"
"os"
"os/signal"
"sync"
- "syscall"
+ "sync/atomic"
)
//Rutina is routine manager
type Rutina struct {
- ctx context.Context
- cancel func()
- wg sync.WaitGroup
- o sync.Once
- err error
+ ctx context.Context
+ Cancel func()
+ wg sync.WaitGroup
+ o sync.Once
+ err error
+ logger *log.Logger
+ counter *uint64
+ cancelByError bool
}
// New instance with builtin context
-func New() (*Rutina, context.Context) {
- return WithContext(context.Background())
+func New(opts ...Option) (*Rutina, context.Context) {
+ ctx, cancel := context.WithCancel(context.Background())
+ var counter uint64 = 0
+ r := &Rutina{ctx: ctx, Cancel: cancel, counter: &counter, cancelByError: false}
+ return r.WithOptions(opts...), ctx
}
-// WithContext is constructor that takes context from outside
-func WithContext(ctx context.Context) (*Rutina, context.Context) {
- ctx, cancel := context.WithCancel(ctx)
-
- return &Rutina{ctx: ctx, cancel: cancel}, ctx
+func (r *Rutina) WithOptions(opts ...Option) *Rutina {
+ nr := *r
+ for _, o := range opts {
+ o.apply(&nr)
+ }
+ return &nr
}
// Go routine
func (r *Rutina) Go(doer func(ctx context.Context) error) {
r.wg.Add(1)
go func() {
+ id := atomic.AddUint64(r.counter, 1)
defer func() {
+ if r.logger != nil {
+ r.logger.Printf("stopping #%d", id)
+ }
r.wg.Done()
- if r.cancel != nil {
- r.cancel()
+ if !r.cancelByError {
+ r.Cancel()
}
}()
+ if r.logger != nil {
+ r.logger.Printf("starting #%d", id)
+ }
if err := doer(r.ctx); err != nil {
+ if r.logger != nil {
+ r.logger.Printf("error at #%d : %v", id, err)
+ }
r.o.Do(func() {
r.err = err
})
+ if r.cancelByError {
+ r.Cancel()
+ }
}
}()
}
// OS signals handler
-func (r *Rutina) ListenTermSignals() {
+func (r *Rutina) ListenOsSignals() {
r.Go(func(ctx context.Context) error {
sig := make(chan os.Signal, 1)
- signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
+ signal.Notify(sig, os.Interrupt, os.Kill)
select {
- case <-sig:
+ case s := <-sig:
+ if r.logger != nil {
+ r.logger.Printf("stopping by OS signal (%v)", s)
+ }
+ if r.cancelByError {
+ r.Cancel()
+ }
case <-ctx.Done():
}
return nil
@@ -63,8 +90,5 @@ func (r *Rutina) ListenTermSignals() {
// Wait all routines and returns first error or nil if all routines completes without errors
func (r *Rutina) Wait() error {
r.wg.Wait()
- if r.cancel != nil {
- r.cancel()
- }
return r.err
}