diff options
-rw-r--r-- | cmd/geth/js_test.go | 6 | ||||
-rw-r--r-- | cmd/geth/main.go | 4 | ||||
-rw-r--r-- | cmd/utils/client.go | 1 | ||||
-rw-r--r-- | node/api.go | 2 | ||||
-rw-r--r-- | node/node.go | 55 | ||||
-rw-r--r-- | node/node_test.go | 12 | ||||
-rw-r--r-- | rpc/http.go | 2 | ||||
-rw-r--r-- | rpc/inproc.go | 111 | ||||
-rw-r--r-- | rpc/ipc.go | 2 | ||||
-rw-r--r-- | rpc/ipc_windows.go | 6 | ||||
-rw-r--r-- | rpc/websocket.go | 2 |
11 files changed, 178 insertions, 25 deletions
diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index ed6e5b319..4330b484c 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io/ioutil" "math/big" - "math/rand" "os" "path/filepath" "regexp" @@ -30,7 +29,6 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/common/httpclient" @@ -96,7 +94,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod t.Fatal(err) } // Create a networkless protocol stack - stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IPCPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())}) + stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true}) if err != nil { t.Fatalf("failed to create node: %v", err) } @@ -142,7 +140,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") - client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IPCEndpoint()) + client, err := stack.Attach() if err != nil { t.Fatalf("failed to attach to node: %v", err) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5c07be3f6..8594d18c5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -425,7 +425,7 @@ func console(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node, and either execute script or become interactive - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) + client, err := node.Attach() if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } @@ -451,7 +451,7 @@ func execScripts(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node and execute the given scripts - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) + client, err := node.Attach() if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } diff --git a/cmd/utils/client.go b/cmd/utils/client.go index 1144af6f6..3913d007b 100644 --- a/cmd/utils/client.go +++ b/cmd/utils/client.go @@ -51,6 +51,5 @@ func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) { if strings.HasPrefix(endpoint, "ws:") { return rpc.NewWSClient(endpoint) } - return nil, fmt.Errorf("invalid endpoint") } diff --git a/node/api.go b/node/api.go index 879b33816..48cbd0150 100644 --- a/node/api.go +++ b/node/api.go @@ -89,7 +89,7 @@ func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis str defer api.node.lock.Unlock() if api.node.wsHandler != nil { - return false, fmt.Errorf("WebSocker RPC already running on %s", api.node.wsEndpoint) + return false, fmt.Errorf("WebSocket RPC already running on %s", api.node.wsEndpoint) } if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { return false, err diff --git a/node/node.go b/node/node.go index 6d9290034..7d3a10874 100644 --- a/node/node.go +++ b/node/node.go @@ -55,7 +55,9 @@ type Node struct { serviceFuncs []ServiceConstructor // Service constructors (in dependency order) services map[reflect.Type]Service // Currently running services - rpcAPIs []rpc.API // List of APIs currently provided by the node + rpcAPIs []rpc.API // List of APIs currently provided by the node + inprocHandler *rpc.Server // In-process RPC request handler to process the API requests + ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled) ipcListener net.Listener // IPC RPC listener socket to serve API requests ipcHandler *rpc.Server // IPC RPC request handler to process the API requests @@ -217,16 +219,22 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { apis = append(apis, service.APIs()...) } // Start the various API endpoints, terminating all in case of errors + if err := n.startInProc(apis); err != nil { + return err + } if err := n.startIPC(apis); err != nil { + n.stopInProc() return err } if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil { n.stopIPC() + n.stopInProc() return err } if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsDomains); err != nil { n.stopHTTP() n.stopIPC() + n.stopInProc() return err } // All API endpoints started successfully @@ -234,6 +242,28 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { return nil } +// startInProc initializes an in-process RPC endpoint. +func (n *Node) startInProc(apis []rpc.API) error { + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("InProc registered %T under '%s'", api.Service, api.Namespace) + } + n.inprocHandler = handler + return nil +} + +// stopInProc terminates the in-process RPC endpoint. +func (n *Node) stopInProc() { + if n.inprocHandler != nil { + n.inprocHandler.Stop() + n.inprocHandler = nil + } +} + // startIPC initializes and starts the IPC RPC endpoint. func (n *Node) startIPC(apis []rpc.API) error { // Short circuit if the IPC endpoint isn't being exposed @@ -468,6 +498,19 @@ func (n *Node) Restart() error { return nil } +// Attach creates an RPC client attached to an in-process API handler. +func (n *Node) Attach() (rpc.Client, error) { + n.lock.RLock() + defer n.lock.RUnlock() + + // Short circuit if the node's not running + if n.server == nil { + return nil, ErrNodeStopped + } + // Otherwise attach to the API and return + return rpc.NewInProcRPCClient(n.inprocHandler), nil +} + // Server retrieves the currently running P2P network layer. This method is meant // only to inspect fields of the currently running server, life cycle management // should be left to this Node entity. @@ -506,6 +549,16 @@ func (n *Node) IPCEndpoint() string { return n.ipcEndpoint } +// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack. +func (n *Node) HTTPEndpoint() string { + return n.httpEndpoint +} + +// WSEndpoint retrieves the current WS endpoint used by the protocol stack. +func (n *Node) WSEndpoint() string { + return n.wsEndpoint +} + // EventMux retrieves the event multiplexer used by all the network services in // the current protocol stack. func (n *Node) EventMux() *event.TypeMux { diff --git a/node/node_test.go b/node/node_test.go index 38bfe27e2..532115d3c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -18,9 +18,7 @@ package node import ( "errors" - "fmt" "io/ioutil" - "math/rand" "os" "reflect" "testing" @@ -37,7 +35,6 @@ var ( func testNodeConfig() *Config { return &Config{ - IPCPath: fmt.Sprintf("test-%d.ipc", rand.Int63()), PrivateKey: testNodeKey, Name: "test node", } @@ -541,10 +538,11 @@ func TestAPIGather(t *testing.T) { defer stack.Stop() // Connect to the RPC server and verify the various registered endpoints - ipcClient, err := rpc.NewIPCClient(stack.IPCEndpoint()) + client, err := stack.Attach() if err != nil { - t.Fatalf("failed to connect to the IPC API server: %v", err) + t.Fatalf("failed to connect to the inproc API server: %v", err) } + defer client.Close() tests := []struct { Method string @@ -556,11 +554,11 @@ func TestAPIGather(t *testing.T) { {"multi.v2.nested_theOneMethod", "multi.v2.nested"}, } for i, test := range tests { - if err := ipcClient.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { + if err := client.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { t.Fatalf("test %d: failed to send API request: %v", i, err) } reply := new(rpc.JSONSuccessResponse) - if err := ipcClient.Recv(reply); err != nil { + if err := client.Recv(reply); err != nil { t.Fatalf("test %d: failed to read API reply: %v", i, err) } select { diff --git a/rpc/http.go b/rpc/http.go index e58a88c08..d9053b003 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -259,7 +259,7 @@ type httpClient struct { // NewHTTPClient create a new RPC clients that connection to a geth RPC server // over HTTP. -func NewHTTPClient(endpoint string) (*httpClient, error) { +func NewHTTPClient(endpoint string) (Client, error) { url, err := url.Parse(endpoint) if err != nil { return nil, err diff --git a/rpc/inproc.go b/rpc/inproc.go new file mode 100644 index 000000000..e138ba2c3 --- /dev/null +++ b/rpc/inproc.go @@ -0,0 +1,111 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import "encoding/json" + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + buffer := &inprocBuffer{ + requests: make(chan []byte, 16), + responses: make(chan []byte, 16), + } + client := &inProcClient{ + server: handler, + buffer: buffer, + } + go handler.ServeCodec(NewJSONCodec(client.buffer)) + return client +} + +// inProcClient is an in-process buffer stream attached to an RPC server. +type inProcClient struct { + server *Server + buffer *inprocBuffer +} + +// Close tears down the request channel of the in-proc client. +func (c *inProcClient) Close() { + c.buffer.Close() +} + +// Send marshals a message into a json format and injects in into the client +// request channel. +func (c *inProcClient) Send(msg interface{}) error { + d, err := json.Marshal(msg) + if err != nil { + return err + } + c.buffer.requests <- d + return nil +} + +// Recv reads a message from the response channel and tries to parse it into the +// given msg interface. +func (c *inProcClient) Recv(msg interface{}) error { + data := <-c.buffer.responses + return json.Unmarshal(data, &msg) +} + +// Returns the collection of modules the RPC server offers. +func (c *inProcClient) SupportedModules() (map[string]string, error) { + return SupportedModules(c) +} + +// inprocBuffer represents the connection between the RPC server and console +type inprocBuffer struct { + readBuf []byte // store remaining request bytes after a partial read + requests chan []byte // list with raw serialized requests + responses chan []byte // list with raw serialized responses +} + +// Read will read the next request in json format. +func (b *inprocBuffer) Read(p []byte) (int, error) { + // last read didn't read entire request, return remaining bytes + if len(b.readBuf) > 0 { + n := copy(p, b.readBuf) + if n < len(b.readBuf) { + b.readBuf = b.readBuf[:n] + } else { + b.readBuf = b.readBuf[:0] + } + return n, nil + } + // read next request + req := <-b.requests + n := copy(p, req) + if n < len(req) { + // inprocBuffer too small, store remaining chunk for next read + b.readBuf = req[n:] + } + return n, nil +} + +// Write sends the given buffer to the backend. +func (b *inprocBuffer) Write(p []byte) (n int, err error) { + b.responses <- p + return len(p), nil +} + +// Close cleans up obtained resources. +func (b *inprocBuffer) Close() error { + close(b.requests) + close(b.responses) + + return nil +} diff --git a/rpc/ipc.go b/rpc/ipc.go index b87bfcbd7..05d8909ca 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -38,7 +38,7 @@ type ipcClient struct { // NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. // On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a // named pipe. -func NewIPCClient(endpoint string) (*ipcClient, error) { +func NewIPCClient(endpoint string) (Client, error) { conn, err := newIPCConnection(endpoint) if err != nil { return nil, err diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 09b01974e..1d4672ad2 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -239,9 +239,6 @@ func Dial(address string) (*PipeConn, error) { for { conn, err := dial(address, nmpwait_wait_forever) if err == nil { - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return conn, nil } if isPipeNotReady(err) { @@ -363,9 +360,6 @@ func Listen(address string) (*PipeListener, error) { if err != nil { return nil, err } - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return &PipeListener{ addr: PipeAddr(address), handle: handle, diff --git a/rpc/websocket.go b/rpc/websocket.go index 548847602..92615494e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -109,7 +109,7 @@ type wsClient struct { // NewWSClientj creates a new RPC client that communicates with a RPC server // that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (*wsClient, error) { +func NewWSClient(endpoint string) (Client, error) { return &wsClient{endpoint: endpoint}, nil } |