aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/api/args_test.go12
-rw-r--r--rpc/api/eth_args.go21
-rw-r--r--rpc/comms/comms.go11
-rw-r--r--rpc/comms/ipc.go43
-rw-r--r--rpc/comms/ipc_unix.go40
-rw-r--r--rpc/comms/ipc_windows.go36
6 files changed, 79 insertions, 84 deletions
diff --git a/rpc/api/args_test.go b/rpc/api/args_test.go
index 23ae2930d..130315bd9 100644
--- a/rpc/api/args_test.go
+++ b/rpc/api/args_test.go
@@ -1394,13 +1394,10 @@ func TestBlockFilterArgsDefaults(t *testing.T) {
}
func TestBlockFilterArgsWords(t *testing.T) {
- input := `[{
- "fromBlock": "latest",
- "toBlock": "pending"
- }]`
+ input := `[{"fromBlock": "latest", "toBlock": "latest"}]`
expected := new(BlockFilterArgs)
expected.Earliest = -1
- expected.Latest = -2
+ expected.Latest = -1
args := new(BlockFilterArgs)
if err := json.Unmarshal([]byte(input), &args); err != nil {
@@ -1411,8 +1408,9 @@ func TestBlockFilterArgsWords(t *testing.T) {
t.Errorf("Earliest shoud be %#v but is %#v", expected.Earliest, args.Earliest)
}
- if expected.Latest != args.Latest {
- t.Errorf("Latest shoud be %#v but is %#v", expected.Latest, args.Latest)
+ input = `[{"toBlock": "pending"}]`
+ if err := json.Unmarshal([]byte(input), &args); err == nil {
+ t.Errorf("Pending isn't currently supported and should raise an unsupported error")
}
}
diff --git a/rpc/api/eth_args.go b/rpc/api/eth_args.go
index 6aca6a663..ed3d761f1 100644
--- a/rpc/api/eth_args.go
+++ b/rpc/api/eth_args.go
@@ -626,7 +626,12 @@ func (args *GetBlockByHashArgs) UnmarshalJSON(b []byte) (err error) {
args.IncludeTxs = obj[1].(bool)
- return nil
+ if inclTx, ok := obj[1].(bool); ok {
+ args.IncludeTxs = inclTx
+ return nil
+ }
+
+ return shared.NewInvalidTypeError("includeTxs", "not a bool")
}
type GetBlockByNumberArgs struct {
@@ -648,9 +653,12 @@ func (args *GetBlockByNumberArgs) UnmarshalJSON(b []byte) (err error) {
return err
}
- args.IncludeTxs = obj[1].(bool)
+ if inclTx, ok := obj[1].(bool); ok {
+ args.IncludeTxs = inclTx
+ return nil
+ }
- return nil
+ return shared.NewInvalidTypeError("includeTxs", "not a bool")
}
type BlockFilterArgs struct {
@@ -714,6 +722,13 @@ func (args *BlockFilterArgs) UnmarshalJSON(b []byte) (err error) {
return err
}
}
+
+ if num == -2 {
+ return fmt.Errorf("\"pending\" is unsupported")
+ } else if num < -2 {
+ return fmt.Errorf("Invalid to block number")
+ }
+
args.Latest = num
if obj[0].Limit == nil {
diff --git a/rpc/comms/comms.go b/rpc/comms/comms.go
index 731b2f62e..61fba5722 100644
--- a/rpc/comms/comms.go
+++ b/rpc/comms/comms.go
@@ -62,13 +62,18 @@ type EthereumClient interface {
func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) {
codec := c.New(conn)
+ defer func() {
+ if r := recover(); r != nil {
+ glog.Errorf("panic: %v\n", r)
+ }
+ codec.Close()
+ }()
+
for {
requests, isBatch, err := codec.ReadRequest()
if err == io.EOF {
- codec.Close()
return
} else if err != nil {
- codec.Close()
glog.V(logger.Debug).Infof("Closed IPC Conn %06d recv err - %v\n", id, err)
return
}
@@ -87,7 +92,6 @@ func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) {
err = codec.WriteResponse(responses[:responseCount])
if err != nil {
- codec.Close()
glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err)
return
}
@@ -98,7 +102,6 @@ func handle(id int, conn net.Conn, api shared.EthereumApi, c codec.Codec) {
rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err)
err = codec.WriteResponse(rpcResponse)
if err != nil {
- codec.Close()
glog.V(logger.Debug).Infof("Closed IPC Conn %06d send err - %v\n", id, err)
return
}
diff --git a/rpc/comms/ipc.go b/rpc/comms/ipc.go
index 3de659b65..882d62ab4 100644
--- a/rpc/comms/ipc.go
+++ b/rpc/comms/ipc.go
@@ -20,13 +20,22 @@ import (
"fmt"
"math/rand"
"net"
+ "os"
"encoding/json"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
"github.com/ethereum/go-ethereum/rpc/shared"
)
+type Stopper interface {
+ Stop()
+}
+
+type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error)
+
type IpcConfig struct {
Endpoint string
}
@@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
}
// Start IPC server
-func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
- return startIpc(cfg, codec, initializer)
+func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error {
+ l, err := ipcListen(cfg)
+ if err != nil {
+ return err
+ }
+ go ipcLoop(cfg, codec, initializer, l)
+ return nil
+}
+
+func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) {
+ glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
+ defer os.Remove(cfg.Endpoint)
+ defer l.Close()
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ glog.V(logger.Debug).Infof("accept: %v", err)
+ return
+ }
+ id := newIpcConnId()
+ go func() {
+ defer conn.Close()
+ glog.V(logger.Debug).Infof("new connection with id %06d started", id)
+ stopper, api, err := initializer(conn)
+ if err != nil {
+ glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err)
+ return
+ }
+ defer stopper.Stop()
+ handle(id, conn, api, codec)
+ }()
+ }
}
func newIpcConnId() int {
diff --git a/rpc/comms/ipc_unix.go b/rpc/comms/ipc_unix.go
index d68363a45..4b839572a 100644
--- a/rpc/comms/ipc_unix.go
+++ b/rpc/comms/ipc_unix.go
@@ -23,8 +23,6 @@ import (
"os"
"path/filepath"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
"github.com/ethereum/go-ethereum/rpc/shared"
"github.com/ethereum/go-ethereum/rpc/useragent"
@@ -69,44 +67,16 @@ func (self *ipcClient) reconnect() error {
return err
}
-func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
+func ipcListen(cfg IpcConfig) (net.Listener, error) {
// Ensure the IPC path exists and remove any previous leftover
if err := os.MkdirAll(filepath.Dir(cfg.Endpoint), 0751); err != nil {
- return err
+ return nil, err
}
os.Remove(cfg.Endpoint)
-
- l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"})
+ l, err := net.Listen("unix", cfg.Endpoint)
if err != nil {
- return err
+ return nil, err
}
os.Chmod(cfg.Endpoint, 0600)
-
- go func() {
- for {
- conn, err := l.AcceptUnix()
- if err != nil {
- glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err)
- continue
- }
-
- id := newIpcConnId()
- glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id)
-
- api, err := initializer(conn)
- if err != nil {
- glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err)
- conn.Close()
- continue
- }
-
- go handle(id, conn, api, codec)
- }
-
- os.Remove(cfg.Endpoint)
- }()
-
- glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
-
- return nil
+ return l, nil
}
diff --git a/rpc/comms/ipc_windows.go b/rpc/comms/ipc_windows.go
index 47edd9e5b..e25fba253 100644
--- a/rpc/comms/ipc_windows.go
+++ b/rpc/comms/ipc_windows.go
@@ -28,8 +28,6 @@ import (
"time"
"unsafe"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/rpc/codec"
"github.com/ethereum/go-ethereum/rpc/shared"
"github.com/ethereum/go-ethereum/rpc/useragent"
@@ -688,40 +686,12 @@ func (self *ipcClient) reconnect() error {
return err
}
-func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
+func ipcListen(cfg IpcConfig) (net.Listener, error) {
os.Remove(cfg.Endpoint) // in case it still exists from a previous run
-
l, err := Listen(cfg.Endpoint)
if err != nil {
- return err
+ return nil, err
}
os.Chmod(cfg.Endpoint, 0600)
-
- go func() {
- for {
- conn, err := l.Accept()
- if err != nil {
- glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err)
- continue
- }
-
- id := newIpcConnId()
- glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id)
-
- api, err := initializer(conn)
- if err != nil {
- glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err)
- conn.Close()
- continue
- }
-
- go handle(id, conn, api, codec)
- }
-
- os.Remove(cfg.Endpoint)
- }()
-
- glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
-
- return nil
+ return l, nil
}