diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/api/args_test.go | 12 | ||||
-rw-r--r-- | rpc/api/eth_args.go | 21 | ||||
-rw-r--r-- | rpc/comms/comms.go | 11 | ||||
-rw-r--r-- | rpc/comms/ipc.go | 43 | ||||
-rw-r--r-- | rpc/comms/ipc_unix.go | 40 | ||||
-rw-r--r-- | rpc/comms/ipc_windows.go | 36 |
6 files changed, 79 insertions, 84 deletions
diff --git a/rpc/api/args_test.go b/rpc/api/args_test.go index 23ae2930d..130315bd9 100644 --- a/rpc/api/args_test.go +++ b/rpc/api/args_test.go @@ -1394,13 +1394,10 @@ func TestBlockFilterArgsDefaults(t *testing.T) { } func TestBlockFilterArgsWords(t *testing.T) { - input := `[{ - "fromBlock": "latest", - "toBlock": "pending" - }]` + input := `[{"fromBlock": "latest", "toBlock": "latest"}]` expected := new(BlockFilterArgs) expected.Earliest = -1 - expected.Latest = -2 + expected.Latest = -1 args := new(BlockFilterArgs) if err := json.Unmarshal([]byte(input), &args); err != nil { @@ -1411,8 +1408,9 @@ func TestBlockFilterArgsWords(t *testing.T) { t.Errorf("Earliest shoud be %#v but is %#v", expected.Earliest, args.Earliest) } - if expected.Latest != args.Latest { - t.Errorf("Latest shoud be %#v but is %#v", expected.Latest, args.Latest) + input = `[{"toBlock": "pending"}]` + if err := json.Unmarshal([]byte(input), &args); err == nil { + t.Errorf("Pending isn't currently supported and should raise an unsupported error") } } diff --git a/rpc/api/eth_args.go b/rpc/api/eth_args.go index 6aca6a663..ed3d761f1 100644 --- a/rpc/api/eth_args.go +++ b/rpc/api/eth_args.go @@ -626,7 +626,12 @@ func (args *GetBlockByHashArgs) UnmarshalJSON(b []byte) (err error) { args.IncludeTxs = obj[1].(bool) - return nil + if inclTx, ok := obj[1].(bool); ok { + args.IncludeTxs = inclTx + return nil + } + + return shared.NewInvalidTypeError("includeTxs", "not a bool") } type GetBlockByNumberArgs struct { @@ -648,9 +653,12 @@ func (args *GetBlockByNumberArgs) UnmarshalJSON(b []byte) (err error) { return err } - args.IncludeTxs = obj[1].(bool) + if inclTx, ok := obj[1].(bool); ok { + args.IncludeTxs = inclTx + return nil + } - return nil + return shared.NewInvalidTypeError("includeTxs", "not a bool") } type BlockFilterArgs struct { @@ -714,6 +722,13 @@ func (args *BlockFilterArgs) UnmarshalJSON(b []byte) (err error) { return err } } + + if num == -2 { + return fmt.Errorf("\"pending\" is unsupported") + } else if num < -2 { + return fmt.Errorf("Invalid to block number") + } + args.Latest = num if obj[0].Limit == nil { diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go index 731b2f62e..61fba5722 100644 --- a/rpc/comms/comms.go +++ b/rpc/comms/comms.go @@ -62,13 +62,18 @@ type EthereumClient interface { func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) { codec := c.New(conn) + defer func() { + if r := recover(); r != nil { + glog.Errorf("panic: %v\n", r) + } + codec.Close() + }() + for { requests, isBatch, err := codec.ReadRequest() if err == io.EOF { - codec.Close() return } else if err != nil { - codec.Close() glog.V(logger.Debug).Infof("Closed IPC Conn %06d recv err - %v\n", id, err) return } @@ -87,7 +92,6 @@ func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) { err = codec.WriteResponse(responses[:responseCount]) if err != nil { - codec.Close() glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } @@ -98,7 +102,6 @@ func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) { rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err) err = codec.WriteResponse(rpcResponse) if err != nil { - codec.Close() glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err) return } diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go index 3de659b65..882d62ab4 100644 --- a/rpc/comms/ipc.go +++ b/rpc/comms/ipc.go @@ -20,13 +20,22 @@ import ( "fmt" "math/rand" "net" + "os" "encoding/json" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" ) +type Stopper interface { + Stop() +} + +type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error) + type IpcConfig struct { Endpoint string } @@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) { } // Start IPC server -func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { - return startIpc(cfg, codec, initializer) +func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error { + l, err := ipcListen(cfg) + if err != nil { + return err + } + go ipcLoop(cfg, codec, initializer, l) + return nil +} + +func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) { + glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) + defer os.Remove(cfg.Endpoint) + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + glog.V(logger.Debug).Infof("accept: %v", err) + return + } + id := newIpcConnId() + go func() { + defer conn.Close() + glog.V(logger.Debug).Infof("new connection with id %06d started", id) + stopper, api, err := initializer(conn) + if err != nil { + glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err) + return + } + defer stopper.Stop() + handle(id, conn, api, codec) + }() + } } func newIpcConnId() int { diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go index d68363a45..4b839572a 100644 --- a/rpc/comms/ipc_unix.go +++ b/rpc/comms/ipc_unix.go @@ -23,8 +23,6 @@ import ( "os" "path/filepath" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -69,44 +67,16 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { // Ensure the IPC path exists and remove any previous leftover if err := os.MkdirAll(filepath.Dir(cfg.Endpoint), 0751); err != nil { - return err + return nil, err } os.Remove(cfg.Endpoint) - - l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"}) + l, err := net.Listen("unix", cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.AcceptUnix() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go index 47edd9e5b..e25fba253 100644 --- a/rpc/comms/ipc_windows.go +++ b/rpc/comms/ipc_windows.go @@ -28,8 +28,6 @@ import ( "time" "unsafe" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/rpc/useragent" @@ -688,40 +686,12 @@ func (self *ipcClient) reconnect() error { return err } -func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error { +func ipcListen(cfg IpcConfig) (net.Listener, error) { os.Remove(cfg.Endpoint) // in case it still exists from a previous run - l, err := Listen(cfg.Endpoint) if err != nil { - return err + return nil, err } os.Chmod(cfg.Endpoint, 0600) - - go func() { - for { - conn, err := l.Accept() - if err != nil { - glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err) - continue - } - - id := newIpcConnId() - glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id) - - api, err := initializer(conn) - if err != nil { - glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err) - conn.Close() - continue - } - - go handle(id, conn, api, codec) - } - - os.Remove(cfg.Endpoint) - }() - - glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint) - - return nil + return l, nil } |