diff options
Diffstat (limited to 'p2p/peer.go')
-rw-r--r-- | p2p/peer.go | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index 893ba86d7..86c4d7ab5 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -300,7 +300,7 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) proto.in <- msg } else { wait = true - pr := &eofSignal{msg.Payload, protoDone} + pr := &eofSignal{msg.Payload, int64(msg.Size), protoDone} msg.Payload = pr proto.in <- msg } @@ -438,18 +438,25 @@ func (rw *proto) ReadMsg() (Msg, error) { return msg, nil } -// eofSignal wraps a reader with eof signaling. -// the eof channel is closed when the wrapped reader -// reaches EOF. +// eofSignal wraps a reader with eof signaling. the eof channel is +// closed when the wrapped reader returns an error or when count bytes +// have been read. +// type eofSignal struct { wrapped io.Reader + count int64 eof chan<- struct{} } +// note: when using eofSignal to detect whether a message payload +// has been read, Read might not be called for zero sized messages. + func (r *eofSignal) Read(buf []byte) (int, error) { n, err := r.wrapped.Read(buf) - if err != nil { + r.count -= int64(n) + if (err != nil || r.count <= 0) && r.eof != nil { r.eof <- struct{}{} // tell Peer that msg has been consumed + r.eof = nil } return n, err } |