aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-06-15 23:21:30 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-06-15 23:21:30 +0800
commitf475a013264b5036dfe9fcc04e21c4112845b9a2 (patch)
tree03c5347f0abf1b88131a20f55479924f657ae6fb
parent263903378b81f4a17ab34f5ad0d3a7ceb2b5dea9 (diff)
parent70da79f04c14e562c024e85c6b081b6b4b8e45ec (diff)
downloadgo-tangerine-f475a013264b5036dfe9fcc04e21c4112845b9a2.tar.gz
go-tangerine-f475a013264b5036dfe9fcc04e21c4112845b9a2.tar.zst
go-tangerine-f475a013264b5036dfe9fcc04e21c4112845b9a2.zip
Merge pull request #1261 from fjl/p2p-no-writes-at-shutdown
p2p: prevent writes at shutdown time
-rw-r--r--p2p/peer.go87
-rw-r--r--p2p/peer_test.go2
2 files changed, 61 insertions, 28 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index cbe5ccc84..40466cf84 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -115,41 +115,60 @@ func newPeer(conn *conn, protocols []Protocol) *Peer {
}
func (p *Peer) run() DiscReason {
- readErr := make(chan error, 1)
+ var (
+ writeStart = make(chan struct{}, 1)
+ writeErr = make(chan error, 1)
+ readErr = make(chan error, 1)
+ reason DiscReason
+ requested bool
+ )
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
- p.startProtocols()
+ // Start all protocol handlers.
+ writeStart <- struct{}{}
+ p.startProtocols(writeStart, writeErr)
// Wait for an error or disconnect.
- var (
- reason DiscReason
- requested bool
- )
- select {
- case err := <-readErr:
- if r, ok := err.(DiscReason); ok {
- reason = r
- } else {
- // Note: We rely on protocols to abort if there is a write
- // error. It might be more robust to handle them here as well.
- glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err)
- reason = DiscNetworkError
+loop:
+ for {
+ select {
+ case err := <-writeErr:
+ // A write finished. Allow the next write to start if
+ // there was no error.
+ if err != nil {
+ glog.V(logger.Detail).Infof("%v: write error: %v\n", p, err)
+ reason = DiscNetworkError
+ break loop
+ }
+ writeStart <- struct{}{}
+ case err := <-readErr:
+ if r, ok := err.(DiscReason); ok {
+ glog.V(logger.Debug).Infof("%v: remote requested disconnect: %v\n", p, r)
+ requested = true
+ reason = r
+ } else {
+ glog.V(logger.Detail).Infof("%v: read error: %v\n", p, err)
+ reason = DiscNetworkError
+ }
+ break loop
+ case err := <-p.protoErr:
+ reason = discReasonForError(err)
+ glog.V(logger.Debug).Infof("%v: protocol error: %v (%v)\n", p, err, reason)
+ break loop
+ case reason = <-p.disc:
+ glog.V(logger.Debug).Infof("%v: locally requested disconnect: %v\n", p, reason)
+ break loop
}
- case err := <-p.protoErr:
- reason = discReasonForError(err)
- case reason = <-p.disc:
- requested = true
}
+
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
-
if requested {
reason = DiscRequested
}
- glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason)
return reason
}
@@ -196,7 +215,6 @@ func (p *Peer) handle(msg Msg) error {
// This is the last message. We don't need to discard or
// check errors because, the connection will be closed after it.
rlp.Decode(msg.Payload, &reason)
- glog.V(logger.Debug).Infof("%v: Disconnect Requested: %v\n", p, reason[0])
return reason[0]
case msg.Code < baseProtocolLength:
// ignore other base protocol messages
@@ -247,11 +265,13 @@ outer:
return result
}
-func (p *Peer) startProtocols() {
+func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running))
for _, proto := range p.running {
proto := proto
proto.closed = p.closed
+ proto.wstart = writeStart
+ proto.werr = writeErr
glog.V(logger.Detail).Infof("%v: Starting protocol %s/%d\n", p, proto.Name, proto.Version)
go func() {
err := proto.Run(p, proto)
@@ -280,18 +300,31 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
type protoRW struct {
Protocol
- in chan Msg
- closed <-chan struct{}
+ in chan Msg // receices read messages
+ closed <-chan struct{} // receives when peer is shutting down
+ wstart <-chan struct{} // receives when write may start
+ werr chan<- error // for write results
offset uint64
w MsgWriter
}
-func (rw *protoRW) WriteMsg(msg Msg) error {
+func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length {
return newPeerError(errInvalidMsgCode, "not handled")
}
msg.Code += rw.offset
- return rw.w.WriteMsg(msg)
+ select {
+ case <-rw.wstart:
+ err = rw.w.WriteMsg(msg)
+ // Report write status back to Peer.run. It will initiate
+ // shutdown if the error is non-nil and unblock the next write
+ // otherwise. The calling protocol code should exit for errors
+ // as well but we don't want to rely on that.
+ rw.werr <- err
+ case <-rw.closed:
+ err = fmt.Errorf("shutting down")
+ }
+ return err
}
func (rw *protoRW) ReadMsg() (Msg, error) {
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index 7b772e198..575d0ff79 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -121,7 +121,7 @@ func TestPeerDisconnect(t *testing.T) {
}
select {
case reason := <-disc:
- if reason != DiscQuitting {
+ if reason != DiscRequested {
t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested)
}
case <-time.After(500 * time.Millisecond):