aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/simulations/adapters/exec.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/simulations/adapters/exec.go')
-rw-r--r--p2p/simulations/adapters/exec.go225
1 files changed, 127 insertions, 98 deletions
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go
index dc7d277ca..abb196717 100644
--- a/p2p/simulations/adapters/exec.go
+++ b/p2p/simulations/adapters/exec.go
@@ -17,7 +17,7 @@
package adapters
import (
- "bufio"
+ "bytes"
"context"
"crypto/ecdsa"
"encoding/json"
@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net"
+ "net/http"
"os"
"os/exec"
"os/signal"
@@ -43,12 +44,14 @@ import (
"golang.org/x/net/websocket"
)
-// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the
-// current binary as a child process.
-//
-// An init hook is used so that the child process executes the node services
-// (rather than whataver the main() function would normally do), see the
-// execP2PNode function for more information.
+func init() {
+ // Register a reexec function to start a simulation node when the current binary is
+ // executed as "p2p-node" (rather than whataver the main() function would normally do).
+ reexec.Register("p2p-node", execP2PNode)
+}
+
+// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary
+// as a child process.
type ExecAdapter struct {
// BaseDir is the directory under which the data directories for each
// simulation node are created.
@@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
}
// Start exec's the node passing the ID and service as command line arguments
-// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
-// variable
+// and the node config encoded as JSON in an environment variable.
func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
if n.Cmd != nil {
return errors.New("already started")
}
defer func() {
if err != nil {
- log.Error("node failed to start", "err", err)
n.Stop()
}
}()
@@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
return fmt.Errorf("error generating node config: %s", err)
}
- // use a pipe for stderr so we can both copy the node's stderr to
- // os.Stderr and read the WebSocket address from the logs
- stderrR, stderrW := io.Pipe()
- stderr := io.MultiWriter(os.Stderr, stderrW)
+ // start the one-shot server that waits for startup information
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ statusURL, statusC := n.waitForStartupJSON(ctx)
// start the node
cmd := n.newCmd()
cmd.Stdout = os.Stdout
- cmd.Stderr = stderr
- cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
+ cmd.Stderr = os.Stderr
+ cmd.Env = append(os.Environ(),
+ envStatusURL+"="+statusURL,
+ envNodeConfig+"="+string(confData),
+ )
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
n.Cmd = cmd
// read the WebSocket address from the stderr logs
- var wsAddr string
- wsAddrC := make(chan string)
- go func() {
- s := bufio.NewScanner(stderrR)
- for s.Scan() {
- if strings.Contains(s.Text(), "WebSocket endpoint opened") {
- wsAddrC <- wsAddrPattern.FindString(s.Text())
- }
- }
- }()
- select {
- case wsAddr = <-wsAddrC:
- if wsAddr == "" {
- return errors.New("failed to read WebSocket address from stderr")
- }
- case <-time.After(10 * time.Second):
- return errors.New("timed out waiting for WebSocket address on stderr")
+ status := <-statusC
+ if status.Err != "" {
+ return errors.New(status.Err)
}
-
- // create the RPC client and load the node info
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- client, err := rpc.DialWebsocket(ctx, wsAddr, "")
+ client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost")
if err != nil {
- return fmt.Errorf("error dialing rpc websocket: %s", err)
+ return fmt.Errorf("can't connect to RPC server: %v", err)
}
- var info p2p.NodeInfo
- if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil {
- return fmt.Errorf("error getting node info: %s", err)
- }
- n.client = client
- n.wsAddr = wsAddr
- n.Info = &info
+ // node ready :)
+ n.client = client
+ n.wsAddr = status.WSEndpoint
+ n.Info = status.NodeInfo
return nil
}
+// waitForStartupJSON runs a one-shot HTTP server to receive a startup report.
+func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) {
+ var (
+ ch = make(chan nodeStartupJSON, 1)
+ quitOnce sync.Once
+ srv http.Server
+ )
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ ch <- nodeStartupJSON{Err: err.Error()}
+ return "", ch
+ }
+ quit := func(status nodeStartupJSON) {
+ quitOnce.Do(func() {
+ l.Close()
+ ch <- status
+ })
+ }
+ srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var status nodeStartupJSON
+ if err := json.NewDecoder(r.Body).Decode(&status); err != nil {
+ status.Err = fmt.Sprintf("can't decode startup report: %v", err)
+ }
+ quit(status)
+ })
+ // Run the HTTP server, but don't wait forever and shut it down
+ // if the context is canceled.
+ go srv.Serve(l)
+ go func() {
+ <-ctx.Done()
+ quit(nodeStartupJSON{Err: "didn't get startup report"})
+ }()
+
+ url := "http://" + l.Addr().String()
+ return url, ch
+}
+
// execCommand returns a command which runs the node locally by exec'ing
// the current binary but setting argv[0] to "p2p-node" so that the child
// runs execP2PNode
@@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) {
return snapshots, n.client.Call(&snapshots, "simulation_snapshot")
}
-func init() {
- // register a reexec function to start a devp2p node when the current
- // binary is executed as "p2p-node"
- reexec.Register("p2p-node", execP2PNode)
-}
-
// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
@@ -333,55 +347,69 @@ type execNodeConfig struct {
PeerAddrs map[string]string `json:"peer_addrs,omitempty"`
}
-// ExternalIP gets an external IP address so that Enode URL is usable
-func ExternalIP() net.IP {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- log.Crit("error getting IP address", "err", err)
- }
- for _, addr := range addrs {
- if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() {
- return ip.IP
- }
- }
- log.Warn("unable to determine explicit IP address, falling back to loopback")
- return net.IP{127, 0, 0, 1}
-}
-
-// execP2PNode starts a devp2p node when the current binary is executed with
+// execP2PNode starts a simulation node when the current binary is executed with
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
-// and the node config from the _P2P_NODE_CONFIG environment variable
+// and the node config from an environment variable.
func execP2PNode() {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
glogger.Verbosity(log.LvlInfo)
log.Root().SetHandler(glogger)
+ statusURL := os.Getenv(envStatusURL)
+ if statusURL == "" {
+ log.Crit("missing " + envStatusURL)
+ }
+
+ // Start the node and gather startup report.
+ var status nodeStartupJSON
+ stack, stackErr := startExecNodeStack()
+ if stackErr != nil {
+ status.Err = stackErr.Error()
+ } else {
+ status.WSEndpoint = "ws://" + stack.WSEndpoint()
+ status.NodeInfo = stack.Server().NodeInfo()
+ }
+
+ // Send status to the host.
+ statusJSON, _ := json.Marshal(status)
+ if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil {
+ log.Crit("Can't post startup info", "url", statusURL, "err", err)
+ }
+ if stackErr != nil {
+ os.Exit(1)
+ }
+
+ // Stop the stack if we get a SIGTERM signal.
+ go func() {
+ sigc := make(chan os.Signal, 1)
+ signal.Notify(sigc, syscall.SIGTERM)
+ defer signal.Stop(sigc)
+ <-sigc
+ log.Info("Received SIGTERM, shutting down...")
+ stack.Stop()
+ }()
+ stack.Wait() // Wait for the stack to exit.
+}
+func startExecNodeStack() (*node.Node, error) {
// read the services from argv
serviceNames := strings.Split(os.Args[1], ",")
// decode the config
- confEnv := os.Getenv("_P2P_NODE_CONFIG")
+ confEnv := os.Getenv(envNodeConfig)
if confEnv == "" {
- log.Crit("missing _P2P_NODE_CONFIG")
+ return nil, fmt.Errorf("missing " + envNodeConfig)
}
var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
- log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
+ return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err)
}
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
- if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
- conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr
- }
- if conf.Stack.WSHost == "0.0.0.0" {
- conf.Stack.WSHost = ExternalIP().String()
- }
-
// initialize the devp2p stack
stack, err := node.New(&conf.Stack)
if err != nil {
- log.Crit("error creating node stack", "err", err)
+ return nil, fmt.Errorf("error creating node stack: %v", err)
}
// register the services, collecting them into a map so we can wrap
@@ -390,7 +418,7 @@ func execP2PNode() {
for _, name := range serviceNames {
serviceFunc, exists := serviceFuncs[name]
if !exists {
- log.Crit("unknown node service", "name", name)
+ return nil, fmt.Errorf("unknown node service %q", err)
}
constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) {
ctx := &ServiceContext{
@@ -409,34 +437,35 @@ func execP2PNode() {
return service, nil
}
if err := stack.Register(constructor); err != nil {
- log.Crit("error starting service", "name", name, "err", err)
+ return stack, fmt.Errorf("error registering service %q: %v", name, err)
}
}
// register the snapshot service
- if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
+ err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return &snapshotService{services}, nil
- }); err != nil {
- log.Crit("error starting snapshot service", "err", err)
+ })
+ if err != nil {
+ return stack, fmt.Errorf("error starting snapshot service: %v", err)
}
// start the stack
- if err := stack.Start(); err != nil {
- log.Crit("error stating node stack", "err", err)
+ if err = stack.Start(); err != nil {
+ err = fmt.Errorf("error starting stack: %v", err)
}
+ return stack, err
+}
- // stop the stack if we get a SIGTERM signal
- go func() {
- sigc := make(chan os.Signal, 1)
- signal.Notify(sigc, syscall.SIGTERM)
- defer signal.Stop(sigc)
- <-sigc
- log.Info("Received SIGTERM, shutting down...")
- stack.Stop()
- }()
+const (
+ envStatusURL = "_P2P_STATUS_URL"
+ envNodeConfig = "_P2P_NODE_CONFIG"
+)
- // wait for the stack to exit
- stack.Wait()
+// nodeStartupJSON is sent to the simulation host after startup.
+type nodeStartupJSON struct {
+ Err string
+ WSEndpoint string
+ NodeInfo *p2p.NodeInfo
}
// snapshotService is a node.Service which wraps a list of services and