From df75dbfd6804923b1c8a8388b67523072d59f155 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Tue, 9 Feb 2016 14:10:40 +0200 Subject: cmd, node, rpc: readd inproc RPC client, expose via node --- rpc/http.go | 2 +- rpc/inproc.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++ rpc/ipc.go | 2 +- rpc/ipc_windows.go | 6 --- rpc/websocket.go | 2 +- 5 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 rpc/inproc.go (limited to 'rpc') diff --git a/rpc/http.go b/rpc/http.go index e58a88c08..d9053b003 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -259,7 +259,7 @@ type httpClient struct { // NewHTTPClient create a new RPC clients that connection to a geth RPC server // over HTTP. -func NewHTTPClient(endpoint string) (*httpClient, error) { +func NewHTTPClient(endpoint string) (Client, error) { url, err := url.Parse(endpoint) if err != nil { return nil, err diff --git a/rpc/inproc.go b/rpc/inproc.go new file mode 100644 index 000000000..e138ba2c3 --- /dev/null +++ b/rpc/inproc.go @@ -0,0 +1,111 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import "encoding/json" + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + buffer := &inprocBuffer{ + requests: make(chan []byte, 16), + responses: make(chan []byte, 16), + } + client := &inProcClient{ + server: handler, + buffer: buffer, + } + go handler.ServeCodec(NewJSONCodec(client.buffer)) + return client +} + +// inProcClient is an in-process buffer stream attached to an RPC server. +type inProcClient struct { + server *Server + buffer *inprocBuffer +} + +// Close tears down the request channel of the in-proc client. +func (c *inProcClient) Close() { + c.buffer.Close() +} + +// Send marshals a message into a json format and injects in into the client +// request channel. +func (c *inProcClient) Send(msg interface{}) error { + d, err := json.Marshal(msg) + if err != nil { + return err + } + c.buffer.requests <- d + return nil +} + +// Recv reads a message from the response channel and tries to parse it into the +// given msg interface. +func (c *inProcClient) Recv(msg interface{}) error { + data := <-c.buffer.responses + return json.Unmarshal(data, &msg) +} + +// Returns the collection of modules the RPC server offers. +func (c *inProcClient) SupportedModules() (map[string]string, error) { + return SupportedModules(c) +} + +// inprocBuffer represents the connection between the RPC server and console +type inprocBuffer struct { + readBuf []byte // store remaining request bytes after a partial read + requests chan []byte // list with raw serialized requests + responses chan []byte // list with raw serialized responses +} + +// Read will read the next request in json format. +func (b *inprocBuffer) Read(p []byte) (int, error) { + // last read didn't read entire request, return remaining bytes + if len(b.readBuf) > 0 { + n := copy(p, b.readBuf) + if n < len(b.readBuf) { + b.readBuf = b.readBuf[:n] + } else { + b.readBuf = b.readBuf[:0] + } + return n, nil + } + // read next request + req := <-b.requests + n := copy(p, req) + if n < len(req) { + // inprocBuffer too small, store remaining chunk for next read + b.readBuf = req[n:] + } + return n, nil +} + +// Write sends the given buffer to the backend. +func (b *inprocBuffer) Write(p []byte) (n int, err error) { + b.responses <- p + return len(p), nil +} + +// Close cleans up obtained resources. +func (b *inprocBuffer) Close() error { + close(b.requests) + close(b.responses) + + return nil +} diff --git a/rpc/ipc.go b/rpc/ipc.go index b87bfcbd7..05d8909ca 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -38,7 +38,7 @@ type ipcClient struct { // NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. // On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a // named pipe. -func NewIPCClient(endpoint string) (*ipcClient, error) { +func NewIPCClient(endpoint string) (Client, error) { conn, err := newIPCConnection(endpoint) if err != nil { return nil, err diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 09b01974e..1d4672ad2 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -239,9 +239,6 @@ func Dial(address string) (*PipeConn, error) { for { conn, err := dial(address, nmpwait_wait_forever) if err == nil { - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return conn, nil } if isPipeNotReady(err) { @@ -363,9 +360,6 @@ func Listen(address string) (*PipeListener, error) { if err != nil { return nil, err } - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return &PipeListener{ addr: PipeAddr(address), handle: handle, diff --git a/rpc/websocket.go b/rpc/websocket.go index 548847602..92615494e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -109,7 +109,7 @@ type wsClient struct { // NewWSClientj creates a new RPC client that communicates with a RPC server // that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (*wsClient, error) { +func NewWSClient(endpoint string) (Client, error) { return &wsClient{endpoint: endpoint}, nil } -- cgit