diff options
Diffstat (limited to 'p2p/simulations/adapters/exec.go')
-rw-r--r-- | p2p/simulations/adapters/exec.go | 225 |
1 files changed, 127 insertions, 98 deletions
diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index dc7d277ca..abb196717 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -17,7 +17,7 @@ package adapters import ( - "bufio" + "bytes" "context" "crypto/ecdsa" "encoding/json" @@ -25,6 +25,7 @@ import ( "fmt" "io" "net" + "net/http" "os" "os/exec" "os/signal" @@ -43,12 +44,14 @@ import ( "golang.org/x/net/websocket" ) -// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the -// current binary as a child process. -// -// An init hook is used so that the child process executes the node services -// (rather than whataver the main() function would normally do), see the -// execP2PNode function for more information. +func init() { + // Register a reexec function to start a simulation node when the current binary is + // executed as "p2p-node" (rather than whataver the main() function would normally do). + reexec.Register("p2p-node", execP2PNode) +} + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the current binary +// as a child process. type ExecAdapter struct { // BaseDir is the directory under which the data directories for each // simulation node are created. @@ -150,15 +153,13 @@ func (n *ExecNode) Client() (*rpc.Client, error) { } // Start exec's the node passing the ID and service as command line arguments -// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment -// variable +// and the node config encoded as JSON in an environment variable. func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { if n.Cmd != nil { return errors.New("already started") } defer func() { if err != nil { - log.Error("node failed to start", "err", err) n.Stop() } }() @@ -175,59 +176,78 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { return fmt.Errorf("error generating node config: %s", err) } - // use a pipe for stderr so we can both copy the node's stderr to - // os.Stderr and read the WebSocket address from the logs - stderrR, stderrW := io.Pipe() - stderr := io.MultiWriter(os.Stderr, stderrW) + // start the one-shot server that waits for startup information + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + statusURL, statusC := n.waitForStartupJSON(ctx) // start the node cmd := n.newCmd() cmd.Stdout = os.Stdout - cmd.Stderr = stderr - cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + cmd.Stderr = os.Stderr + cmd.Env = append(os.Environ(), + envStatusURL+"="+statusURL, + envNodeConfig+"="+string(confData), + ) if err := cmd.Start(); err != nil { return fmt.Errorf("error starting node: %s", err) } n.Cmd = cmd // read the WebSocket address from the stderr logs - var wsAddr string - wsAddrC := make(chan string) - go func() { - s := bufio.NewScanner(stderrR) - for s.Scan() { - if strings.Contains(s.Text(), "WebSocket endpoint opened") { - wsAddrC <- wsAddrPattern.FindString(s.Text()) - } - } - }() - select { - case wsAddr = <-wsAddrC: - if wsAddr == "" { - return errors.New("failed to read WebSocket address from stderr") - } - case <-time.After(10 * time.Second): - return errors.New("timed out waiting for WebSocket address on stderr") + status := <-statusC + if status.Err != "" { + return errors.New(status.Err) } - - // create the RPC client and load the node info - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - client, err := rpc.DialWebsocket(ctx, wsAddr, "") + client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost") if err != nil { - return fmt.Errorf("error dialing rpc websocket: %s", err) + return fmt.Errorf("can't connect to RPC server: %v", err) } - var info p2p.NodeInfo - if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { - return fmt.Errorf("error getting node info: %s", err) - } - n.client = client - n.wsAddr = wsAddr - n.Info = &info + // node ready :) + n.client = client + n.wsAddr = status.WSEndpoint + n.Info = status.NodeInfo return nil } +// waitForStartupJSON runs a one-shot HTTP server to receive a startup report. +func (n *ExecNode) waitForStartupJSON(ctx context.Context) (string, chan nodeStartupJSON) { + var ( + ch = make(chan nodeStartupJSON, 1) + quitOnce sync.Once + srv http.Server + ) + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + ch <- nodeStartupJSON{Err: err.Error()} + return "", ch + } + quit := func(status nodeStartupJSON) { + quitOnce.Do(func() { + l.Close() + ch <- status + }) + } + srv.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var status nodeStartupJSON + if err := json.NewDecoder(r.Body).Decode(&status); err != nil { + status.Err = fmt.Sprintf("can't decode startup report: %v", err) + } + quit(status) + }) + // Run the HTTP server, but don't wait forever and shut it down + // if the context is canceled. + go srv.Serve(l) + go func() { + <-ctx.Done() + quit(nodeStartupJSON{Err: "didn't get startup report"}) + }() + + url := "http://" + l.Addr().String() + return url, ch +} + // execCommand returns a command which runs the node locally by exec'ing // the current binary but setting argv[0] to "p2p-node" so that the child // runs execP2PNode @@ -318,12 +338,6 @@ func (n *ExecNode) Snapshots() (map[string][]byte, error) { return snapshots, n.client.Call(&snapshots, "simulation_snapshot") } -func init() { - // register a reexec function to start a devp2p node when the current - // binary is executed as "p2p-node" - reexec.Register("p2p-node", execP2PNode) -} - // execNodeConfig is used to serialize the node configuration so it can be // passed to the child process as a JSON encoded environment variable type execNodeConfig struct { @@ -333,55 +347,69 @@ type execNodeConfig struct { PeerAddrs map[string]string `json:"peer_addrs,omitempty"` } -// ExternalIP gets an external IP address so that Enode URL is usable -func ExternalIP() net.IP { - addrs, err := net.InterfaceAddrs() - if err != nil { - log.Crit("error getting IP address", "err", err) - } - for _, addr := range addrs { - if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() { - return ip.IP - } - } - log.Warn("unable to determine explicit IP address, falling back to loopback") - return net.IP{127, 0, 0, 1} -} - -// execP2PNode starts a devp2p node when the current binary is executed with +// execP2PNode starts a simulation node when the current binary is executed with // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] -// and the node config from the _P2P_NODE_CONFIG environment variable +// and the node config from an environment variable. func execP2PNode() { glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) glogger.Verbosity(log.LvlInfo) log.Root().SetHandler(glogger) + statusURL := os.Getenv(envStatusURL) + if statusURL == "" { + log.Crit("missing " + envStatusURL) + } + + // Start the node and gather startup report. + var status nodeStartupJSON + stack, stackErr := startExecNodeStack() + if stackErr != nil { + status.Err = stackErr.Error() + } else { + status.WSEndpoint = "ws://" + stack.WSEndpoint() + status.NodeInfo = stack.Server().NodeInfo() + } + + // Send status to the host. + statusJSON, _ := json.Marshal(status) + if _, err := http.Post(statusURL, "application/json", bytes.NewReader(statusJSON)); err != nil { + log.Crit("Can't post startup info", "url", statusURL, "err", err) + } + if stackErr != nil { + os.Exit(1) + } + + // Stop the stack if we get a SIGTERM signal. + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + stack.Wait() // Wait for the stack to exit. +} +func startExecNodeStack() (*node.Node, error) { // read the services from argv serviceNames := strings.Split(os.Args[1], ",") // decode the config - confEnv := os.Getenv("_P2P_NODE_CONFIG") + confEnv := os.Getenv(envNodeConfig) if confEnv == "" { - log.Crit("missing _P2P_NODE_CONFIG") + return nil, fmt.Errorf("missing " + envNodeConfig) } var conf execNodeConfig if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { - log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + return nil, fmt.Errorf("error decoding %s: %v", envNodeConfig, err) } conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) - if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { - conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr - } - if conf.Stack.WSHost == "0.0.0.0" { - conf.Stack.WSHost = ExternalIP().String() - } - // initialize the devp2p stack stack, err := node.New(&conf.Stack) if err != nil { - log.Crit("error creating node stack", "err", err) + return nil, fmt.Errorf("error creating node stack: %v", err) } // register the services, collecting them into a map so we can wrap @@ -390,7 +418,7 @@ func execP2PNode() { for _, name := range serviceNames { serviceFunc, exists := serviceFuncs[name] if !exists { - log.Crit("unknown node service", "name", name) + return nil, fmt.Errorf("unknown node service %q", err) } constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { ctx := &ServiceContext{ @@ -409,34 +437,35 @@ func execP2PNode() { return service, nil } if err := stack.Register(constructor); err != nil { - log.Crit("error starting service", "name", name, "err", err) + return stack, fmt.Errorf("error registering service %q: %v", name, err) } } // register the snapshot service - if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return &snapshotService{services}, nil - }); err != nil { - log.Crit("error starting snapshot service", "err", err) + }) + if err != nil { + return stack, fmt.Errorf("error starting snapshot service: %v", err) } // start the stack - if err := stack.Start(); err != nil { - log.Crit("error stating node stack", "err", err) + if err = stack.Start(); err != nil { + err = fmt.Errorf("error starting stack: %v", err) } + return stack, err +} - // stop the stack if we get a SIGTERM signal - go func() { - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, syscall.SIGTERM) - defer signal.Stop(sigc) - <-sigc - log.Info("Received SIGTERM, shutting down...") - stack.Stop() - }() +const ( + envStatusURL = "_P2P_STATUS_URL" + envNodeConfig = "_P2P_NODE_CONFIG" +) - // wait for the stack to exit - stack.Wait() +// nodeStartupJSON is sent to the simulation host after startup. +type nodeStartupJSON struct { + Err string + WSEndpoint string + NodeInfo *p2p.NodeInfo } // snapshotService is a node.Service which wraps a list of services and |