diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/client.go | 740 | ||||
-rw-r--r-- | rpc/client_context_go1.4.go | 60 | ||||
-rw-r--r-- | rpc/client_context_go1.5.go | 61 | ||||
-rw-r--r-- | rpc/client_context_go1.6.go | 55 | ||||
-rw-r--r-- | rpc/client_context_go1.7.go | 51 | ||||
-rw-r--r-- | rpc/client_example_test.go | 83 | ||||
-rw-r--r-- | rpc/client_test.go | 489 | ||||
-rw-r--r-- | rpc/errors.go | 63 | ||||
-rw-r--r-- | rpc/http.go | 163 | ||||
-rw-r--r-- | rpc/inproc.go | 49 | ||||
-rw-r--r-- | rpc/ipc.go | 79 | ||||
-rw-r--r-- | rpc/ipc_unix.go | 6 | ||||
-rw-r--r-- | rpc/ipc_windows.go | 15 | ||||
-rw-r--r-- | rpc/json.go | 77 | ||||
-rw-r--r-- | rpc/notification.go | 2 | ||||
-rw-r--r-- | rpc/notification_test.go | 45 | ||||
-rw-r--r-- | rpc/server.go | 24 | ||||
-rw-r--r-- | rpc/server_test.go | 14 | ||||
-rw-r--r-- | rpc/types.go | 32 | ||||
-rw-r--r-- | rpc/utils.go | 29 | ||||
-rw-r--r-- | rpc/websocket.go | 160 |
21 files changed, 1884 insertions, 413 deletions
diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 000000000..4ff9a8cb9 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,740 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net" + "net/url" + "reflect" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" +) + +var ( + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") +) + +const ( + clientSubscriptionBuffer = 100 // if exceeded, the client stops reading + tcpKeepAliveInterval = 30 * time.Second + defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline + defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +// BatchElem is an element in a batch request. +type BatchElem struct { + Method string + Args []interface{} + // The result is unmarshaled into this field. Result must be set to a + // non-nil pointer value of the desired type, otherwise the response will be + // discarded. + Result interface{} + // Error is set if the server returns an error for this request, or if + // unmarshaling into Result fails. It is not set for I/O errors. + Error error +} + +// A value of this type can a JSON-RPC request, notification, successful response or +// error response. Which one it is depends on the fields. +type jsonrpcMessage struct { + Version string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Error *jsonError `json:"error,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +func (msg *jsonrpcMessage) isNotification() bool { + return msg.ID == nil && msg.Method != "" +} + +func (msg *jsonrpcMessage) isResponse() bool { + return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0 +} + +func (msg *jsonrpcMessage) hasValidID() bool { + return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' +} + +func (msg *jsonrpcMessage) String() string { + b, _ := json.Marshal(msg) + return string(b) +} + +// Client represents a connection to an RPC server. +type Client struct { + idCounter uint32 + connectFunc func(ctx context.Context) (net.Conn, error) + isHTTP bool + + // writeConn is only safe to access outside dispatch, with the + // write lock held. The write lock is taken by sending on + // requestOp and released by sending on sendDone. + writeConn net.Conn + + // for dispatch + close chan struct{} + didQuit chan struct{} // closed when client quits + reconnected chan net.Conn // where write/reconnect sends the new connection + readErr chan error // errors from read + readResp chan []*jsonrpcMessage // valid messages from read + requestOp chan *requestOp // for registering response IDs + sendDone chan error // signals write completion, releases write lock + respWait map[string]*requestOp // active requests + subs map[string]*ClientSubscription // active subscriptions +} + +type requestOp struct { + ids []json.RawMessage + err error + resp chan *jsonrpcMessage // receives up to len(ids) responses + sub *ClientSubscription // only set for EthSubscribe requests +} + +func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case resp := <-op.resp: + return resp, op.err + } +} + +// Dial creates a new client for the given URL. +// +// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a +// file name with no URL scheme, a local socket connection is established using UNIX +// domain sockets on supported platforms and named pipes on Windows. If you want to +// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. +// +// For websocket connections, the origin is set to the local host name. +// +// The client reconnects automatically if the connection is lost. +func Dial(rawurl string) (*Client, error) { + return DialContext(context.Background(), rawurl) +} + +// DialContext creates a new RPC client, just like Dial. +// +// The context is used to cancel or time out the initial connection establishment. It does +// not affect subsequent interactions with the client. +func DialContext(ctx context.Context, rawurl string) (*Client, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + switch u.Scheme { + case "http", "https": + return DialHTTP(rawurl) + case "ws", "wss": + return DialWebsocket(ctx, rawurl, "") + case "": + return DialIPC(ctx, rawurl) + default: + return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) + } +} + +func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) { + conn, err := connectFunc(initctx) + if err != nil { + return nil, err + } + _, isHTTP := conn.(*httpConn) + + c := &Client{ + writeConn: conn, + isHTTP: isHTTP, + connectFunc: connectFunc, + close: make(chan struct{}), + didQuit: make(chan struct{}), + reconnected: make(chan net.Conn), + readErr: make(chan error), + readResp: make(chan []*jsonrpcMessage), + requestOp: make(chan *requestOp), + sendDone: make(chan error, 1), + respWait: make(map[string]*requestOp), + subs: make(map[string]*ClientSubscription), + } + if !isHTTP { + go c.dispatch(conn) + } + return c, nil +} + +func (c *Client) nextID() json.RawMessage { + id := atomic.AddUint32(&c.idCounter, 1) + return []byte(strconv.FormatUint(uint64(id), 10)) +} + +// SupportedModules calls the rpc_modules method, retrieving the list of +// APIs that are available on the server. +func (c *Client) SupportedModules() (map[string]string, error) { + var result map[string]string + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + err := c.CallContext(ctx, &result, "rpc_modules") + return result, err +} + +// Close closes the client, aborting any in-flight requests. +func (c *Client) Close() { + if c.isHTTP { + return + } + select { + case c.close <- struct{}{}: + <-c.didQuit + case <-c.didQuit: + } +} + +// Call performs a JSON-RPC call with the given arguments and unmarshals into +// result if no error occurred. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) Call(result interface{}, method string, args ...interface{}) error { + ctx := context.Background() + return c.CallContext(ctx, result, method, args...) +} + +// CallContext performs a JSON-RPC call with the given arguments. If the context is +// canceled before the call has successfully returned, CallContext returns immediately. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + + if c.isHTTP { + err = c.sendHTTP(ctx, op, msg) + } else { + err = c.send(ctx, op, msg) + } + if err != nil { + return err + } + + // dispatch has accepted the request and will close the channel it when it quits. + switch resp, err := op.wait(ctx); { + case err != nil: + return err + case resp.Error != nil: + return resp.Error + case len(resp.Result) == 0: + return ErrNoResult + default: + return json.Unmarshal(resp.Result, &result) + } +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. +// +// In contrast to Call, BatchCall only returns I/O errors. Any error specific to +// a request is reported through the Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCall(b []BatchElem) error { + ctx := context.Background() + return c.BatchCallContext(ctx, b) +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. The wait duration is bounded by the +// context's deadline. +// +// In contrast to CallContext, BatchCallContext only returns I/O errors. Any +// error specific to a request is reported through the Error field of the +// corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { + msgs := make([]*jsonrpcMessage, len(b)) + op := &requestOp{ + ids: make([]json.RawMessage, len(b)), + resp: make(chan *jsonrpcMessage, len(b)), + } + for i, elem := range b { + msg, err := c.newMessage(elem.Method, elem.Args...) + if err != nil { + return err + } + msgs[i] = msg + op.ids[i] = msg.ID + } + + var err error + if c.isHTTP { + err = c.sendBatchHTTP(ctx, op, msgs) + } else { + err = c.send(ctx, op, msgs) + } + + // Wait for all responses to come back. + for n := 0; n < len(b) && err == nil; n++ { + var resp *jsonrpcMessage + resp, err = op.wait(ctx) + if err != nil { + break + } + // Find the element corresponding to this response. + // The element is guaranteed to be present because dispatch + // only sends valid IDs to our channel. + var elem *BatchElem + for i := range msgs { + if bytes.Equal(msgs[i].ID, resp.ID) { + elem = &b[i] + break + } + } + if resp.Error != nil { + elem.Error = resp.Error + continue + } + if len(resp.Result) == 0 { + elem.Error = ErrNoResult + continue + } + elem.Error = json.Unmarshal(resp.Result, elem.Result) + } + return err +} + +// EthSubscribe calls the "eth_subscribe" method with the given arguments, +// registering a subscription. Server notifications for the subscription are +// sent to the given channel. The element type of the channel must match the +// expected type of content returned by the subscription. +// +// Callers should not use the same channel for multiple calls to EthSubscribe. +// The channel is closed when the notification is unsubscribed or an error +// occurs. The error can be retrieved via the Err method of the subscription. +// +// Slow subscribers will block the clients ingress path eventually. +func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) { + // Check type of channel first. + chanVal := reflect.ValueOf(channel) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + panic("first argument to EthSubscribe must be a writable channel") + } + if chanVal.IsNil() { + panic("channel given to EthSubscribe must not be nil") + } + if c.isHTTP { + return nil, ErrNotificationsUnsupported + } + + msg, err := c.newMessage(subscribeMethod, args...) + if err != nil { + return nil, err + } + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan *jsonrpcMessage), + sub: newClientSubscription(c, chanVal), + } + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + + // Send the subscription request. + // The arrival and validity of the response is signaled on sub.quit. + if err := c.send(ctx, op, msg); err != nil { + return nil, err + } + if _, err := op.wait(ctx); err != nil { + return nil, err + } + return op.sub, nil +} + +func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { + params, err := json.Marshal(paramsIn) + if err != nil { + return nil, err + } + return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil +} + +// send registers op with the dispatch loop, then sends msg on the connection. +// if sending fails, op is deregistered. +func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { + select { + case c.requestOp <- op: + if glog.V(logger.Detail) { + glog.Info("sending ", msg) + } + err := c.write(ctx, msg) + c.sendDone <- err + return err + case <-c.didQuit: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg interface{}) error { + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + // The previous write failed. Try to establish a new connection. + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + c.writeConn.SetWriteDeadline(deadline) + err := json.NewEncoder(c.writeConn).Encode(msg) + if err != nil { + c.writeConn = nil + } + return err +} + +func (c *Client) reconnect(ctx context.Context) error { + newconn, err := c.connectFunc(ctx) + if err != nil { + glog.V(logger.Detail).Infof("reconnect failed: %v", err) + return err + } + select { + case c.reconnected <- newconn: + c.writeConn = newconn + return nil + case <-c.didQuit: + newconn.Close() + return ErrClientQuit + } +} + +// dispatch is the main loop of the client. +// It sends read messages to waiting calls to Call and BatchCall +// and subscription notifications to registered subscriptions. +func (c *Client) dispatch(conn net.Conn) { + // Spawn the initial read loop. + go c.read(conn) + + var ( + lastOp *requestOp // tracks last send operation + requestOpLock = c.requestOp // nil while the send lock is held + reading = true // if true, a read loop is running + ) + defer close(c.didQuit) + defer func() { + c.closeRequestOps(ErrClientQuit) + conn.Close() + if reading { + // Empty read channels until read is dead. + for { + select { + case <-c.readResp: + case <-c.readErr: + return + } + } + } + }() + + for { + select { + case <-c.close: + return + + // Read path. + case batch := <-c.readResp: + for _, msg := range batch { + switch { + case msg.isNotification(): + if glog.V(logger.Detail) { + glog.Info("<-readResp: notification ", msg) + } + c.handleNotification(msg) + case msg.isResponse(): + if glog.V(logger.Detail) { + glog.Info("<-readResp: response ", msg) + } + c.handleResponse(msg) + default: + if glog.V(logger.Debug) { + glog.Error("<-readResp: dropping weird message", msg) + } + // TODO: maybe close + } + } + + case err := <-c.readErr: + glog.V(logger.Debug).Infof("<-readErr: %v", err) + c.closeRequestOps(err) + conn.Close() + reading = false + + case newconn := <-c.reconnected: + glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()) + if reading { + // Wait for the previous read loop to exit. This is a rare case. + conn.Close() + <-c.readErr + } + go c.read(newconn) + reading = true + conn = newconn + + // Send path. + case op := <-requestOpLock: + // Stop listening for further send ops until the current one is done. + requestOpLock = nil + lastOp = op + for _, id := range op.ids { + c.respWait[string(id)] = op + } + + case err := <-c.sendDone: + if err != nil { + // Remove response handlers for the last send. We remove those here + // because the error is already handled in Call or BatchCall. When the + // read loop goes down, it will signal all other current operations. + for _, id := range lastOp.ids { + delete(c.respWait, string(id)) + } + } + // Listen for send ops again. + requestOpLock = c.requestOp + lastOp = nil + } + } +} + +// closeRequestOps unblocks pending send ops and active subscriptions. +func (c *Client) closeRequestOps(err error) { + didClose := make(map[*requestOp]bool) + + for id, op := range c.respWait { + // Remove the op so that later calls will not close op.resp again. + delete(c.respWait, id) + + if !didClose[op] { + op.err = err + close(op.resp) + didClose[op] = true + } + } + for id, sub := range c.subs { + delete(c.subs, id) + sub.quitWithError(err, false) + } +} + +func (c *Client) handleNotification(msg *jsonrpcMessage) { + if msg.Method != notificationMethod { + glog.V(logger.Debug).Info("dropping non-subscription message: ", msg) + return + } + var subResult struct { + ID string `json:"subscription"` + Result json.RawMessage `json:"result"` + } + if err := json.Unmarshal(msg.Params, &subResult); err != nil { + glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg) + return + } + if c.subs[subResult.ID] != nil { + c.subs[subResult.ID].deliver(subResult.Result) + } +} + +func (c *Client) handleResponse(msg *jsonrpcMessage) { + op := c.respWait[string(msg.ID)] + if op == nil { + glog.V(logger.Debug).Infof("unsolicited response %v", msg) + return + } + delete(c.respWait, string(msg.ID)) + // For normal responses, just forward the reply to Call/BatchCall. + if op.sub == nil { + op.resp <- msg + return + } + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + defer close(op.resp) + if msg.Error != nil { + op.err = msg.Error + return + } + if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { + go op.sub.start() + c.subs[op.sub.subid] = op.sub + } +} + +// Reading happens on a dedicated goroutine. + +func (c *Client) read(conn net.Conn) error { + var ( + buf json.RawMessage + dec = json.NewDecoder(conn) + ) + readMessage := func() (rs []*jsonrpcMessage, err error) { + buf = buf[:0] + if err = dec.Decode(&buf); err != nil { + return nil, err + } + if isBatch(buf) { + err = json.Unmarshal(buf, &rs) + } else { + rs = make([]*jsonrpcMessage, 1) + err = json.Unmarshal(buf, &rs[0]) + } + return rs, err + } + + for { + resp, err := readMessage() + if err != nil { + c.readErr <- err + return err + } + c.readResp <- resp + } +} + +// Subscriptions. + +// A ClientSubscription represents a subscription established through EthSubscribe. +type ClientSubscription struct { + client *Client + etype reflect.Type + channel reflect.Value + subid string + in chan json.RawMessage + + quitOnce sync.Once // ensures quit is closed once + quit chan struct{} // quit is closed when the subscription exits + errOnce sync.Once // ensures err is closed once + err chan error +} + +func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription { + sub := &ClientSubscription{ + client: c, + etype: channel.Type().Elem(), + channel: channel, + quit: make(chan struct{}), + err: make(chan error, 1), + // in is buffered so dispatch can continue even if the subscriber is slow. + in: make(chan json.RawMessage, clientSubscriptionBuffer), + } + return sub +} + +// Err returns the subscription error channel. The intended use of Err is to schedule +// resubscription when the client connection is closed unexpectedly. +// +// The error channel receives a value when the subscription has ended due +// to an error. The received error is ErrClientQuit if Close has been called +// on the underlying client and no other error has occurred. +// +// The error channel is closed when Unsubscribe is called on the subscription. +func (sub *ClientSubscription) Err() <-chan error { + return sub.err +} + +// Unsubscribe unsubscribes the notification and closes the error channel. +// It can safely be called more than once. +func (sub *ClientSubscription) Unsubscribe() { + sub.quitWithError(nil, true) + sub.errOnce.Do(func() { close(sub.err) }) +} + +func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { + sub.quitOnce.Do(func() { + if unsubscribeServer { + sub.requestUnsubscribe() + } + if err != nil { + sub.err <- err + } + close(sub.quit) + }) +} + +func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { + select { + case sub.in <- result: + return true + case <-sub.quit: + return false + } +} + +func (sub *ClientSubscription) start() { + sub.quitWithError(sub.forward()) +} + +func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { + cases := []reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectSend, Chan: sub.channel}, + } + for { + select { + case result := <-sub.in: + val, err := sub.unmarshal(result) + if err != nil { + return err, true + } + cases[1].Send = val + switch chosen, _, _ := reflect.Select(cases); chosen { + case 0: // <-sub.quit + return nil, false + case 1: // sub.channel<- + continue + } + case <-sub.quit: + return nil, false + } + } +} + +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) { + val := reflect.New(sub.etype) + err := json.Unmarshal(result, val.Interface()) + return val.Elem(), err +} + +func (sub *ClientSubscription) requestUnsubscribe() error { + var result interface{} + return sub.client.Call(&result, unsubscribeMethod, sub.subid) +} diff --git a/rpc/client_context_go1.4.go b/rpc/client_context_go1.4.go new file mode 100644 index 000000000..ac956a17d --- /dev/null +++ b/rpc/client_context_go1.4.go @@ -0,0 +1,60 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build !go1.5 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In older versions of Go (below 1.5), dials cannot be canceled +// via a channel or context. The context deadline can still applied. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // Set Timeout on the client if the context has a deadline. + // Note that there is no default timeout (unlike in contextDialer) because + // the timeout applies to the entire request, including reads from body. + if deadline, ok := ctx.Deadline(); ok { + c2 := *c + c2.Timeout = deadline.Sub(time.Now()) + c = &c2 + } + req2 := *req + return c, &req2 +} diff --git a/rpc/client_context_go1.5.go b/rpc/client_context_go1.5.go new file mode 100644 index 000000000..4a007d9f8 --- /dev/null +++ b/rpc/client_context_go1.5.go @@ -0,0 +1,61 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build go1.5,!go1.6 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can +// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // Set Timeout on the client if the context has a deadline. + // Note that there is no default timeout (unlike in contextDialer) because + // the timeout applies to the entire request, including reads from body. + if deadline, ok := ctx.Deadline(); ok { + c2 := *c + c2.Timeout = deadline.Sub(time.Now()) + c = &c2 + } + req2 := *req + req2.Cancel = ctx.Done() + return c, &req2 +} diff --git a/rpc/client_context_go1.6.go b/rpc/client_context_go1.6.go new file mode 100644 index 000000000..67777ddc6 --- /dev/null +++ b/rpc/client_context_go1.6.go @@ -0,0 +1,55 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build go1.6,!go1.7 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In Go 1.6, net.Dialer gained the ability to cancel via a channel. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // We set Timeout on the client for Go <= 1.5. There + // is no need to do that here because the dial will be canceled + // by package http. + req2 := *req + req2.Cancel = ctx.Done() + return c, &req2 +} diff --git a/rpc/client_context_go1.7.go b/rpc/client_context_go1.7.go new file mode 100644 index 000000000..56ce12ab8 --- /dev/null +++ b/rpc/client_context_go1.7.go @@ -0,0 +1,51 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// +build go1.7 + +package rpc + +import ( + "context" + "net" + "net/http" + "time" +) + +// In Go 1.7, context moved into the standard library and support +// for cancelation via context was added to net.Dialer and http.Request. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + d := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + return d.DialContext(ctx, network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + return c, req.WithContext(ctx) +} diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go new file mode 100644 index 000000000..84b4b67bb --- /dev/null +++ b/rpc/client_example_test.go @@ -0,0 +1,83 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc_test + +import ( + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/rpc" +) + +// In this example, our client whishes to track the latest 'block number' +// known to the server. The server supports two methods: +// +// eth_getBlockByNumber("latest", {}) +// returns the latest block object. +// +// eth_subscribe("newBlocks") +// creates a subscription which fires block objects when new blocks arrive. + +type Block struct { + Number *big.Int +} + +func ExampleClientSubscription() { + // Connect the client. + client, _ := rpc.Dial("ws://127.0.0.1:8485") + subch := make(chan Block) + go subscribeBlocks(client, subch) + + // Print events from the subscription as they arrive. + for block := range subch { + fmt.Println("latest block:", block.Number) + } +} + +// subscribeBlocks runs in its own goroutine and maintains +// a subscription for new blocks. +func subscribeBlocks(client *rpc.Client, subch chan Block) { + for i := 0; ; i++ { + if i > 0 { + time.Sleep(2 * time.Second) + } + + // Subscribe to new blocks. + sub, err := client.EthSubscribe(subch, "newBlocks") + if err == rpc.ErrClientQuit { + return // Stop reconnecting if the client was closed. + } else if err != nil { + fmt.Println("subscribe error:", err) + continue + } + + // The connection is established now. + // Update the channel with the current block. + var lastBlock Block + if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil { + fmt.Println("can't get latest block:", err) + continue + } + subch <- lastBlock + + // The subscription will deliver events to the channel. Wait for the + // subscription to end for any reason, then loop around to re-establish + // the connection. + fmt.Println("connection lost: ", <-sub.Err()) + } +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 000000000..58dceada0 --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,489 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import ( + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "runtime" + "sync" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" +) + +func TestClientRequest(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var resp Result + if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) { + t.Errorf("incorrect result %#v", resp) + } +} + +func TestClientBatchRequest(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + batch := []BatchElem{ + { + Method: "service_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "service_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + }, + } + if err := client.BatchCall(batch); err != nil { + t.Fatal(err) + } + wantResult := []BatchElem{ + { + Method: "service_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: &Result{"hello", 10, &Args{"world"}}, + }, + { + Method: "service_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: &Result{"hello2", 11, &Args{"world"}}, + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"}, + }, + } + if !reflect.DeepEqual(batch, wantResult) { + t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult)) + } +} + +// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) } +func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) } +func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) } +func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) } + +// This test checks that requests made through CallContext can be canceled by canceling +// the context. +func testClientCancel(transport string, t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + + // What we want to achieve is that the context gets canceled + // at various stages of request processing. The interesting cases + // are: + // - cancel during dial + // - cancel while performing a HTTP request + // - cancel while waiting for a response + // + // To trigger those, the times are chosen such that connections + // are killed within the deadline for every other call (maxKillTimeout + // is 2x maxCancelTimeout). + // + // Once a connection is dead, there is a fair chance it won't connect + // successfully because the accept is delayed by 1s. + maxContextCancelTimeout := 300 * time.Millisecond + fl := &flakeyListener{ + maxAcceptDelay: 1 * time.Second, + maxKillTimeout: 600 * time.Millisecond, + } + + var client *Client + switch transport { + case "ws", "http": + c, hs := httpTestClient(server, transport, fl) + defer hs.Close() + client = c + case "ipc": + c, l := ipcTestClient(server, fl) + defer l.Close() + client = c + default: + panic("unknown transport: " + transport) + } + + // These tests take a lot of time, run them all at once. + // You probably want to run with -parallel 1 or comment out + // the call to t.Parallel if you enable the logging. + t.Parallel() + // glog.SetV(6) + // glog.SetToStderr(true) + // defer glog.SetToStderr(false) + // glog.Infoln("testing ", transport) + + // The actual test starts here. + var ( + wg sync.WaitGroup + nreqs = 10 + ncallers = 6 + ) + caller := func(index int) { + defer wg.Done() + for i := 0; i < nreqs; i++ { + var ( + ctx context.Context + cancel func() + timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout))) + ) + if index < ncallers/2 { + // For half of the callers, create a context without deadline + // and cancel it later. + ctx, cancel = context.WithCancel(context.Background()) + time.AfterFunc(timeout, cancel) + } else { + // For the other half, create a context with a deadline instead. This is + // different because the context deadline is used to set the socket write + // deadline. + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } + // Now perform a call with the context. + // The key thing here is that no call will ever complete successfully. + err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout) + if err != nil { + glog.V(logger.Debug).Infoln("got expected error:", err) + } else { + t.Errorf("no error for call with %v wait time", timeout) + } + cancel() + } + } + wg.Add(ncallers) + for i := 0; i < ncallers; i++ { + go caller(i) + } + wg.Wait() +} + +func TestClientSubscribeInvalidArg(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + check := func(shouldPanic bool, arg interface{}) { + defer func() { + err := recover() + if shouldPanic && err == nil { + t.Errorf("EthSubscribe should've panicked for %#v", arg) + } + if !shouldPanic && err != nil { + t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg) + buf := make([]byte, 1024*1024) + buf = buf[:runtime.Stack(buf, false)] + t.Error(err) + t.Error(string(buf)) + } + }() + client.EthSubscribe(arg, "foo_bar") + } + check(true, nil) + check(true, 1) + check(true, (chan int)(nil)) + check(true, make(<-chan int)) + check(false, make(chan int)) + check(false, make(chan<- int)) +} + +func TestClientSubscribe(t *testing.T) { + server := newTestServer("eth", new(NotificationTestService)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + +// In this test, the connection drops while EthSubscribe is +// waiting for a response. +func TestClientSubscribeClose(t *testing.T) { + service := &NotificationTestService{ + gotHangSubscriptionReq: make(chan struct{}), + unblockHangSubscription: make(chan struct{}), + } + server := newTestServer("eth", service) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var ( + nc = make(chan int) + errc = make(chan error) + sub *ClientSubscription + err error + ) + go func() { + sub, err = client.EthSubscribe(nc, "hangSubscription", 999) + errc <- err + }() + + <-service.gotHangSubscriptionReq + client.Close() + service.unblockHangSubscription <- struct{}{} + + select { + case err := <-errc: + if err == nil { + t.Errorf("EthSubscribe returned nil error after Close") + } + if sub != nil { + t.Error("EthSubscribe returned non-nil subscription after Close") + } + case <-time.After(1 * time.Second): + t.Fatalf("EthSubscribe did not return within 1s after Close") + } +} + +func TestClientHTTP(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + + client, hs := httpTestClient(server, "http", nil) + defer hs.Close() + defer client.Close() + + // Launch concurrent requests. + var ( + results = make([]Result, 100) + errc = make(chan error) + wantResult = Result{"a", 1, new(Args)} + ) + defer client.Close() + for i := range results { + i := i + go func() { + errc <- client.Call(&results[i], "service_echo", + wantResult.String, wantResult.Int, wantResult.Args) + }() + } + + // Wait for all of them to complete. + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + for i := range results { + select { + case err := <-errc: + if err != nil { + t.Fatal(err) + } + case <-timeout.C: + t.Fatalf("timeout (got %d/%d) results)", i+1, len(results)) + } + } + + // Check results. + for i := range results { + if !reflect.DeepEqual(results[i], wantResult) { + t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult) + } + } +} + +func TestClientReconnect(t *testing.T) { + startServer := func(addr string) (*Server, net.Listener) { + srv := newTestServer("service", new(Service)) + l, err := net.Listen("tcp", addr) + if err != nil { + t.Fatal(err) + } + go http.Serve(l, srv.WebsocketHandler("*")) + return srv, l + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Start a server and corresponding client. + s1, l1 := startServer("127.0.0.1:0") + client, err := DialContext(ctx, "ws://"+l1.Addr().String()) + if err != nil { + t.Fatal("can't dial", err) + } + + // Perform a call. This should work because the server is up. + var resp Result + if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil { + t.Fatal(err) + } + + // Shut down the server and try calling again. It shouldn't work. + l1.Close() + s1.Stop() + if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil { + t.Error("successful call while the server is down") + t.Logf("resp: %#v", resp) + } + + // Allow for some cool down time so we can listen on the same address again. + time.Sleep(2 * time.Second) + + // Start it up again and call again. The connection should be reestablished. + // We spawn multiple calls here to check whether this hangs somehow. + s2, l2 := startServer(l1.Addr().String()) + defer l2.Close() + defer s2.Stop() + + start := make(chan struct{}) + errors := make(chan error, 20) + for i := 0; i < cap(errors); i++ { + go func() { + <-start + var resp Result + errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil) + }() + } + close(start) + errcount := 0 + for i := 0; i < cap(errors); i++ { + if err = <-errors; err != nil { + errcount++ + } + } + t.Log("err:", err) + if errcount > 1 { + t.Errorf("expected one error after disconnect, got %d", errcount) + } +} + +func newTestServer(serviceName string, service interface{}) *Server { + server := NewServer() + if err := server.RegisterName(serviceName, service); err != nil { + panic(err) + } + return server +} + +func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { + // Create the HTTP server. + var hs *httptest.Server + switch transport { + case "ws": + hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*")) + case "http": + hs = httptest.NewUnstartedServer(srv) + default: + panic("unknown HTTP transport: " + transport) + } + // Wrap the listener if required. + if fl != nil { + fl.Listener = hs.Listener + hs.Listener = fl + } + // Connect the client. + hs.Start() + client, err := Dial(transport + "://" + hs.Listener.Addr().String()) + if err != nil { + panic(err) + } + return client, hs +} + +func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) { + // Listen on a random endpoint. + endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63()) + if runtime.GOOS == "windows" { + endpoint = `\\.\pipe\` + endpoint + } else { + endpoint = os.TempDir() + "/" + endpoint + } + l, err := ipcListen(endpoint) + if err != nil { + panic(err) + } + // Connect the listener to the server. + if fl != nil { + fl.Listener = l + l = fl + } + go srv.ServeListener(l) + // Connect the client. + client, err := Dial(endpoint) + if err != nil { + panic(err) + } + return client, l +} + +// flakeyListener kills accepted connections after a random timeout. +type flakeyListener struct { + net.Listener + maxKillTimeout time.Duration + maxAcceptDelay time.Duration +} + +func (l *flakeyListener) Accept() (net.Conn, error) { + delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay))) + time.Sleep(delay) + + c, err := l.Listener.Accept() + if err == nil { + timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout))) + time.AfterFunc(timeout, func() { + glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout) + c.Close() + }) + } + return c, err +} diff --git a/rpc/errors.go b/rpc/errors.go index bc352fc45..9cf9dc60c 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -24,74 +24,43 @@ type methodNotFoundError struct { method string } -func (e *methodNotFoundError) Code() int { - return -32601 -} +func (e *methodNotFoundError) ErrorCode() int { return -32601 } func (e *methodNotFoundError) Error() string { return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method) } // received message isn't a valid request -type invalidRequestError struct { - message string -} +type invalidRequestError struct{ message string } -func (e *invalidRequestError) Code() int { - return -32600 -} +func (e *invalidRequestError) ErrorCode() int { return -32600 } -func (e *invalidRequestError) Error() string { - return e.message -} +func (e *invalidRequestError) Error() string { return e.message } // received message is invalid -type invalidMessageError struct { - message string -} +type invalidMessageError struct{ message string } -func (e *invalidMessageError) Code() int { - return -32700 -} +func (e *invalidMessageError) ErrorCode() int { return -32700 } -func (e *invalidMessageError) Error() string { - return e.message -} +func (e *invalidMessageError) Error() string { return e.message } // unable to decode supplied params, or an invalid number of parameters -type invalidParamsError struct { - message string -} +type invalidParamsError struct{ message string } -func (e *invalidParamsError) Code() int { - return -32602 -} +func (e *invalidParamsError) ErrorCode() int { return -32602 } -func (e *invalidParamsError) Error() string { - return e.message -} +func (e *invalidParamsError) Error() string { return e.message } // logic error, callback returned an error -type callbackError struct { - message string -} +type callbackError struct{ message string } -func (e *callbackError) Code() int { - return -32000 -} +func (e *callbackError) ErrorCode() int { return -32000 } -func (e *callbackError) Error() string { - return e.message -} +func (e *callbackError) Error() string { return e.message } // issued when a request is received after the server is issued to stop. -type shutdownError struct { -} +type shutdownError struct{} -func (e *shutdownError) Code() int { - return -32000 -} +func (e *shutdownError) ErrorCode() int { return -32000 } -func (e *shutdownError) Error() string { - return "server is shutting down" -} +func (e *shutdownError) Error() string { return "server is shutting down" } diff --git a/rpc/http.go b/rpc/http.go index 9283ce0ec..afcdd4bd6 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -22,71 +22,108 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" - "net/url" "strings" + "sync" + "time" "github.com/rs/cors" + "golang.org/x/net/context" ) const ( maxHTTPRequestContentLength = 1024 * 128 ) -// httpClient connects to a geth RPC server over HTTP. -type httpClient struct { - endpoint *url.URL // HTTP-RPC server endpoint - httpClient http.Client // reuse connection - lastRes []byte // HTTP requests are synchronous, store last response +var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0") + +type httpConn struct { + client *http.Client + req *http.Request + closeOnce sync.Once + closed chan struct{} +} + +// httpConn is treated specially by Client. +func (hc *httpConn) LocalAddr() net.Addr { return nullAddr } +func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr } +func (hc *httpConn) SetReadDeadline(time.Time) error { return nil } +func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil } +func (hc *httpConn) SetDeadline(time.Time) error { return nil } +func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") } + +func (hc *httpConn) Read(b []byte) (int, error) { + <-hc.closed + return 0, io.EOF +} + +func (hc *httpConn) Close() error { + hc.closeOnce.Do(func() { close(hc.closed) }) + return nil } -// NewHTTPClient create a new RPC clients that connection to a geth RPC server -// over HTTP. -func NewHTTPClient(endpoint string) (Client, error) { - url, err := url.Parse(endpoint) +// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP. +func DialHTTP(endpoint string) (*Client, error) { + req, err := http.NewRequest("POST", endpoint, nil) if err != nil { return nil, err } - return &httpClient{endpoint: url}, nil -} + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") -// Send will serialize the given msg to JSON and sends it to the RPC server. -// Since HTTP is synchronous the response is stored until Recv is called. -func (client *httpClient) Send(msg interface{}) error { - var body []byte - var err error + initctx := context.Background() + return newClient(initctx, func(context.Context) (net.Conn, error) { + return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil + }) +} - client.lastRes = nil - if body, err = json.Marshal(msg); err != nil { +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msg) + if err != nil { return err } + defer respBody.Close() + var respmsg jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { + return err + } + op.resp <- &respmsg + return nil +} - resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body)) +func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msgs) if err != nil { return err } - - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - client.lastRes, err = ioutil.ReadAll(resp.Body) + defer respBody.Close() + var respmsgs []jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } - - return fmt.Errorf("request failed: %s", resp.Status) -} - -// Recv will try to deserialize the last received response into the given msg. -func (client *httpClient) Recv(msg interface{}) error { - return json.Unmarshal(client.lastRes, &msg) + for _, respmsg := range respmsgs { + op.resp <- &respmsg + } + return nil } -// Close is not necessary for httpClient -func (client *httpClient) Close() { -} +func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { + body, err := json.Marshal(msg) + if err != nil { + return nil, err + } + client, req := requestWithContext(hc.client, hc.req, ctx) + req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) -// SupportedModules will return the collection of offered RPC modules. -func (client *httpClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + return resp.Body, nil } // httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method. @@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error { return nil } -// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests, -// send the request to the given API provider and sends the response back to the caller. -func newJSONHTTPHandler(srv *Server) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > maxHTTPRequestContentLength { - http.Error(w, - fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), - http.StatusRequestEntityTooLarge) - return - } - - w.Header().Set("content-type", "application/json") - - // create a codec that reads direct from the request body until - // EOF and writes the response to w and order the server to process - // a single request. - codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) - defer codec.Close() - srv.ServeSingleRequest(codec, OptionMethodInvocation) +// NewHTTPServer creates a new HTTP RPC server around an API provider. +// +// Deprecated: Server implements http.Handler +func NewHTTPServer(corsString string, srv *Server) *http.Server { + return &http.Server{Handler: newCorsHandler(srv, corsString)} +} + +// ServeHTTP serves JSON-RPC requests over HTTP. +func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > maxHTTPRequestContentLength { + http.Error(w, + fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), + http.StatusRequestEntityTooLarge) + return } + w.Header().Set("content-type", "application/json") + + // create a codec that reads direct from the request body until + // EOF and writes the response to w and order the server to process + // a single request. + codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) + defer codec.Close() + srv.ServeSingleRequest(codec, OptionMethodInvocation) } -// NewHTTPServer creates a new HTTP RPC server around an API provider. -func NewHTTPServer(corsString string, srv *Server) *http.Server { +func newCorsHandler(srv *Server, corsString string) http.Handler { var allowedOrigins []string for _, domain := range strings.Split(corsString, ",") { allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) } - c := cors.New(cors.Options{ AllowedOrigins: allowedOrigins, AllowedMethods: []string{"POST", "GET"}, }) - - handler := c.Handler(newJSONHTTPHandler(srv)) - - return &http.Server{ - Handler: handler, - } + return c.Handler(srv) } diff --git a/rpc/inproc.go b/rpc/inproc.go index 250f5c787..f72b97497 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -17,45 +17,18 @@ package rpc import ( - "encoding/json" - "io" "net" -) - -// inProcClient is an in-process buffer stream attached to an RPC server. -type inProcClient struct { - server *Server - cl io.Closer - enc *json.Encoder - dec *json.Decoder -} -// Close tears down the request channel of the in-proc client. -func (c *inProcClient) Close() { - c.cl.Close() -} - -// NewInProcRPCClient creates an in-process buffer stream attachment to a given -// RPC server. -func NewInProcRPCClient(handler *Server) Client { - p1, p2 := net.Pipe() - go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) - return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} -} - -// Send marshals a message into a json format and injects in into the client -// request channel. -func (c *inProcClient) Send(msg interface{}) error { - return c.enc.Encode(msg) -} - -// Recv reads a message from the response channel and tries to parse it into the -// given msg interface. -func (c *inProcClient) Recv(msg interface{}) error { - return c.dec.Decode(msg) -} + "golang.org/x/net/context" +) -// Returns the collection of modules the RPC server offers. -func (c *inProcClient) SupportedModules() (map[string]string, error) { - return SupportedModules(c) +// NewInProcClient attaches an in-process connection to the given RPC server. +func DialInProc(handler *Server) *Client { + initctx := context.Background() + c, _ := newClient(initctx, func(context.Context) (net.Conn, error) { + p1, p2 := net.Pipe() + go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) + return p2, nil + }) + return c } diff --git a/rpc/ipc.go b/rpc/ipc.go index 05d8909ca..c2b9e3871 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -17,68 +17,39 @@ package rpc import ( - "encoding/json" "net" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" ) -// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe +// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on +// Windows this is a named pipe func CreateIPCListener(endpoint string) (net.Listener, error) { return ipcListen(endpoint) } -// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using -// JSON serialization. -type ipcClient struct { - endpoint string - conn net.Conn - out *json.Encoder - in *json.Decoder -} - -// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. -// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a -// named pipe. -func NewIPCClient(endpoint string) (Client, error) { - conn, err := newIPCConnection(endpoint) - if err != nil { - return nil, err - } - return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil -} - -// Send will serialize the given message and send it to the server. -// When sending the message fails it will try to reconnect once and send the message again. -func (client *ipcClient) Send(msg interface{}) error { - if err := client.out.Encode(msg); err == nil { - return nil - } - - // retry once - client.conn.Close() - - conn, err := newIPCConnection(client.endpoint) - if err != nil { - return err +// ServeListener accepts connections on l, serving JSON-RPC on them. +func (srv *Server) ServeListener(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + return err + } + glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr()) + go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions) } - - client.conn = conn - client.in = json.NewDecoder(conn) - client.out = json.NewEncoder(conn) - - return client.out.Encode(msg) -} - -// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded. -func (client *ipcClient) Recv(msg interface{}) error { - return client.in.Decode(&msg) -} - -// Close will close the underlying IPC connection -func (client *ipcClient) Close() { - client.conn.Close() } -// SupportedModules will return the collection of offered RPC modules. -func (client *ipcClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) +// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes +// the endpoint is the full path to a unix socket, and Windows the endpoint is an +// identifier for a named pipe. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialIPC(ctx context.Context, endpoint string) (*Client, error) { + return newClient(ctx, func(ctx context.Context) (net.Conn, error) { + return newIPCConnection(ctx, endpoint) + }) } diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go index 9ece01240..a25b21627 100644 --- a/rpc/ipc_unix.go +++ b/rpc/ipc_unix.go @@ -22,6 +22,8 @@ import ( "net" "os" "path/filepath" + + "golang.org/x/net/context" ) // ipcListen will create a Unix socket on the given endpoint. @@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) { } // newIPCConnection will connect to a Unix socket on the given endpoint. -func newIPCConnection(endpoint string) (net.Conn, error) { - return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"}) +func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { + return dialContext(ctx, "unix", endpoint) } diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 8342d04d5..68234d215 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -22,16 +22,27 @@ import ( "net" "time" + "golang.org/x/net/context" "gopkg.in/natefinch/npipe.v2" ) +// This is used if the dialing context has no deadline. It is much smaller than the +// defaultDialTimeout because named pipes are local and there is no need to wait so long. +const defaultPipeDialTimeout = 2 * time.Second + // ipcListen will create a named pipe on the given endpoint. func ipcListen(endpoint string) (net.Listener, error) { return npipe.Listen(endpoint) } // newIPCConnection will connect to a named pipe with the given endpoint as name. -func newIPCConnection(endpoint string) (net.Conn, error) { - timeout := 5 * time.Second +func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { + timeout := defaultPipeDialTimeout + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + if timeout < 0 { + timeout = 0 + } + } return npipe.DialTimeout(endpoint, timeout) } diff --git a/rpc/json.go b/rpc/json.go index 151ed546e..a7053e3f5 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -30,49 +30,43 @@ import ( ) const ( - JSONRPCVersion = "2.0" + jsonrpcVersion = "2.0" serviceMethodSeparator = "_" subscribeMethod = "eth_subscribe" unsubscribeMethod = "eth_unsubscribe" notificationMethod = "eth_subscription" ) -// JSON-RPC request -type JSONRequest struct { +type jsonRequest struct { Method string `json:"method"` Version string `json:"jsonrpc"` Id json.RawMessage `json:"id,omitempty"` Payload json.RawMessage `json:"params,omitempty"` } -// JSON-RPC response -type JSONSuccessResponse struct { +type jsonSuccessResponse struct { Version string `json:"jsonrpc"` Id interface{} `json:"id,omitempty"` Result interface{} `json:"result"` } -// JSON-RPC error object -type JSONError struct { +type jsonError struct { Code int `json:"code"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } -// JSON-RPC error response -type JSONErrResponse struct { +type jsonErrResponse struct { Version string `json:"jsonrpc"` Id interface{} `json:"id,omitempty"` - Error JSONError `json:"error"` + Error jsonError `json:"error"` } -// JSON-RPC notification payload type jsonSubscription struct { Subscription string `json:"subscription"` Result interface{} `json:"result,omitempty"` } -// JSON-RPC notification type jsonNotification struct { Version string `json:"jsonrpc"` Method string `json:"method"` @@ -91,6 +85,17 @@ type jsonCodec struct { rw io.ReadWriteCloser // connection } +func (err *jsonError) Error() string { + if err.Message == "" { + return fmt.Sprintf("json-rpc error %d", err.Code) + } + return err.Message +} + +func (err *jsonError) ErrorCode() int { + return err.Code +} + // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { d := json.NewDecoder(rwc) @@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool { // ReadRequestHeaders will read new requests without parsing the arguments. It will // return a collection of requests, an indication if these requests are in batch // form or an error when the incoming message could not be read/parsed. -func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) { +func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) { c.decMu.Lock() defer c.decMu.Unlock() @@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error { // parseRequest will parse a single request from the given RawMessage. It will return // the parsed request, an indication if the request was a batch or an error when // the request could not be parsed. -func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { - var in JSONRequest +func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { + var in jsonRequest if err := json.Unmarshal(incomingMsg, &in); err != nil { return nil, false, &invalidMessageError{err.Error()} } @@ -182,12 +187,12 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { method: unsubscribeMethod, params: in.Payload}}, false, nil } - // regular RPC call elems := strings.Split(in.Method, serviceMethodSeparator) if len(elems) != 2 { return nil, false, &methodNotFoundError{in.Method, ""} } + // regular RPC call if len(in.Payload) == 0 { return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: &in.Id}}, false, nil } @@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { // parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication // if the request was a batch or an error when the request could not be read. -func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { - var in []JSONRequest +func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { + var in []jsonRequest if err := json.Unmarshal(incomingMsg, &in); err != nil { return nil, false, &invalidMessageError{err.Error()} } @@ -236,15 +241,15 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro continue } - elems := strings.Split(r.Method, serviceMethodSeparator) - if len(elems) != 2 { - return nil, true, &methodNotFoundError{r.Method, ""} - } - if len(r.Payload) == 0 { - requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: nil} + requests[i] = rpcRequest{id: id, params: nil} + } else { + requests[i] = rpcRequest{id: id, params: r.Payload} + } + if elem := strings.Split(r.Method, serviceMethodSeparator); len(elem) == 2 { + requests[i].service, requests[i].method = elem[0], elem[1] } else { - requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: r.Payload} + requests[i].err = &methodNotFoundError{r.Method, ""} } } @@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro // ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed // values or an error when the parsing failed. -func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) { +func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) { if args, ok := params.(json.RawMessage); !ok { return nil, &invalidParamsError{"Invalid params supplied"} } else { @@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf // parsePositionalArguments tries to parse the given args to an array of values with the given types. // It returns the parsed values or an error when the args could not be parsed. Missing optional arguments // are returned as reflect.Zero values. -func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) { +func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) { params := make([]interface{}, 0, len(callbackArgs)) for _, t := range callbackArgs { params = append(params, reflect.New(t).Interface()) @@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) // CreateResponse will create a JSON-RPC success response with the given id and reply as result. func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} { if isHexNum(reflect.TypeOf(reply)) { - return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} + return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} } - return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply} + return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply} } // CreateErrorResponse will create a JSON-RPC error response with the given id and error. -func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} { - return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}} +func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} { + return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}} } // CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error. // info is optional and contains additional information about the error. When an empty string is passed it is ignored. -func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} { - return &JSONErrResponse{Version: JSONRPCVersion, Id: id, - Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}} +func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} { + return &jsonErrResponse{Version: jsonrpcVersion, Id: id, + Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}} } // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { if isHexNum(reflect.TypeOf(event)) { - return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} } - return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, Params: jsonSubscription{Subscription: subid, Result: event}} } diff --git a/rpc/notification.go b/rpc/notification.go index e84e26a58..875433071 100644 --- a/rpc/notification.go +++ b/rpc/notification.go @@ -28,7 +28,7 @@ import ( var ( // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("notifications not supported") + ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport") // ErrNotificationNotFound is returned when the notification for the given id is not found ErrNotificationNotFound = errors.New("notification not found") diff --git a/rpc/notification_test.go b/rpc/notification_test.go index 1bcede177..280503222 100644 --- a/rpc/notification_test.go +++ b/rpc/notification_test.go @@ -19,20 +19,31 @@ package rpc import ( "encoding/json" "net" + "sync" "testing" "time" "golang.org/x/net/context" ) -type NotificationTestService struct{} +type NotificationTestService struct { + mu sync.Mutex + unsubscribed bool -var ( - unsubCallbackCalled = false -) + gotHangSubscriptionReq chan struct{} + unblockHangSubscription chan struct{} +} + +func (s *NotificationTestService) wasUnsubCallbackCalled() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.unsubscribed +} func (s *NotificationTestService) Unsubscribe(subid string) { - unsubCallbackCalled = true + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() } func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { @@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i return subscription, nil } +// HangSubscription blocks on s.unblockHangSubscription before +// sending anything. +func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + + s.gotHangSubscriptionReq <- struct{}{} + <-s.unblockHangSubscription + subscription, err := notifier.NewSubscription(s.Unsubscribe) + if err != nil { + return nil, err + } + go func() { + subscription.Notify(val) + }() + return subscription, nil +} + func TestNotifications(t *testing.T) { server := NewServer() service := &NotificationTestService{} @@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) { } var subid string - response := JSONSuccessResponse{Result: subid} + response := jsonSuccessResponse{Result: subid} if err := in.Decode(&response); err != nil { t.Fatal(err) } @@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) { clientConn.Close() // causes notification unsubscribe callback to be called time.Sleep(1 * time.Second) - if !unsubCallbackCalled { + if !service.wasUnsubCallbackCalled() { t.Error("unsubscribe callback not called after closing connection") } } diff --git a/rpc/server.go b/rpc/server.go index 7b7d22063..040805a5c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,7 +21,6 @@ import ( "reflect" "runtime" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -30,8 +29,6 @@ import ( ) const ( - stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - notificationBufferSize = 10000 // max buffered notifications before codec is closed MetadataApi = "rpc" @@ -183,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) if err != nil { - glog.V(logger.Debug).Infof("%v\n", err) + glog.V(logger.Debug).Infof("read error %v\n", err) codec.Write(codec.CreateErrorResponse(nil, err)) return nil } @@ -240,13 +237,11 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied") - time.AfterFunc(stopPendingRequestTimeout, func() { - s.codecsMu.Lock() - defer s.codecsMu.Unlock() - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() - return true - }) + s.codecsMu.Lock() + defer s.codecsMu.Unlock() + s.codecs.Each(func(c interface{}) bool { + c.(ServerCodec).Close() + return true }) } } @@ -386,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s // readRequest requests the next (batch) request from the codec. It will return the collection // of requests, an indication if the request was a batch, the invalid request identifier and an // error when the request could not be read/parsed. -func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { +func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) { reqs, batch, err := codec.ReadRequestHeaders() if err != nil { return nil, batch, err @@ -399,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro var ok bool var svc *service + if r.err != nil { + requests[i] = &serverRequest{id: r.id, err: r.err} + continue + } + if r.isPubSub && r.method == unsubscribeMethod { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg diff --git a/rpc/server_test.go b/rpc/server_test.go index de47e1afd..e6840bde4 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -21,6 +21,7 @@ import ( "net" "reflect" "testing" + "time" "golang.org/x/net/context" ) @@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args return Result{str, i, args} } +func (s *Service) Sleep(ctx context.Context, duration time.Duration) { + select { + case <-time.After(duration): + case <-ctx.Done(): + } +} + func (s *Service) Rets() (string, error) { return "", nil } @@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) { t.Fatalf("Expected service calc to be registered") } - if len(svc.callbacks) != 4 { - t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks)) + if len(svc.callbacks) != 5 { + t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks)) } if len(svc.subscriptions) != 1 { @@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) { t.Fatal(err) } - response := JSONSuccessResponse{Result: &Result{}} + response := jsonSuccessResponse{Result: &Result{}} if err := in.Decode(&response); err != nil { t.Fatal(err) } diff --git a/rpc/types.go b/rpc/types.go index a1f36fbd2..2a7268ad8 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -62,7 +62,7 @@ type serverRequest struct { callb *callback args []reflect.Value isUnsubscribe bool - err RPCError + err Error } type serviceRegistry map[string]*service // collection of services @@ -88,14 +88,13 @@ type rpcRequest struct { id interface{} isPubSub bool params interface{} + err Error // invalid batch element } -// RPCError implements RPC error, is add support for error codec over regular go errors -type RPCError interface { - // RPC error code - Code() int - // Error message - Error() string +// Error wraps RPC errors, which contain an error code in addition to the message. +type Error interface { + Error() string // returns the message + ErrorCode() int // returns the code } // ServerCodec implements reading, parsing and writing RPC messages for the server side of @@ -103,15 +102,15 @@ type RPCError interface { // multiple go-routines concurrently. type ServerCodec interface { // Read next request - ReadRequestHeaders() ([]rpcRequest, bool, RPCError) + ReadRequestHeaders() ([]rpcRequest, bool, Error) // Parse request argument to the given types - ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError) + ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error) // Assemble success response, expects response id and payload CreateResponse(interface{}, interface{}) interface{} // Assemble error response, expects response id and error - CreateErrorResponse(interface{}, RPCError) interface{} + CreateErrorResponse(interface{}, Error) interface{} // Assemble error response with extra information about the error through info - CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} + CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} // Create notification response CreateNotification(string, interface{}) interface{} // Write msg to client. @@ -273,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { func (bn *BlockNumber) Int64() int64 { return (int64)(*bn) } - -// Client defines the interface for go client that wants to connect to a geth RPC endpoint -type Client interface { - // SupportedModules returns the collection of API's the server offers - SupportedModules() (map[string]string, error) - - Send(req interface{}) error - Recv(msg interface{}) error - - Close() -} diff --git a/rpc/utils.go b/rpc/utils.go index fe482e19d..1ac6698f5 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -20,7 +20,6 @@ import ( "crypto/rand" "encoding/hex" "errors" - "fmt" "math/big" "reflect" "unicode" @@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) { } return "0x" + hex.EncodeToString(subid[:]), nil } - -// SupportedModules returns the collection of API's that the RPC server offers -// on which the given client connects. -func SupportedModules(client Client) (map[string]string, error) { - req := JSONRequest{ - Id: []byte("1"), - Version: "2.0", - Method: MetadataApi + "_modules", - } - if err := client.Send(req); err != nil { - return nil, err - } - - var response JSONSuccessResponse - if err := client.Recv(&response); err != nil { - return nil, err - } - if response.Result != nil { - mods := make(map[string]string) - if modules, ok := response.Result.(map[string]interface{}); ok { - for m, v := range modules { - mods[m] = fmt.Sprintf("%s", v) - } - return mods, nil - } - } - return nil, fmt.Errorf("unable to retrieve modules") -} diff --git a/rpc/websocket.go b/rpc/websocket.go index fe9354d94..fc3cd0709 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -17,36 +17,39 @@ package rpc import ( + "crypto/tls" "fmt" + "net" "net/http" + "net/url" "os" "strings" - "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" "golang.org/x/net/websocket" "gopkg.in/fatih/set.v0" ) -// wsReaderWriterCloser reads and write payloads from and to a websocket connection. -type wsReaderWriterCloser struct { - c *websocket.Conn -} - -// Read will read incoming payload data into p. -func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) { - return rw.c.Read(p) -} - -// Write writes p to the websocket. -func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) { - return rw.c.Write(p) +// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. +// +// allowedOrigins should be a comma-separated list of allowed origin URLs. +// To allow connections with any origin, pass "*". +func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler { + return websocket.Server{ + Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")), + Handler: func(conn *websocket.Conn) { + srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions) + }, + } } -// Close closes the websocket connection. -func (rw *wsReaderWriterCloser) Close() error { - return rw.c.Close() +// NewWSServer creates a new websocket RPC server around an API provider. +// +// Deprecated: use Server.WebsocketHandler +func NewWSServer(allowedOrigins string, srv *Server) *http.Server { + return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} } // wsHandshakeValidator returns a handler that verifies the origin during the @@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http return f } -// NewWSServer creates a new websocket RPC server around an API provider. -func NewWSServer(allowedOrigins string, handler *Server) *http.Server { - return &http.Server{ - Handler: websocket.Server{ - Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")), - Handler: func(conn *websocket.Conn) { - handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}), - OptionMethodInvocation|OptionSubscriptions) - }, - }, +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + if origin == "" { + var err error + if origin, err = os.Hostname(); err != nil { + return nil, err + } + if strings.HasPrefix(endpoint, "wss") { + origin = "https://" + strings.ToLower(origin) + } else { + origin = "http://" + strings.ToLower(origin) + } + } + config, err := websocket.NewConfig(endpoint, origin) + if err != nil { + return nil, err } -} - -// wsClient represents a RPC client that communicates over websockets with a -// RPC server. -type wsClient struct { - endpoint string - connMu sync.Mutex - conn *websocket.Conn -} -// NewWSClientj creates a new RPC client that communicates with a RPC server -// that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (Client, error) { - return &wsClient{endpoint: endpoint}, nil + return newClient(ctx, func(ctx context.Context) (net.Conn, error) { + return wsDialContext(ctx, config) + }) } -// connection will return a websocket connection to the RPC server. It will -// (re)connect when necessary. -func (client *wsClient) connection() (*websocket.Conn, error) { - if client.conn != nil { - return client.conn, nil +func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) { + var conn net.Conn + var err error + switch config.Location.Scheme { + case "ws": + conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location)) + case "wss": + dialer := contextDialer(ctx) + conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig) + default: + err = websocket.ErrBadScheme } - - origin, err := os.Hostname() if err != nil { return nil, err } - - origin = "http://" + origin - client.conn, err = websocket.Dial(client.endpoint, "", origin) - - return client.conn, err -} - -// SupportedModules is the collection of modules the RPC server offers. -func (client *wsClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) -} - -// Send writes the JSON serialized msg to the websocket. It will create a new -// websocket connection to the server if the client is currently not connected. -func (client *wsClient) Send(msg interface{}) (err error) { - client.connMu.Lock() - defer client.connMu.Unlock() - - var conn *websocket.Conn - if conn, err = client.connection(); err == nil { - if err = websocket.JSON.Send(conn, msg); err != nil { - client.conn.Close() - client.conn = nil - } + ws, err := websocket.NewClient(config, conn) + if err != nil { + conn.Close() + return nil, err } - - return err + return ws, err } -// Recv reads a JSON message from the websocket and unmarshals it into msg. -func (client *wsClient) Recv(msg interface{}) (err error) { - client.connMu.Lock() - defer client.connMu.Unlock() +var wsPortMap = map[string]string{"ws": "80", "wss": "443"} - var conn *websocket.Conn - if conn, err = client.connection(); err == nil { - if err = websocket.JSON.Receive(conn, msg); err != nil { - client.conn.Close() - client.conn = nil +func wsDialAddress(location *url.URL) string { + if _, ok := wsPortMap[location.Scheme]; ok { + if _, _, err := net.SplitHostPort(location.Host); err != nil { + return net.JoinHostPort(location.Host, wsPortMap[location.Scheme]) } } - return -} - -// Close closes the underlaying websocket connection. -func (client *wsClient) Close() { - client.connMu.Lock() - defer client.connMu.Unlock() - - if client.conn != nil { - client.conn.Close() - client.conn = nil - } - + return location.Host } |