diff options
-rw-r--r-- | rpc/comms/http.go | 185 | ||||
-rw-r--r-- | rpc/comms/http_net.go | 182 |
2 files changed, 162 insertions, 205 deletions
diff --git a/rpc/comms/http.go b/rpc/comms/http.go index 108ba0c5f..c165aa27e 100644 --- a/rpc/comms/http.go +++ b/rpc/comms/http.go @@ -17,11 +17,16 @@ package comms import ( + "encoding/json" "fmt" + "net" "net/http" "strings" + "sync" + "time" "bytes" + "io" "io/ioutil" "github.com/ethereum/go-ethereum/logger" @@ -31,10 +36,15 @@ import ( "github.com/rs/cors" ) +const ( + serverIdleTimeout = 10 * time.Second // idle keep-alive connections + serverReadTimeout = 15 * time.Second // per-request read timeout + serverWriteTimeout = 15 * time.Second // per-request read timeout +) + var ( - // main HTTP rpc listener - httpListener *stoppableTCPListener - listenerStoppedError = fmt.Errorf("Listener has stopped") + httpServerMu sync.Mutex + httpServer *stopServer ) type HttpConfig struct { @@ -43,42 +53,171 @@ type HttpConfig struct { CorsDomain string } +// stopServer augments http.Server with idle connection tracking. +// Idle keep-alive connections are shut down when Close is called. +type stopServer struct { + *http.Server + l net.Listener + // connection tracking state + mu sync.Mutex + shutdown bool // true when Stop has returned + idle map[net.Conn]struct{} +} + +type handler struct { + codec codec.Codec + api shared.EthereumApi +} + +// StartHTTP starts listening for RPC requests sent via HTTP. func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error { - if httpListener != nil { - if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() { - return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String()) + httpServerMu.Lock() + defer httpServerMu.Unlock() + + addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) + if httpServer != nil { + if addr != httpServer.Addr { + return fmt.Errorf("RPC service already running on %s ", httpServer.Addr) } return nil // RPC service already running on given host/port } - - l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)) + // Set up the request handler, wrapping it with CORS headers if configured. + handler := http.Handler(&handler{codec, api}) + if len(cfg.CorsDomain) > 0 { + opts := cors.Options{ + AllowedMethods: []string{"POST"}, + AllowedOrigins: strings.Split(cfg.CorsDomain, " "), + } + handler = cors.New(opts).Handler(handler) + } + // Start the server. + s, err := listenHTTP(addr, handler) if err != nil { glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err) return err } - httpListener = l + httpServer = s + return nil +} - var handler http.Handler - if len(cfg.CorsDomain) > 0 { - var opts cors.Options - opts.AllowedMethods = []string{"POST"} - opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ") +func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") - c := cors.New(opts) - handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop) - } else { - handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop) + // Limit request size to resist DoS + if req.ContentLength > maxHttpSizeReqLength { + err := fmt.Errorf("Request too large") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + sendJSON(w, &response) + return } - go http.Serve(l, handler) + defer req.Body.Close() + payload, err := ioutil.ReadAll(req.Body) + if err != nil { + err := fmt.Errorf("Could not read request body") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + sendJSON(w, &response) + return + } - return nil + c := h.codec.New(nil) + var rpcReq shared.Request + if err = c.Decode(payload, &rpcReq); err == nil { + reply, err := h.api.Execute(&rpcReq) + res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + sendJSON(w, &res) + return + } + + var reqBatch []shared.Request + if err = c.Decode(payload, &reqBatch); err == nil { + resBatch := make([]*interface{}, len(reqBatch)) + resCount := 0 + for i, rpcReq := range reqBatch { + reply, err := h.api.Execute(&rpcReq) + if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal + resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + resCount += 1 + } + } + // make response omitting nil entries + sendJSON(w, resBatch[:resCount]) + return + } + + // invalid request + err = fmt.Errorf("Could not decode request") + res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err) + sendJSON(w, res) } +func sendJSON(w io.Writer, v interface{}) { + if glog.V(logger.Detail) { + if payload, err := json.MarshalIndent(v, "", "\t"); err == nil { + glog.Infof("Sending payload: %s", payload) + } + } + if err := json.NewEncoder(w).Encode(v); err != nil { + glog.V(logger.Error).Infoln("Error sending JSON:", err) + } +} + +// Stop closes all active HTTP connections and shuts down the server. func StopHttp() { - if httpListener != nil { - httpListener.Stop() - httpListener = nil + httpServerMu.Lock() + defer httpServerMu.Unlock() + if httpServer != nil { + httpServer.Close() + httpServer = nil + } +} + +func listenHTTP(addr string, h http.Handler) (*stopServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + s := &stopServer{l: l, idle: make(map[net.Conn]struct{})} + s.Server = &http.Server{ + Addr: addr, + Handler: h, + ReadTimeout: serverReadTimeout, + WriteTimeout: serverWriteTimeout, + ConnState: s.connState, + } + go s.Serve(l) + return s, nil +} + +func (s *stopServer) connState(c net.Conn, state http.ConnState) { + s.mu.Lock() + defer s.mu.Unlock() + // Close c immediately if we're past shutdown. + if s.shutdown { + if state != http.StateClosed { + c.Close() + } + return + } + if state == http.StateIdle { + s.idle[c] = struct{}{} + } else { + delete(s.idle, c) + } +} + +func (s *stopServer) Close() { + s.mu.Lock() + defer s.mu.Unlock() + // Shut down the acceptor. No new connections can be created. + s.l.Close() + // Drop all idle connections. Non-idle connections will be + // closed by connState as soon as they become idle. + s.shutdown = true + for c := range s.idle { + glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr()) + c.Close() + delete(s.idle, c) } } diff --git a/rpc/comms/http_net.go b/rpc/comms/http_net.go deleted file mode 100644 index dba2029d4..000000000 --- a/rpc/comms/http_net.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package comms - -import ( - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "time" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc/codec" - "github.com/ethereum/go-ethereum/rpc/shared" -) - -// When https://github.com/golang/go/issues/4674 is implemented this could be replaced -type stoppableTCPListener struct { - *net.TCPListener - stop chan struct{} // closed when the listener must stop -} - -func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) { - wl, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - if tcpl, ok := wl.(*net.TCPListener); ok { - stop := make(chan struct{}) - return &stoppableTCPListener{tcpl, stop}, nil - } - - return nil, fmt.Errorf("Unable to create TCP listener for RPC service") -} - -// Stop the listener and all accepted and still active connections. -func (self *stoppableTCPListener) Stop() { - close(self.stop) -} - -func (self *stoppableTCPListener) Accept() (net.Conn, error) { - for { - self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) - c, err := self.TCPListener.AcceptTCP() - - select { - case <-self.stop: - if c != nil { // accept timeout - c.Close() - } - self.TCPListener.Close() - return nil, listenerStoppedError - default: - } - - if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() { - continue // regular timeout - } - } - - return &closableConnection{c, self.stop}, err - } -} - -type closableConnection struct { - *net.TCPConn - closed chan struct{} -} - -func (self *closableConnection) Read(b []byte) (n int, err error) { - select { - case <-self.closed: - self.TCPConn.Close() - return 0, io.EOF - default: - return self.TCPConn.Read(b) - } -} - -// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an -// error indicating that the service was stopped. This will only happen for connections which are -// kept open (HTTP keep-alive) when the RPC service was shutdown. -func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - select { - case <-stop: - w.Header().Set("Content-Type", "application/json") - err := fmt.Errorf("RPC service stopped") - response := shared.NewRpcResponse(-1, shared.JsonRpcVersion, nil, err) - httpSend(w, response) - default: - h.ServeHTTP(w, r) - } - }) -} - -func httpSend(writer io.Writer, v interface{}) (n int, err error) { - var payload []byte - payload, err = json.MarshalIndent(v, "", "\t") - if err != nil { - glog.V(logger.Error).Infoln("Error marshalling JSON", err) - return 0, err - } - glog.V(logger.Detail).Infof("Sending payload: %s", payload) - - return writer.Write(payload) -} - -func gethHttpHandler(codec codec.Codec, a shared.EthereumApi) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "application/json") - - // Limit request size to resist DoS - if req.ContentLength > maxHttpSizeReqLength { - err := fmt.Errorf("Request too large") - response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) - httpSend(w, &response) - return - } - - defer req.Body.Close() - payload, err := ioutil.ReadAll(req.Body) - if err != nil { - err := fmt.Errorf("Could not read request body") - response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) - httpSend(w, &response) - return - } - - c := codec.New(nil) - var rpcReq shared.Request - if err = c.Decode(payload, &rpcReq); err == nil { - reply, err := a.Execute(&rpcReq) - res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) - httpSend(w, &res) - return - } - - var reqBatch []shared.Request - if err = c.Decode(payload, &reqBatch); err == nil { - resBatch := make([]*interface{}, len(reqBatch)) - resCount := 0 - - for i, rpcReq := range reqBatch { - reply, err := a.Execute(&rpcReq) - if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal - resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) - resCount += 1 - } - } - - // make response omitting nil entries - resBatch = resBatch[:resCount] - httpSend(w, resBatch) - return - } - - // invalid request - err = fmt.Errorf("Could not decode request") - res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err) - httpSend(w, res) - }) -} |