diff options
author | Péter Szilágyi <peterke@gmail.com> | 2016-02-05 21:08:48 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2016-02-05 22:53:47 +0800 |
commit | 7486904b92449c5955bb682f4ff98752906912b8 (patch) | |
tree | 87d1119581754ba411396c0c698fdab4bd65b253 /node | |
parent | a13bc9d7a1bc96fab93ace40045c0f0fea4da836 (diff) | |
download | dexon-7486904b92449c5955bb682f4ff98752906912b8.tar.gz dexon-7486904b92449c5955bb682f4ff98752906912b8.tar.zst dexon-7486904b92449c5955bb682f4ff98752906912b8.zip |
cmd, node, rpc: move websockets into node, break singleton
Diffstat (limited to 'node')
-rw-r--r-- | node/api.go | 49 | ||||
-rw-r--r-- | node/config.go | 34 | ||||
-rw-r--r-- | node/node.go | 85 |
3 files changed, 122 insertions, 46 deletions
diff --git a/node/api.go b/node/api.go index 1b185c6f1..879b33816 100644 --- a/node/api.go +++ b/node/api.go @@ -25,9 +25,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" - "gopkg.in/fatih/set.v0" ) // PrivateAdminAPI is the collection of administrative API methods exposed only @@ -86,44 +84,29 @@ func (api *PrivateAdminAPI) StopRPC() (bool, error) { } // StartWS starts the websocket RPC API server. -func (api *PrivateAdminAPI) StartWS(address string, port int, cors string, apis string) (bool, error) { - var offeredAPIs []rpc.API - if len(apis) > 0 { - namespaces := set.New() - for _, a := range strings.Split(apis, ",") { - namespaces.Add(strings.TrimSpace(a)) - } - for _, api := range api.node.APIs() { - if namespaces.Has(api.Namespace) { - offeredAPIs = append(offeredAPIs, api) - } - } - } else { - // use by default all public API's - for _, api := range api.node.APIs() { - if api.Public { - offeredAPIs = append(offeredAPIs, api) - } - } - } +func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis string) (bool, error) { + api.node.lock.Lock() + defer api.node.lock.Unlock() - if address == "" { - address = "127.0.0.1" + if api.node.wsHandler != nil { + return false, fmt.Errorf("WebSocker RPC already running on %s", api.node.wsEndpoint) } - if port == 0 { - port = 8546 + if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { + return false, err } - - corsDomains := strings.Split(cors, " ") - - err := rpc.StartWS(address, port, corsDomains, offeredAPIs) - return err == nil, err + return true, nil } // StopRPC terminates an already running websocket RPC API endpoint. func (api *PrivateAdminAPI) StopWS() (bool, error) { - err := rpc.StopWS() - return err == nil, err + api.node.lock.Lock() + defer api.node.lock.Unlock() + + if api.node.wsHandler == nil { + return false, fmt.Errorf("WebSocket RPC not running") + } + api.node.stopWS() + return true, nil } // PublicAdminAPI is the collection of administrative API methods exposed over diff --git a/node/config.go b/node/config.go index 94c6e2e56..f8252b63a 100644 --- a/node/config.go +++ b/node/config.go @@ -117,6 +117,25 @@ type Config struct { // If the module list is empty, all RPC API endpoints designated public will be // exposed. HttpModules []string + + // WsHost is the host interface on which to start the websocket RPC server. If + // this field is empty, no websocket API endpoint will be started. + WsHost string + + // WsPort is the TCP port number on which to start the websocket RPC server. The + // default zero value is/ valid and will pick a port number randomly (useful for + // ephemeral nodes). + WsPort int + + // WsCors is the Cross-Origin Resource Sharing header to send to requesting clients. + // Please be aware that CORS is a browser enforced security, it's fully useless + // for custom websocket clients. + WsCors string + + // WsModules is a list of API modules to expose via the websocket RPC interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + WsModules []string } // IpcEndpoint resolves an IPC endpoint based on a configured value, taking into @@ -165,6 +184,21 @@ func DefaultHttpEndpoint() string { return config.HttpEndpoint() } +// WsEndpoint resolves an websocket endpoint based on the configured host interface +// and port parameters. +func (c *Config) WsEndpoint() string { + if c.WsHost == "" { + return "" + } + return fmt.Sprintf("%s:%d", c.WsHost, c.WsPort) +} + +// DefaultWsEndpoint returns the websocket endpoint used by default. +func DefaultWsEndpoint() string { + config := &Config{WsHost: common.DefaultWsHost, WsPort: common.DefaultWsPort} + return config.WsEndpoint() +} + // NodeKey retrieves the currently configured private key of the node, checking // first any manually set key, falling back to the one found in the configured // data folder. If no key can be found, a new one is generated. diff --git a/node/node.go b/node/node.go index 44c88d378..804748b6b 100644 --- a/node/node.go +++ b/node/node.go @@ -66,6 +66,12 @@ type Node struct { httpListener net.Listener // HTTP RPC listener socket to server API requests httpHandler *rpc.Server // HTTP RPC request handler to process the API requests + wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled) + wsWhitelist []string // Websocket RPC modules to allow through this endpoint + wsCors string // Websocket RPC Cross-Origin Resource Sharing header + wsListener net.Listener // Websocket RPC listener socket to server API requests + wsHandler *rpc.Server // Websocket RPC request handler to process the API requests + stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex } @@ -105,6 +111,9 @@ func New(conf *Config) (*Node, error) { httpEndpoint: conf.HttpEndpoint(), httpWhitelist: conf.HttpModules, httpCors: conf.HttpCors, + wsEndpoint: conf.WsEndpoint(), + wsWhitelist: conf.WsModules, + wsCors: conf.WsCors, eventmux: new(event.TypeMux), }, nil } @@ -215,6 +224,11 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopIPC() return err } + if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsCors); err != nil { + n.stopHTTP() + n.stopIPC() + return err + } // All API endpoints started successfully n.rpcAPIs = apis return nil @@ -285,7 +299,7 @@ func (n *Node) stopIPC() { // startHTTP initializes and starts the HTTP RPC endpoint. func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error { - // Short circuit if the IPC endpoint isn't being exposed + // Short circuit if the HTTP endpoint isn't being exposed if endpoint == "" { return nil } @@ -338,6 +352,61 @@ func (n *Node) stopHTTP() { } } +// startWS initializes and starts the websocket RPC endpoint. +func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, cors string) error { + // Short circuit if the WS endpoint isn't being exposed + if endpoint == "" { + return nil + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return err + } + go rpc.NewWSServer(cors, handler).Serve(listener) + glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", endpoint) + + // All listeners booted successfully + n.wsEndpoint = endpoint + n.wsListener = listener + n.wsHandler = handler + n.wsCors = cors + + return nil +} + +// stopWS terminates the websocket RPC endpoint. +func (n *Node) stopWS() { + if n.wsListener != nil { + n.wsListener.Close() + n.wsListener = nil + + glog.V(logger.Info).Infof("WebSocket endpoint closed: ws://%s", n.wsEndpoint) + } + if n.wsHandler != nil { + n.wsHandler.Stop() + n.wsHandler = nil + } +} + // Stop terminates a running node along with all it's services. In the node was // not started, an error is returned. func (n *Node) Stop() error { @@ -349,8 +418,9 @@ func (n *Node) Stop() error { return ErrNodeStopped } // Otherwise terminate the API, all services and the P2P server too - n.stopIPC() + n.stopWS() n.stopHTTP() + n.stopIPC() n.rpcAPIs = nil failure := &StopError{ @@ -471,14 +541,3 @@ func (n *Node) apis() []rpc.API { }, } } - -// APIs returns the collection of RPC descriptor this node offers. This method -// is just a quick placeholder passthrough for the RPC update, which in the next -// step will be fully integrated into the node itself. -func (n *Node) APIs() []rpc.API { - apis := n.apis() - for _, api := range n.services { - apis = append(apis, api.APIs()...) - } - return apis -} |