From 6d92fdc0df12337c2ffb8e6d19c83653f1a00ff2 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 25 Jun 2015 12:01:28 +0200 Subject: added support for batch requests --- rpc/codec/codec.go | 2 +- rpc/codec/json.go | 59 ++++++++++++++++++++++++++++++++++++++++++++---------- rpc/comms/comms.go | 38 ++++++++++++++++++++++++++--------- 3 files changed, 78 insertions(+), 21 deletions(-) diff --git a/rpc/codec/codec.go b/rpc/codec/codec.go index 5e8f38438..3177f77e4 100644 --- a/rpc/codec/codec.go +++ b/rpc/codec/codec.go @@ -12,7 +12,7 @@ type Codec int // (de)serialization support for rpc interface type ApiCoder interface { // Parse message to request from underlying stream - ReadRequest() (*shared.Request, error) + ReadRequest() ([]*shared.Request, bool, error) // Parse response message from underlying stream ReadResponse() (interface{}, error) // Encode response to encoded form in underlying stream diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 31024ee74..380b4cba7 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -8,33 +8,53 @@ import ( ) const ( - MAX_RESPONSE_SIZE = 64 * 1024 + MAX_REQUEST_SIZE = 1024 * 1024 + MAX_RESPONSE_SIZE = 1024 * 1024 ) // Json serialization support type JsonCodec struct { c net.Conn - d *json.Decoder - e *json.Encoder + buffer []byte + bytesInBuffer int } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ c: conn, - d: json.NewDecoder(conn), - e: json.NewEncoder(conn), + buffer: make([]byte, MAX_REQUEST_SIZE), + bytesInBuffer: 0, } } // Serialize obj to JSON and write it to conn -func (self *JsonCodec) ReadRequest() (*shared.Request, error) { - req := shared.Request{} - err := self.d.Decode(&req) +func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { + n, err := self.c.Read(self.buffer[self.bytesInBuffer:]) + if err != nil { + self.bytesInBuffer = 0 + return nil, false, err + } + + self.bytesInBuffer += n + + singleRequest := shared.Request{} + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest) if err == nil { - return &req, nil + self.bytesInBuffer = 0 + requests := make([]*shared.Request, 1) + requests[0] = &singleRequest + return requests, false, nil } - return nil, err + + requests = make([]*shared.Request, 0) + err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests) + if err == nil { + self.bytesInBuffer = 0 + return requests, true, nil + } + + return nil, false, err } func (self *JsonCodec) ReadResponse() (interface{}, error) { @@ -66,7 +86,24 @@ func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) { // Parse JSON data from conn to obj func (self *JsonCodec) WriteResponse(res interface{}) error { - return self.e.Encode(&res) + data, err := json.Marshal(res) + if err != nil { + self.c.Close() + return err + } + + bytesWritten := 0 + + for bytesWritten < len(data) { + n, err := self.c.Write(data[bytesWritten:]) + if err != nil { + self.c.Close() + return err + } + bytesWritten += n + } + + return nil } // Close decoder and encoder diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index bfe625758..1374bde3f 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -47,7 +47,7 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) for { - req, err := codec.ReadRequest() + requests, isBatch, err := codec.ReadRequest() if err == io.EOF { codec.Close() return @@ -57,15 +57,35 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) { return } - var rpcResponse interface{} - res, err := api.Execute(req) + if isBatch { + responses := make([]*interface{}, len(requests)) + responseCount := 0 + for _, req := range requests { + res, err := api.Execute(req) + if req.Id != nil { + rpcResponse := shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) + responses[responseCount] = rpcResponse + responseCount += 1 + } + } - rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err) - err = codec.WriteResponse(rpcResponse) - if err != nil { - glog.V(logger.Error).Infof("comms send err - %v\n", err) - codec.Close() - return + err = codec.WriteResponse(responses[:responseCount]) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } + } else { + var rpcResponse interface{} + res, err := api.Execute(requests[0]) + + rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) + err = codec.WriteResponse(rpcResponse) + if err != nil { + glog.V(logger.Error).Infof("comms send err - %v\n", err) + codec.Close() + return + } } } } -- cgit