aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Kiryukhin <a.kiryukhin@mail.ru>2022-05-21 20:38:21 +0300
committerAlexander Kiryukhin <a.kiryukhin@mail.ru>2022-05-21 20:38:21 +0300
commit81389df9484c28dfcec1cf7592b8d0f8b7e4e8e1 (patch)
tree7a7d0440481e45b999e828b1e5ba2b28129658bc
parentd4708a3665e546eea57611b17441ad9b8c89e9a4 (diff)
Improvments. Breaking changes
-rw-r--r--README.md72
-rw-r--r--example/main.go (renamed from examples/http/main.go)22
-rw-r--r--example/test.http82
-rw-r--r--examples/http/test.http42
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--http/server.go52
-rw-r--r--rpc/server.go89
-rw-r--r--rpc/wrapper.go2
-rw-r--r--transport/http.go40
-rw-r--r--transport/tcp.go22
-rw-r--r--transport/transport.go14
12 files changed, 279 insertions, 162 deletions
diff --git a/README.md b/README.md
index 5938fdf..708675e 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+
# JSON-RPC 2.0
Golang implementation of JSON-RPC 2.0 server with generics.
@@ -6,58 +7,79 @@ Go 1.18+ required
## Features:
-- [x] Batch request and responses
+- [x] HTTP/HTTPS transport
+- [x] TCP transport
- [ ] WebSocket transport
## Usage (http transport)
-1. Create JSON-RPC/HTTP server:
- ```go
- import "go.neonxp.dev/jsonrpc2/http"
+1. Create JSON-RPC server:
+```go
+ import "go.neonxp.dev/jsonrpc2/rpc"
+ ...
+ s := rpc.New()
+```
+
+2. Add required transport(s):
+```go
+ import "go.neonxp.dev/jsonrpc2/transport"
...
- s := http.New()
- ```
-2. Write handler:
- ```go
+ s.AddTransport(&transport.HTTP{Bind: ":8000"})
+ s.AddTransport(&transport.TCP{Bind: ":3000"})
+```
+
+3. Write handler:
+```go
func Multiply(ctx context.Context, args *Args) (int, error) {
return args.A * args.B, nil
}
- ```
+```
+
Handler must have exact two arguments (context and input of any json serializable type) and exact two return values (output of any json serializable type and error)
3. Wrap handler with `rpc.Wrap` method and register it in server:
- ```go
- s.Register("multiply", rpc.Wrap(Multiply))
- ```
-4. Use server as common http handler:
- ```go
- http.ListenAndServe(":8000", s)
- ```
+```go
+ s.Register("multiply", rpc.H(Multiply))
+```
+
+4. Run RPC server:
+```go
+ s.Run(ctx)
+```
## Custom transport
-See [http/server.go](/http/server.go) for example of transport implementation.
+Any transport must implement simple interface `transport.Transport`:
+
+```go
+type Transport interface {
+ Run(ctx context.Context, resolver Resolver) error
+}
+```
## Complete example
-[Full code](/examples/http)
+[Full code](/example)
```go
package main
import (
"context"
- "net/http"
- httpRPC "go.neonxp.dev/jsonrpc2/http"
"go.neonxp.dev/jsonrpc2/rpc"
+ "go.neonxp.dev/jsonrpc2/transport"
)
func main() {
- s := httpRPC.New()
- s.Register("multiply", rpc.Wrap(Multiply))
- s.Register("divide", rpc.Wrap(Divide))
+ s := rpc.New()
+
+ s.AddTransport(&transport.HTTP{Bind: ":8000"}) // HTTP transport
+ s.AddTransport(&transport.TCP{Bind: ":3000"}) // TCP transport
- http.ListenAndServe(":8000", s)
+ s.Register("multiply", rpc.H(Multiply))
+ s.Register("divide", rpc.H(Divide))
+
+ s.Run(context.Background())
}
func Multiply(ctx context.Context, args *Args) (int, error) {
@@ -87,3 +109,5 @@ Alexander Kiryukhin <i@neonxp.dev>
## License
![GPL v3](https://www.gnu.org/graphics/gplv3-with-text-136x68.png)
+
+
diff --git a/examples/http/main.go b/example/main.go
index 5c24867..9f25e61 100644
--- a/examples/http/main.go
+++ b/example/main.go
@@ -3,19 +3,29 @@ package main
import (
"context"
"errors"
- "net/http"
+ "log"
+ "os"
+ "os/signal"
- httpRPC "go.neonxp.dev/jsonrpc2/http"
"go.neonxp.dev/jsonrpc2/rpc"
+ "go.neonxp.dev/jsonrpc2/transport"
)
func main() {
- s := httpRPC.New()
+ s := rpc.New()
- s.Register("multiply", rpc.Wrap(Multiply))
- s.Register("divide", rpc.Wrap(Divide))
+ s.AddTransport(&transport.HTTP{Bind: ":8000"})
+ s.AddTransport(&transport.TCP{Bind: ":3000"})
- http.ListenAndServe(":8000", s)
+ s.Register("multiply", rpc.H(Multiply))
+ s.Register("divide", rpc.H(Divide))
+
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
+ defer cancel()
+
+ if err := s.Run(ctx); err != nil {
+ log.Fatal(err)
+ }
}
func Multiply(ctx context.Context, args *Args) (int, error) {
diff --git a/example/test.http b/example/test.http
new file mode 100644
index 0000000..7ed134a
--- /dev/null
+++ b/example/test.http
@@ -0,0 +1,82 @@
+POST http://localhost:8000/
+Content-Type: application/json
+{
+ "jsonrpc": "2.0",
+ "method": "multiply",
+ "params": {
+ "a": 2,
+ "b": 3
+ },
+ "id": 1
+}
+
+
+date: Sat, 21 May 2022 16:53:31 GMT
+content-length: 36
+content-type: text/plain; charset=utf-8
+connection: close
+
+HTTP/1.1 200 - OK
+date: Sat, 21 May 2022 16:53:31 GMT
+content-length: 36
+content-type: text/plain; charset=utf-8
+connection: close
+
+
+###
+POST http://localhost:8000/
+Content-Type: application/json
+{
+ "jsonrpc": "2.0",
+ "method": "divide",
+ "params": {
+ "a": 10,
+ "b": 3
+ },
+ "id": 2
+}
+
+
+date: Sat, 21 May 2022 16:53:51 GMT
+content-length: 52
+content-type: text/plain; charset=utf-8
+connection: close
+
+HTTP/1.1 200 - OK
+date: Sat, 21 May 2022 16:53:51 GMT
+content-length: 52
+content-type: text/plain; charset=utf-8
+connection: close
+
+
+### Batch request
+POST http://localhost:8000/
+Content-Type: application/json
+ { "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 }
+ {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"}
+ {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]}
+ {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"}
+ {
+ "jsonrpc": "2.0",
+ "method": "divide",
+ "params": {
+ "a": 10,
+ "b": 3
+ },
+ "id": "divide"
+ }
+ {"foo": "boo"}
+ {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}
+ {"jsonrpc": "2.0", "method": "get_data", "id": "9"}
+
+
+date: Sat, 21 May 2022 17:18:17 GMT
+content-type: text/plain; charset=utf-8
+connection: close
+transfer-encoding: chunked
+
+HTTP/1.1 200 - OK
+date: Sat, 21 May 2022 17:18:17 GMT
+content-type: text/plain; charset=utf-8
+connection: close
+transfer-encoding: chunked \ No newline at end of file
diff --git a/examples/http/test.http b/examples/http/test.http
deleted file mode 100644
index d4e68b3..0000000
--- a/examples/http/test.http
+++ /dev/null
@@ -1,42 +0,0 @@
-POST http://localhost:8000/
-Content-Type: application/json
-
-{
- "jsonrpc": "2.0",
- "method": "multiply",
- "params": {
- "a": 2,
- "b": 3
- },
- "id": 1
-}
-
-###
-
-POST http://localhost:8000/
-Content-Type: application/json
-
-{
- "jsonrpc": "2.0",
- "method": "divide",
- "params": {
- "a": 10,
- "b": 3
- },
- "id": 2
-}
-
-###
-
-POST http://localhost:8000/
-Content-Type: application/json
-
-[
- { "jsonrpc": "2.0", "method": "multiply", "params": { "a": 2, "b": 3 }, "id": 10 },
- {"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
- {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
- {"jsonrpc": "2.0", "method": "subtract", "params": [42,23], "id": "2"},
- {"foo": "boo"},
- {"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"},
- {"jsonrpc": "2.0", "method": "get_data", "id": "9"}
-] \ No newline at end of file
diff --git a/go.mod b/go.mod
index c419096..f5af9b1 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,5 @@
module go.neonxp.dev/jsonrpc2
go 1.18
+
+require golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..7569778
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,2 @@
+golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
+golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
diff --git a/http/server.go b/http/server.go
deleted file mode 100644
index 9fae0a0..0000000
--- a/http/server.go
+++ /dev/null
@@ -1,52 +0,0 @@
-//Package http provides HTTP transport for JSON-RPC 2.0 server
-//
-//Copyright (C) 2022 Alexander Kiryukhin <i@neonxp.dev>
-//
-//This file is part of go.neonxp.dev/jsonrpc2 project.
-//
-//This program is free software: you can redistribute it and/or modify
-//it under the terms of the GNU General Public License as published by
-//the Free Software Foundation, either version 3 of the License, or
-//(at your option) any later version.
-//
-//This program is distributed in the hope that it will be useful,
-//but WITHOUT ANY WARRANTY; without even the implied warranty of
-//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-//GNU General Public License for more details.
-//
-//You should have received a copy of the GNU General Public License
-//along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-package http
-
-import (
- "bufio"
- "net/http"
-
- "go.neonxp.dev/jsonrpc2/rpc"
-)
-
-type Server struct {
- *rpc.RpcServer
-}
-
-func New() *Server {
- return &Server{RpcServer: rpc.New()}
-}
-
-func (r *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
- writer.Header().Set("Content-Type", "application/json")
- reader := bufio.NewReader(request.Body)
- defer request.Body.Close()
- firstByte, err := reader.Peek(1)
- if err != nil {
- r.Logger.Logf("Can't read body: %v", err)
- rpc.WriteError(rpc.ErrCodeParseError, writer)
- return
- }
- if string(firstByte) == "[" {
- r.BatchRequest(request.Context(), reader, writer)
- return
- }
- r.SingleRequest(request.Context(), reader, writer)
-}
diff --git a/rpc/server.go b/rpc/server.go
index 4fa004d..1bb15d5 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -24,6 +24,10 @@ import (
"encoding/json"
"io"
"sync"
+
+ "golang.org/x/sync/errgroup"
+
+ "go.neonxp.dev/jsonrpc2/transport"
)
const version = "2.0"
@@ -32,6 +36,7 @@ type RpcServer struct {
Logger Logger
IgnoreNotifications bool
handlers map[string]Handler
+ transports []transport.Transport
mu sync.RWMutex
}
@@ -40,6 +45,7 @@ func New() *RpcServer {
Logger: nopLogger{},
IgnoreNotifications: true,
handlers: map[string]Handler{},
+ transports: []transport.Transport{},
mu: sync.RWMutex{},
}
}
@@ -50,51 +56,55 @@ func (r *RpcServer) Register(method string, handler Handler) {
r.handlers[method] = handler
}
-func (r *RpcServer) SingleRequest(ctx context.Context, reader io.Reader, writer io.Writer) {
- req := new(rpcRequest)
- if err := json.NewDecoder(reader).Decode(req); err != nil {
- r.Logger.Logf("Can't read body: %v", err)
- WriteError(ErrCodeParseError, writer)
- return
- }
- resp := r.callMethod(ctx, req)
- if req.Id == nil && r.IgnoreNotifications {
- // notification request
- return
- }
- if err := json.NewEncoder(writer).Encode(resp); err != nil {
- r.Logger.Logf("Can't write response: %v", err)
- WriteError(ErrCodeInternalError, writer)
- return
- }
+func (r *RpcServer) AddTransport(transport transport.Transport) {
+ r.transports = append(r.transports, transport)
}
-func (r *RpcServer) BatchRequest(ctx context.Context, reader io.Reader, writer io.Writer) {
- var req []rpcRequest
- if err := json.NewDecoder(reader).Decode(&req); err != nil {
- r.Logger.Logf("Can't read body: %v", err)
- WriteError(ErrCodeParseError, writer)
- return
+func (r *RpcServer) Run(ctx context.Context) error {
+ eg, ctx := errgroup.WithContext(ctx)
+ for _, t := range r.transports {
+ eg.Go(func(t transport.Transport) func() error {
+ return func() error { return t.Run(ctx, r) }
+ }(t))
}
- var responses []*rpcResponse
+ return eg.Wait()
+}
+
+func (r *RpcServer) Resolve(ctx context.Context, rd io.Reader, w io.Writer) {
+ dec := json.NewDecoder(rd)
+ enc := json.NewEncoder(w)
+ mu := sync.Mutex{}
wg := sync.WaitGroup{}
- wg.Add(len(req))
- for _, j := range req {
- go func(req rpcRequest) {
+ for {
+ req := new(rpcRequest)
+ if err := dec.Decode(req); err != nil {
+ if err == io.EOF {
+ break
+ }
+ r.Logger.Logf("Can't read body: %v", err)
+ WriteError(ErrCodeParseError, enc)
+ break
+ }
+ wg.Add(1)
+ go func(req *rpcRequest) {
defer wg.Done()
- resp := r.callMethod(ctx, &req)
- if req.Id == nil && r.IgnoreNotifications {
+ resp := r.callMethod(ctx, req)
+ if req.Id == nil {
// notification request
return
}
- responses = append(responses, resp)
- }(j)
+ mu.Lock()
+ defer mu.Unlock()
+ if err := enc.Encode(resp); err != nil {
+ r.Logger.Logf("Can't write response: %v", err)
+ WriteError(ErrCodeInternalError, enc)
+ }
+ if w, canFlush := w.(Flusher); canFlush {
+ w.Flush()
+ }
+ }(req)
}
wg.Wait()
- if err := json.NewEncoder(writer).Encode(responses); err != nil {
- r.Logger.Logf("Can't write response: %v", err)
- WriteError(ErrCodeInternalError, writer)
- }
}
func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcResponse {
@@ -124,8 +134,8 @@ func (r *RpcServer) callMethod(ctx context.Context, req *rpcRequest) *rpcRespons
}
}
-func WriteError(code int, w io.Writer) {
- _ = json.NewEncoder(w).Encode(rpcResponse{
+func WriteError(code int, enc *json.Encoder) {
+ enc.Encode(rpcResponse{
Jsonrpc: version,
Error: NewError(code),
})
@@ -144,3 +154,8 @@ type rpcResponse struct {
Error error `json:"error,omitempty"`
Id any `json:"id,omitempty"`
}
+
+type Flusher interface {
+ // Flush sends any buffered data to the client.
+ Flush()
+}
diff --git a/rpc/wrapper.go b/rpc/wrapper.go
index bfcd381..9b4de28 100644
--- a/rpc/wrapper.go
+++ b/rpc/wrapper.go
@@ -24,7 +24,7 @@ import (
"encoding/json"
)
-func Wrap[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler {
+func H[RQ any, RS any](handler func(context.Context, *RQ) (RS, error)) Handler {
return func(ctx context.Context, in json.RawMessage) (json.RawMessage, error) {
req := new(RQ)
if err := json.Unmarshal(in, req); err != nil {
diff --git a/transport/http.go b/transport/http.go
new file mode 100644
index 0000000..663bb31
--- /dev/null
+++ b/transport/http.go
@@ -0,0 +1,40 @@
+package transport
+
+import (
+ "context"
+ "crypto/tls"
+ "net"
+ "net/http"
+)
+
+type HTTP struct {
+ Bind string
+ TLS *tls.Config
+}
+
+func (h *HTTP) Run(ctx context.Context, resolver Resolver) error {
+ srv := http.Server{
+ Addr: h.Bind,
+ Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodPost {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "application/json")
+ resolver.Resolve(ctx, r.Body, w)
+ }),
+ BaseContext: func(l net.Listener) context.Context {
+ return ctx
+ },
+ TLSConfig: h.TLS,
+ }
+ go func() {
+ <-ctx.Done()
+ srv.Close()
+ }()
+ if err := srv.ListenAndServe(); err != http.ErrServerClosed {
+ return err
+ }
+ return nil
+}
diff --git a/transport/tcp.go b/transport/tcp.go
new file mode 100644
index 0000000..2ab946a
--- /dev/null
+++ b/transport/tcp.go
@@ -0,0 +1,22 @@
+package transport
+
+import (
+ "context"
+ "net"
+)
+
+type TCP struct {
+ Bind string
+}
+
+func (t *TCP) Run(ctx context.Context, resolver Resolver) error {
+ ln, _ := net.Listen("tcp", t.Bind)
+
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ return err
+ }
+ go resolver.Resolve(ctx, conn, conn)
+ }
+}
diff --git a/transport/transport.go b/transport/transport.go
new file mode 100644
index 0000000..2a54295
--- /dev/null
+++ b/transport/transport.go
@@ -0,0 +1,14 @@
+package transport
+
+import (
+ "context"
+ "io"
+)
+
+type Transport interface {
+ Run(ctx context.Context, resolver Resolver) error
+}
+
+type Resolver interface {
+ Resolve(context.Context, io.Reader, io.Writer)
+}