diff options
-rw-r--r-- | Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go | 4 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go | 14 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go | 4 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go | 5 | ||||
-rw-r--r-- | cmd/geth/main.go | 4 | ||||
-rw-r--r-- | fdtrack/fdtrack.go | 112 | ||||
-rw-r--r-- | fdtrack/fdusage.go | 29 | ||||
-rw-r--r-- | fdtrack/fdusage_darwin.go | 72 | ||||
-rw-r--r-- | fdtrack/fdusage_linux.go | 53 | ||||
-rw-r--r-- | metrics/disk_linux.go | 1 | ||||
-rw-r--r-- | p2p/dial.go | 2 | ||||
-rw-r--r-- | p2p/discover/udp.go | 3 | ||||
-rw-r--r-- | p2p/metrics.go | 8 | ||||
-rw-r--r-- | p2p/server.go | 3 | ||||
-rw-r--r-- | rpc/comms/http.go | 187 | ||||
-rw-r--r-- | rpc/comms/http_net.go | 182 | ||||
-rw-r--r-- | rpc/comms/ipc_unix.go | 6 |
17 files changed, 477 insertions, 212 deletions
diff --git a/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go b/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go index 862c3def4..3f4606af0 100644 --- a/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go +++ b/Godeps/_workspace/src/github.com/huin/goupnp/httpu/httpu.go @@ -9,6 +9,8 @@ import ( "net/http" "sync" "time" + + "github.com/ethereum/go-ethereum/fdtrack" ) // HTTPUClient is a client for dealing with HTTPU (HTTP over UDP). Its typical @@ -25,6 +27,7 @@ func NewHTTPUClient() (*HTTPUClient, error) { if err != nil { return nil, err } + fdtrack.Open("upnp") return &HTTPUClient{conn: conn}, nil } @@ -33,6 +36,7 @@ func NewHTTPUClient() (*HTTPUClient, error) { func (httpu *HTTPUClient) Close() error { httpu.connLock.Lock() defer httpu.connLock.Unlock() + fdtrack.Close("upnp") return httpu.conn.Close() } diff --git a/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go b/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go index 815610734..786ce6fa8 100644 --- a/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go +++ b/Godeps/_workspace/src/github.com/huin/goupnp/soap/soap.go @@ -7,9 +7,12 @@ import ( "encoding/xml" "fmt" "io/ioutil" + "net" "net/http" "net/url" "reflect" + + "github.com/ethereum/go-ethereum/fdtrack" ) const ( @@ -26,6 +29,17 @@ type SOAPClient struct { func NewSOAPClient(endpointURL url.URL) *SOAPClient { return &SOAPClient{ EndpointURL: endpointURL, + HTTPClient: http.Client{ + Transport: &http.Transport{ + Dial: func(network, addr string) (net.Conn, error) { + c, err := net.Dial(network, addr) + if c != nil { + c = fdtrack.WrapConn("upnp", c) + } + return c, err + }, + }, + }, } } diff --git a/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go b/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go index 8ce4e8342..b165c784e 100644 --- a/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go +++ b/Godeps/_workspace/src/github.com/jackpal/go-nat-pmp/natpmp.go @@ -5,6 +5,8 @@ import ( "log" "net" "time" + + "github.com/ethereum/go-ethereum/fdtrack" ) // Implement the NAT-PMP protocol, typically supported by Apple routers and open source @@ -102,6 +104,8 @@ func (n *Client) rpc(msg []byte, resultSize int) (result []byte, err error) { if err != nil { return } + fdtrack.Open("natpmp") + defer fdtrack.Close("natpmp") defer conn.Close() result = make([]byte, resultSize) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index 46cc9d070..6d44f74b3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/syndtr/goleveldb/leveldb/util" ) @@ -369,6 +370,8 @@ func (fw fileWrap) Close() error { err := fw.File.Close() if err != nil { f.fs.log(fmt.Sprintf("close %s.%d: %v", f.Type(), f.Num(), err)) + } else { + fdtrack.Close("leveldb") } return err } @@ -400,6 +403,7 @@ func (f *file) Open() (Reader, error) { return nil, err } ok: + fdtrack.Open("leveldb") f.open = true f.fs.open++ return fileWrap{of, f}, nil @@ -418,6 +422,7 @@ func (f *file) Create() (Writer, error) { if err != nil { return nil, err } + fdtrack.Open("leveldb") f.open = true f.fs.open++ return fileWrap{of, f}, nil diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 511e0df9d..00dd8f753 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/metrics" @@ -532,6 +533,9 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) { // Start Ethereum itself utils.StartEthereum(eth) + // Start logging file descriptor stats. + fdtrack.Start() + am := eth.AccountManager() account := ctx.GlobalString(utils.UnlockedAccountFlag.Name) accounts := strings.Split(account, " ") diff --git a/fdtrack/fdtrack.go b/fdtrack/fdtrack.go new file mode 100644 index 000000000..2f5ab57f4 --- /dev/null +++ b/fdtrack/fdtrack.go @@ -0,0 +1,112 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package fdtrack logs statistics about open file descriptors. +package fdtrack + +import ( + "fmt" + "net" + "sort" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger/glog" +) + +var ( + mutex sync.Mutex + all = make(map[string]int) +) + +func Open(desc string) { + mutex.Lock() + all[desc] += 1 + mutex.Unlock() +} + +func Close(desc string) { + mutex.Lock() + defer mutex.Unlock() + if c, ok := all[desc]; ok { + if c == 1 { + delete(all, desc) + } else { + all[desc]-- + } + } +} + +func WrapListener(desc string, l net.Listener) net.Listener { + Open(desc) + return &wrappedListener{l, desc} +} + +type wrappedListener struct { + net.Listener + desc string +} + +func (w *wrappedListener) Accept() (net.Conn, error) { + c, err := w.Listener.Accept() + if err == nil { + c = WrapConn(w.desc, c) + } + return c, err +} + +func (w *wrappedListener) Close() error { + err := w.Listener.Close() + if err == nil { + Close(w.desc) + } + return err +} + +func WrapConn(desc string, conn net.Conn) net.Conn { + Open(desc) + return &wrappedConn{conn, desc} +} + +type wrappedConn struct { + net.Conn + desc string +} + +func (w *wrappedConn) Close() error { + err := w.Conn.Close() + if err == nil { + Close(w.desc) + } + return err +} + +func Start() { + go func() { + for range time.Tick(15 * time.Second) { + mutex.Lock() + var sum, tracked = 0, []string{} + for what, n := range all { + sum += n + tracked = append(tracked, fmt.Sprintf("%s:%d", what, n)) + } + mutex.Unlock() + used, _ := fdusage() + sort.Strings(tracked) + glog.Infof("fd usage %d/%d, tracked %d %v", used, fdlimit(), sum, tracked) + } + }() +} diff --git a/fdtrack/fdusage.go b/fdtrack/fdusage.go new file mode 100644 index 000000000..689625a8f --- /dev/null +++ b/fdtrack/fdusage.go @@ -0,0 +1,29 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build !linux,!darwin + +package fdtrack + +import "errors" + +func fdlimit() int { + return 0 +} + +func fdusage() (int, error) { + return 0, errors.New("not implemented") +} diff --git a/fdtrack/fdusage_darwin.go b/fdtrack/fdusage_darwin.go new file mode 100644 index 000000000..04a3a9baf --- /dev/null +++ b/fdtrack/fdusage_darwin.go @@ -0,0 +1,72 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build darwin + +package fdtrack + +import ( + "os" + "syscall" + "unsafe" +) + +// #cgo CFLAGS: -lproc +// #include <libproc.h> +// #include <stdlib.h> +import "C" + +func fdlimit() int { + var nofile syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofile); err != nil { + return 0 + } + return int(nofile.Cur) +} + +func fdusage() (int, error) { + pid := C.int(os.Getpid()) + // Query for a rough estimate on the amout of data that + // proc_pidinfo will return. + rlen, err := C.proc_pidinfo(pid, C.PROC_PIDLISTFDS, 0, nil, 0) + if rlen <= 0 { + return 0, err + } + // Load the list of file descriptors. We don't actually care about + // the content, only about the size. Since the number of fds can + // change while we're reading them, the loop enlarges the buffer + // until proc_pidinfo says the result fitted. + var buf unsafe.Pointer + defer func() { + if buf != nil { + C.free(buf) + } + }() + for buflen := rlen; ; buflen *= 2 { + buf, err = C.reallocf(buf, C.size_t(buflen)) + if buf == nil { + return 0, err + } + rlen, err = C.proc_pidinfo(pid, C.PROC_PIDLISTFDS, 0, buf, buflen) + if rlen <= 0 { + return 0, err + } else if rlen == buflen { + continue + } + return int(rlen / C.PROC_PIDLISTFD_SIZE), nil + } + panic("unreachable") +} diff --git a/fdtrack/fdusage_linux.go b/fdtrack/fdusage_linux.go new file mode 100644 index 000000000..d9a856a0c --- /dev/null +++ b/fdtrack/fdusage_linux.go @@ -0,0 +1,53 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build linux + +package fdtrack + +import ( + "io" + "os" + "syscall" +) + +func fdlimit() int { + var nofile syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &nofile); err != nil { + return 0 + } + return int(nofile.Cur) +} + +func fdusage() (int, error) { + f, err := os.Open("/proc/self/fd") + if err != nil { + return 0, err + } + defer f.Close() + const batchSize = 100 + n := 0 + for { + list, err := f.Readdirnames(batchSize) + n += len(list) + if err == io.EOF { + break + } else if err != nil { + return 0, err + } + } + return n, nil +} diff --git a/metrics/disk_linux.go b/metrics/disk_linux.go index e0c8a1a3a..8967d490e 100644 --- a/metrics/disk_linux.go +++ b/metrics/disk_linux.go @@ -34,6 +34,7 @@ func ReadDiskStats(stats *DiskStats) error { if err != nil { return err } + defer inf.Close() in := bufio.NewReader(inf) // Iterate over the IO counter, and extract what we need diff --git a/p2p/dial.go b/p2p/dial.go index 0fd3a4cf5..8b210bacd 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -23,6 +23,7 @@ import ( "net" "time" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/discover" @@ -212,6 +213,7 @@ func (t *dialTask) Do(srv *Server) { glog.V(logger.Detail).Infof("dial error: %v", err) return } + fd = fdtrack.WrapConn("p2p", fd) mfd := newMeteredConn(fd, false) srv.setupConn(mfd, t.flags, t.dest) diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 4e6ecaf23..d7ca9000d 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/nat" @@ -197,6 +198,7 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP if err != nil { return nil, err } + fdtrack.Open("p2p") conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err @@ -234,6 +236,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin func (t *udp) close() { close(t.closing) + fdtrack.Close("p2p") t.conn.Close() // TODO: wait for the loops to end. } diff --git a/p2p/metrics.go b/p2p/metrics.go index f98cac274..8ee4ed04b 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -34,7 +34,7 @@ var ( // meteredConn is a wrapper around a network TCP connection that meters both the // inbound and outbound network traffic. type meteredConn struct { - *net.TCPConn // Network connection to wrap with metering + net.Conn } // newMeteredConn creates a new metered connection, also bumping the ingress or @@ -45,13 +45,13 @@ func newMeteredConn(conn net.Conn, ingress bool) net.Conn { } else { egressConnectMeter.Mark(1) } - return &meteredConn{conn.(*net.TCPConn)} + return &meteredConn{conn} } // Read delegates a network read to the underlying connection, bumping the ingress // traffic meter along the way. func (c *meteredConn) Read(b []byte) (n int, err error) { - n, err = c.TCPConn.Read(b) + n, err = c.Conn.Read(b) ingressTrafficMeter.Mark(int64(n)) return } @@ -59,7 +59,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) { // Write delegates a network write to the underlying connection, bumping the // egress traffic meter along the way. func (c *meteredConn) Write(b []byte) (n int, err error) { - n, err = c.TCPConn.Write(b) + n, err = c.Conn.Write(b) egressTrafficMeter.Mark(int64(n)) return } diff --git a/p2p/server.go b/p2p/server.go index ba83c5503..7351a2654 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/discover" @@ -372,7 +373,7 @@ func (srv *Server) startListening() error { } laddr := listener.Addr().(*net.TCPAddr) srv.ListenAddr = laddr.String() - srv.listener = listener + srv.listener = fdtrack.WrapListener("p2p", listener) srv.loopWG.Add(1) go srv.listenLoop() // Map the TCP listening port if NAT is configured. diff --git a/rpc/comms/http.go b/rpc/comms/http.go index 108ba0c5f..c08b744a1 100644 --- a/rpc/comms/http.go +++ b/rpc/comms/http.go @@ -17,13 +17,19 @@ package comms import ( + "encoding/json" "fmt" + "net" "net/http" "strings" + "sync" + "time" "bytes" + "io" "io/ioutil" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" @@ -31,10 +37,15 @@ import ( "github.com/rs/cors" ) +const ( + serverIdleTimeout = 10 * time.Second // idle keep-alive connections + serverReadTimeout = 15 * time.Second // per-request read timeout + serverWriteTimeout = 15 * time.Second // per-request read timeout +) + var ( - // main HTTP rpc listener - httpListener *stoppableTCPListener - listenerStoppedError = fmt.Errorf("Listener has stopped") + httpServerMu sync.Mutex + httpServer *stopServer ) type HttpConfig struct { @@ -43,42 +54,172 @@ type HttpConfig struct { CorsDomain string } +// stopServer augments http.Server with idle connection tracking. +// Idle keep-alive connections are shut down when Close is called. +type stopServer struct { + *http.Server + l net.Listener + // connection tracking state + mu sync.Mutex + shutdown bool // true when Stop has returned + idle map[net.Conn]struct{} +} + +type handler struct { + codec codec.Codec + api shared.EthereumApi +} + +// StartHTTP starts listening for RPC requests sent via HTTP. func StartHttp(cfg HttpConfig, codec codec.Codec, api shared.EthereumApi) error { - if httpListener != nil { - if fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) != httpListener.Addr().String() { - return fmt.Errorf("RPC service already running on %s ", httpListener.Addr().String()) + httpServerMu.Lock() + defer httpServerMu.Unlock() + + addr := fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort) + if httpServer != nil { + if addr != httpServer.Addr { + return fmt.Errorf("RPC service already running on %s ", httpServer.Addr) } return nil // RPC service already running on given host/port } - - l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", cfg.ListenAddress, cfg.ListenPort)) + // Set up the request handler, wrapping it with CORS headers if configured. + handler := http.Handler(&handler{codec, api}) + if len(cfg.CorsDomain) > 0 { + opts := cors.Options{ + AllowedMethods: []string{"POST"}, + AllowedOrigins: strings.Split(cfg.CorsDomain, " "), + } + handler = cors.New(opts).Handler(handler) + } + // Start the server. + s, err := listenHTTP(addr, handler) if err != nil { glog.V(logger.Error).Infof("Can't listen on %s:%d: %v", cfg.ListenAddress, cfg.ListenPort, err) return err } - httpListener = l + httpServer = s + return nil +} - var handler http.Handler - if len(cfg.CorsDomain) > 0 { - var opts cors.Options - opts.AllowedMethods = []string{"POST"} - opts.AllowedOrigins = strings.Split(cfg.CorsDomain, " ") +func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") - c := cors.New(opts) - handler = newStoppableHandler(c.Handler(gethHttpHandler(codec, api)), l.stop) - } else { - handler = newStoppableHandler(gethHttpHandler(codec, api), l.stop) + // Limit request size to resist DoS + if req.ContentLength > maxHttpSizeReqLength { + err := fmt.Errorf("Request too large") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + sendJSON(w, &response) + return } - go http.Serve(l, handler) + defer req.Body.Close() + payload, err := ioutil.ReadAll(req.Body) + if err != nil { + err := fmt.Errorf("Could not read request body") + response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) + sendJSON(w, &response) + return + } - return nil + c := h.codec.New(nil) + var rpcReq shared.Request + if err = c.Decode(payload, &rpcReq); err == nil { + reply, err := h.api.Execute(&rpcReq) + res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + sendJSON(w, &res) + return + } + + var reqBatch []shared.Request + if err = c.Decode(payload, &reqBatch); err == nil { + resBatch := make([]*interface{}, len(reqBatch)) + resCount := 0 + for i, rpcReq := range reqBatch { + reply, err := h.api.Execute(&rpcReq) + if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal + resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) + resCount += 1 + } + } + // make response omitting nil entries + sendJSON(w, resBatch[:resCount]) + return + } + + // invalid request + err = fmt.Errorf("Could not decode request") + res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err) + sendJSON(w, res) } +func sendJSON(w io.Writer, v interface{}) { + if glog.V(logger.Detail) { + if payload, err := json.MarshalIndent(v, "", "\t"); err == nil { + glog.Infof("Sending payload: %s", payload) + } + } + if err := json.NewEncoder(w).Encode(v); err != nil { + glog.V(logger.Error).Infoln("Error sending JSON:", err) + } +} + +// Stop closes all active HTTP connections and shuts down the server. func StopHttp() { - if httpListener != nil { - httpListener.Stop() - httpListener = nil + httpServerMu.Lock() + defer httpServerMu.Unlock() + if httpServer != nil { + httpServer.Close() + httpServer = nil + } +} + +func listenHTTP(addr string, h http.Handler) (*stopServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + l = fdtrack.WrapListener("rpc", l) + s := &stopServer{l: l, idle: make(map[net.Conn]struct{})} + s.Server = &http.Server{ + Addr: addr, + Handler: h, + ReadTimeout: serverReadTimeout, + WriteTimeout: serverWriteTimeout, + ConnState: s.connState, + } + go s.Serve(l) + return s, nil +} + +func (s *stopServer) connState(c net.Conn, state http.ConnState) { + s.mu.Lock() + defer s.mu.Unlock() + // Close c immediately if we're past shutdown. + if s.shutdown { + if state != http.StateClosed { + c.Close() + } + return + } + if state == http.StateIdle { + s.idle[c] = struct{}{} + } else { + delete(s.idle, c) + } +} + +func (s *stopServer) Close() { + s.mu.Lock() + defer s.mu.Unlock() + // Shut down the acceptor. No new connections can be created. + s.l.Close() + // Drop all idle connections. Non-idle connections will be + // closed by connState as soon as they become idle. + s.shutdown = true + for c := range s.idle { + glog.V(logger.Detail).Infof("closing idle connection %v", c.RemoteAddr()) + c.Close() + delete(s.idle, c) } } diff --git a/rpc/comms/http_net.go b/rpc/comms/http_net.go deleted file mode 100644 index dba2029d4..000000000 --- a/rpc/comms/http_net.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package comms - -import ( - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "time" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc/codec" - "github.com/ethereum/go-ethereum/rpc/shared" -) - -// When https://github.com/golang/go/issues/4674 is implemented this could be replaced -type stoppableTCPListener struct { - *net.TCPListener - stop chan struct{} // closed when the listener must stop -} - -func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) { - wl, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - if tcpl, ok := wl.(*net.TCPListener); ok { - stop := make(chan struct{}) - return &stoppableTCPListener{tcpl, stop}, nil - } - - return nil, fmt.Errorf("Unable to create TCP listener for RPC service") -} - -// Stop the listener and all accepted and still active connections. -func (self *stoppableTCPListener) Stop() { - close(self.stop) -} - -func (self *stoppableTCPListener) Accept() (net.Conn, error) { - for { - self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) - c, err := self.TCPListener.AcceptTCP() - - select { - case <-self.stop: - if c != nil { // accept timeout - c.Close() - } - self.TCPListener.Close() - return nil, listenerStoppedError - default: - } - - if err != nil { - if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() { - continue // regular timeout - } - } - - return &closableConnection{c, self.stop}, err - } -} - -type closableConnection struct { - *net.TCPConn - closed chan struct{} -} - -func (self *closableConnection) Read(b []byte) (n int, err error) { - select { - case <-self.closed: - self.TCPConn.Close() - return 0, io.EOF - default: - return self.TCPConn.Read(b) - } -} - -// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an -// error indicating that the service was stopped. This will only happen for connections which are -// kept open (HTTP keep-alive) when the RPC service was shutdown. -func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - select { - case <-stop: - w.Header().Set("Content-Type", "application/json") - err := fmt.Errorf("RPC service stopped") - response := shared.NewRpcResponse(-1, shared.JsonRpcVersion, nil, err) - httpSend(w, response) - default: - h.ServeHTTP(w, r) - } - }) -} - -func httpSend(writer io.Writer, v interface{}) (n int, err error) { - var payload []byte - payload, err = json.MarshalIndent(v, "", "\t") - if err != nil { - glog.V(logger.Error).Infoln("Error marshalling JSON", err) - return 0, err - } - glog.V(logger.Detail).Infof("Sending payload: %s", payload) - - return writer.Write(payload) -} - -func gethHttpHandler(codec codec.Codec, a shared.EthereumApi) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "application/json") - - // Limit request size to resist DoS - if req.ContentLength > maxHttpSizeReqLength { - err := fmt.Errorf("Request too large") - response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) - httpSend(w, &response) - return - } - - defer req.Body.Close() - payload, err := ioutil.ReadAll(req.Body) - if err != nil { - err := fmt.Errorf("Could not read request body") - response := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32700, err) - httpSend(w, &response) - return - } - - c := codec.New(nil) - var rpcReq shared.Request - if err = c.Decode(payload, &rpcReq); err == nil { - reply, err := a.Execute(&rpcReq) - res := shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) - httpSend(w, &res) - return - } - - var reqBatch []shared.Request - if err = c.Decode(payload, &reqBatch); err == nil { - resBatch := make([]*interface{}, len(reqBatch)) - resCount := 0 - - for i, rpcReq := range reqBatch { - reply, err := a.Execute(&rpcReq) - if rpcReq.Id != nil { // this leaves nil entries in the response batch for later removal - resBatch[i] = shared.NewRpcResponse(rpcReq.Id, rpcReq.Jsonrpc, reply, err) - resCount += 1 - } - } - - // make response omitting nil entries - resBatch = resBatch[:resCount] - httpSend(w, resBatch) - return - } - - // invalid request - err = fmt.Errorf("Could not decode request") - res := shared.NewRpcErrorResponse(-1, shared.JsonRpcVersion, -32600, err) - httpSend(w, res) - }) -} diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index aff90cfaa..432bf93b5 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -22,6 +22,7 @@ import ( "net" "os" + "github.com/ethereum/go-ethereum/fdtrack" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" @@ -50,15 +51,16 @@ func (self *ipcClient) reconnect() error { func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error { os.Remove(cfg.Endpoint) // in case it still exists from a previous run - l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"}) + l, err := net.Listen("unix", cfg.Endpoint) if err != nil { return err } + l = fdtrack.WrapListener("ipc", l) os.Chmod(cfg.Endpoint, 0600) go func() { for { - conn, err := l.AcceptUnix() + conn, err := l.Accept() if err != nil { glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) continue |