diff options
author | Alexander Kiryukhin <a.kiryukhin@mail.ru> | 2022-05-22 16:37:48 +0300 |
---|---|---|
committer | Alexander Kiryukhin <a.kiryukhin@mail.ru> | 2022-05-22 16:37:48 +0300 |
commit | 4a81eff217c40c459c9a9ed4f318b4dd9bc5ee8a (patch) | |
tree | 35e6d3b1f80af80fddedb26d5543377931abfe2f /rpc | |
parent | c74596c6a6a741e3365a2f372de6e7cdf2583fdc (diff) |
Middlewares and optionsv1.1.0
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/meta.go | 3 | ||||
-rw-r--r-- | rpc/middleware.go | 43 | ||||
-rw-r--r-- | rpc/options.go | 42 | ||||
-rw-r--r-- | rpc/server.go | 86 |
4 files changed, 137 insertions, 37 deletions
diff --git a/rpc/meta.go b/rpc/meta.go deleted file mode 100644 index 77de69b..0000000 --- a/rpc/meta.go +++ /dev/null @@ -1,3 +0,0 @@ -package rpc - -type Meta map[string]interface{} diff --git a/rpc/middleware.go b/rpc/middleware.go new file mode 100644 index 0000000..cd99823 --- /dev/null +++ b/rpc/middleware.go @@ -0,0 +1,43 @@ +//Package rpc provides abstract rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev> +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see <https://www.gnu.org/licenses/>. + +package rpc + +import ( + "context" + "strings" + "time" +) + +type Middleware func(handler RpcHandler) RpcHandler + +type RpcHandler func(ctx context.Context, req *RpcRequest) *RpcResponse + +func LoggerMiddleware(logger Logger) Middleware { + return func(handler RpcHandler) RpcHandler { + return func(ctx context.Context, req *RpcRequest) *RpcResponse { + t1 := time.Now().UnixMicro() + resp := handler(ctx, req) + t2 := time.Now().UnixMicro() + args := strings.ReplaceAll(string(req.Params), "\n", "") + logger.Logf("rpc call=%s, args=%s, take=%dμs", req.Method, args, (t2 - t1)) + return resp + } + } +} diff --git a/rpc/options.go b/rpc/options.go new file mode 100644 index 0000000..825dbca --- /dev/null +++ b/rpc/options.go @@ -0,0 +1,42 @@ +//Package rpc provides abstract rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev> +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see <https://www.gnu.org/licenses/>. + +package rpc + +import "go.neonxp.dev/jsonrpc2/transport" + +type Option func(s *RpcServer) + +func WithTransport(transport transport.Transport) Option { + return func(s *RpcServer) { + s.transports = append(s.transports, transport) + } +} + +func WithMiddleware(mw Middleware) Option { + return func(s *RpcServer) { + s.middlewares = append(s.middlewares, mw) + } +} + +func WithLogger(l Logger) Option { + return func(s *RpcServer) { + s.logger = l + } +} diff --git a/rpc/server.go b/rpc/server.go index 9c5e847..b2f6158 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -33,20 +33,29 @@ import ( const version = "2.0" type RpcServer struct { - Logger Logger - IgnoreNotifications bool - handlers map[string]Handler - transports []transport.Transport - mu sync.RWMutex + logger Logger + handlers map[string]Handler + middlewares []Middleware + transports []transport.Transport + mu sync.RWMutex } -func New() *RpcServer { - return &RpcServer{ - Logger: nopLogger{}, - IgnoreNotifications: true, - handlers: map[string]Handler{}, - transports: []transport.Transport{}, - mu: sync.RWMutex{}, +func New(opts ...Option) *RpcServer { + s := &RpcServer{ + logger: nopLogger{}, + handlers: map[string]Handler{}, + transports: []transport.Transport{}, + mu: sync.RWMutex{}, + } + for _, opt := range opts { + opt(s) + } + return s +} + +func (r *RpcServer) Use(opts ...Option) { + for _, opt := range opts { + opt(r) } } @@ -56,10 +65,6 @@ func (r *RpcServer) Register(method string, handler Handler) { r.handlers[method] = handler } -func (r *RpcServer) AddTransport(transport transport.Transport) { - r.transports = append(r.transports, transport) -} - func (r *RpcServer) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) for _, t := range r.transports { @@ -70,25 +75,27 @@ func (r *RpcServer) Run(ctx context.Context) error { return eg.Wait() } -func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { +func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer, parallel bool) { dec := json.NewDecoder(rd) enc := json.NewEncoder(w) mu := sync.Mutex{} wg := sync.WaitGroup{} for { - req := new(rpcRequest) + req := new(RpcRequest) if err := dec.Decode(req); err != nil { if err == io.EOF { break } - r.Logger.Logf("Can't read body: %v", err) + r.logger.Logf("Can't read body: %v", err) WriteError(ErrCodeParseError, enc) break } - wg.Add(1) - go func(req *rpcRequest) { - defer wg.Done() - resp := r.callMethod(ctx, req) + exec := func() { + h := r.callMethod + for _, m := range r.middlewares { + h = m(h) + } + resp := h(ctx, req) if req.Id == nil { // notification request return @@ -96,23 +103,34 @@ func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) { mu.Lock() defer mu.Unlock() if err := enc.Encode(resp); err != nil { - r.Logger.Logf("Can't write response: %v", err) + r.logger.Logf("Can't write response: %v", err) WriteError(ErrCodeInternalError, enc) } if w, canFlush := w.(Flusher); canFlush { w.Flush() } - }(req) + } + if parallel { + wg.Add(1) + go func(req *RpcRequest) { + defer wg.Done() + exec() + }(req) + } else { + exec() + } + } + if parallel { + wg.Wait() } - wg.Wait() } -func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse { +func (r *RpcServer) callMethod(ctx context.Context, req *RpcRequest) *RpcResponse { r.mu.RLock() h, ok := r.handlers[req.Method] r.mu.RUnlock() if !ok { - return &rpcResponse{ + return &RpcResponse{ Jsonrpc: version, Error: ErrorFromCode(ErrCodeMethodNotFound), Id: req.Id, @@ -120,14 +138,14 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons } resp, err := h(ctx, req.Params) if err != nil { - r.Logger.Logf("User error %v", err) - return &rpcResponse{ + r.logger.Logf("User error %v", err) + return &RpcResponse{ Jsonrpc: version, Error: err, Id: req.Id, } } - return &rpcResponse{ + return &RpcResponse{ Jsonrpc: version, Result: resp, Id: req.Id, @@ -135,20 +153,20 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons } func WriteError(code int, enc *json.Encoder) { - enc.Encode(rpcResponse{ + enc.Encode(RpcResponse{ Jsonrpc: version, Error: ErrorFromCode(code), }) } -type rpcRequest struct { +type RpcRequest struct { Jsonrpc string `json:"jsonrpc"` Method string `json:"method"` Params json.RawMessage `json:"params"` Id any `json:"id"` } -type rpcResponse struct { +type RpcResponse struct { Jsonrpc string `json:"jsonrpc"` Result json.RawMessage `json:"result,omitempty"` Error error `json:"error,omitempty"` |