diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-15 23:21:30 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-06-15 23:21:30 +0800 |
commit | f475a013264b5036dfe9fcc04e21c4112845b9a2 (patch) | |
tree | 03c5347f0abf1b88131a20f55479924f657ae6fb | |
parent | 263903378b81f4a17ab34f5ad0d3a7ceb2b5dea9 (diff) | |
parent | 70da79f04c14e562c024e85c6b081b6b4b8e45ec (diff) | |
download | dexon-f475a013264b5036dfe9fcc04e21c4112845b9a2.tar.gz dexon-f475a013264b5036dfe9fcc04e21c4112845b9a2.tar.zst dexon-f475a013264b5036dfe9fcc04e21c4112845b9a2.zip |
Merge pull request #1261 from fjl/p2p-no-writes-at-shutdown
p2p: prevent writes at shutdown time
-rw-r--r-- | p2p/peer.go | 87 | ||||
-rw-r--r-- | p2p/peer_test.go | 2 |
2 files changed, 61 insertions, 28 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index cbe5ccc84..40466cf84 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -115,41 +115,60 @@ func newPeer(conn *conn, protocols []Protocol) *Peer { } func (p *Peer) run() DiscReason { - readErr := make(chan error, 1) + var ( + writeStart = make(chan struct{}, 1) + writeErr = make(chan error, 1) + readErr = make(chan error, 1) + reason DiscReason + requested bool + ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() - p.startProtocols() + // Start all protocol handlers. + writeStart <- struct{}{} + p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect. - var ( - reason DiscReason - requested bool - ) - select { - case err := <-readErr: - if r, ok := err.(DiscReason); ok { - reason = r - } else { - // Note: We rely on protocols to abort if there is a write - // error. It might be more robust to handle them here as well. - glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err) - reason = DiscNetworkError +loop: + for { + select { + case err := <-writeErr: + // A write finished. Allow the next write to start if + // there was no error. + if err != nil { + glog.V(logger.Detail).Infof("%v: write error: %v\n", p, err) + reason = DiscNetworkError + break loop + } + writeStart <- struct{}{} + case err := <-readErr: + if r, ok := err.(DiscReason); ok { + glog.V(logger.Debug).Infof("%v: remote requested disconnect: %v\n", p, r) + requested = true + reason = r + } else { + glog.V(logger.Detail).Infof("%v: read error: %v\n", p, err) + reason = DiscNetworkError + } + break loop + case err := <-p.protoErr: + reason = discReasonForError(err) + glog.V(logger.Debug).Infof("%v: protocol error: %v (%v)\n", p, err, reason) + break loop + case reason = <-p.disc: + glog.V(logger.Debug).Infof("%v: locally requested disconnect: %v\n", p, reason) + break loop } - case err := <-p.protoErr: - reason = discReasonForError(err) - case reason = <-p.disc: - requested = true } + close(p.closed) p.rw.close(reason) p.wg.Wait() - if requested { reason = DiscRequested } - glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) return reason } @@ -196,7 +215,6 @@ func (p *Peer) handle(msg Msg) error { // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) - glog.V(logger.Debug).Infof("%v: Disconnect Requested: %v\n", p, reason[0]) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages @@ -247,11 +265,13 @@ outer: return result } -func (p *Peer) startProtocols() { +func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) { p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto proto.closed = p.closed + proto.wstart = writeStart + proto.werr = writeErr glog.V(logger.Detail).Infof("%v: Starting protocol %s/%d\n", p, proto.Name, proto.Version) go func() { err := proto.Run(p, proto) @@ -280,18 +300,31 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { type protoRW struct { Protocol - in chan Msg - closed <-chan struct{} + in chan Msg // receices read messages + closed <-chan struct{} // receives when peer is shutting down + wstart <-chan struct{} // receives when write may start + werr chan<- error // for write results offset uint64 w MsgWriter } -func (rw *protoRW) WriteMsg(msg Msg) error { +func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset - return rw.w.WriteMsg(msg) + select { + case <-rw.wstart: + err = rw.w.WriteMsg(msg) + // Report write status back to Peer.run. It will initiate + // shutdown if the error is non-nil and unblock the next write + // otherwise. The calling protocol code should exit for errors + // as well but we don't want to rely on that. + rw.werr <- err + case <-rw.closed: + err = fmt.Errorf("shutting down") + } + return err } func (rw *protoRW) ReadMsg() (Msg, error) { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 7b772e198..575d0ff79 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -121,7 +121,7 @@ func TestPeerDisconnect(t *testing.T) { } select { case reason := <-disc: - if reason != DiscQuitting { + if reason != DiscRequested { t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested) } case <-time.After(500 * time.Millisecond): |