diff options
-rw-r--r-- | p2p/message.go | 77 | ||||
-rw-r--r-- | p2p/message_test.go | 63 |
2 files changed, 140 insertions, 0 deletions
diff --git a/p2p/message.go b/p2p/message.go index d3b8b74d4..f5418ff47 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -3,9 +3,11 @@ package p2p import ( "bytes" "encoding/binary" + "errors" "io" "io/ioutil" "math/big" + "sync/atomic" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/rlp" @@ -153,3 +155,78 @@ func (r *postrack) ReadByte() (byte, error) { } return b, err } + +// MsgPipe creates a message pipe. Reads on one end are matched +// with writes on the other. The pipe is full-duplex, both ends +// implement MsgReadWriter. +func MsgPipe() (*MsgPipeRW, *MsgPipeRW) { + var ( + c1, c2 = make(chan Msg), make(chan Msg) + closing = make(chan struct{}) + closed = new(int32) + rw1 = &MsgPipeRW{c1, c2, closing, closed} + rw2 = &MsgPipeRW{c2, c1, closing, closed} + ) + return rw1, rw2 +} + +// ErrPipeClosed is returned from pipe operations after the +// pipe has been closed. +var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe") + +// MsgPipeRW is an endpoint of a MsgReadWriter pipe. +type MsgPipeRW struct { + w chan<- Msg + r <-chan Msg + closing chan struct{} + closed *int32 +} + +// WriteMsg sends a messsage on the pipe. +// It blocks until the receiver has consumed the message payload. +func (p *MsgPipeRW) WriteMsg(msg Msg) error { + if atomic.LoadInt32(p.closed) == 0 { + consumed := make(chan struct{}, 1) + msg.Payload = &eofSignal{msg.Payload, int64(msg.Size), consumed} + select { + case p.w <- msg: + if msg.Size > 0 { + // wait for payload read or discard + <-consumed + } + return nil + case <-p.closing: + } + } + return ErrPipeClosed +} + +// EncodeMsg is a convenient shorthand for sending an RLP-encoded message. +func (p *MsgPipeRW) EncodeMsg(code uint64, data ...interface{}) error { + return p.WriteMsg(NewMsg(code, data...)) +} + +// ReadMsg returns a message sent on the other end of the pipe. +func (p *MsgPipeRW) ReadMsg() (Msg, error) { + if atomic.LoadInt32(p.closed) == 0 { + select { + case msg := <-p.r: + return msg, nil + case <-p.closing: + } + } + return Msg{}, ErrPipeClosed +} + +// Close unblocks any pending ReadMsg and WriteMsg calls on both ends +// of the pipe. They will return ErrPipeClosed. Note that Close does +// not interrupt any reads from a message payload. +func (p *MsgPipeRW) Close() error { + if atomic.AddInt32(p.closed, 1) != 1 { + // someone else is already closing + atomic.StoreInt32(p.closed, 1) // avoid overflow + return nil + } + close(p.closing) + return nil +} diff --git a/p2p/message_test.go b/p2p/message_test.go index 7b39b061d..0fbcfeef0 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -2,8 +2,11 @@ package p2p import ( "bytes" + "fmt" "io/ioutil" + "runtime" "testing" + "time" "github.com/ethereum/go-ethereum/ethutil" ) @@ -68,3 +71,63 @@ func TestDecodeRealMsg(t *testing.T) { t.Errorf("incorrect code %d, want %d", msg.Code, 0) } } + +func ExampleMsgPipe() { + rw1, rw2 := MsgPipe() + go func() { + rw1.EncodeMsg(8, []byte{0, 0}) + rw1.EncodeMsg(5, []byte{1, 1}) + rw1.Close() + }() + + for { + msg, err := rw2.ReadMsg() + if err != nil { + break + } + var data [1][]byte + msg.Decode(&data) + fmt.Printf("msg: %d, %x\n", msg.Code, data[0]) + } + // Output: + // msg: 8, 0000 + // msg: 5, 0101 +} + +func TestMsgPipeUnblockWrite(t *testing.T) { +loop: + for i := 0; i < 100; i++ { + rw1, rw2 := MsgPipe() + done := make(chan struct{}) + go func() { + if err := rw1.EncodeMsg(1); err == nil { + t.Error("EncodeMsg returned nil error") + } else if err != ErrPipeClosed { + t.Error("EncodeMsg returned wrong error: got %v, want %v", err, ErrPipeClosed) + } + close(done) + }() + + // this call should ensure that EncodeMsg is waiting to + // deliver sometimes. if this isn't done, Close is likely to + // be executed before EncodeMsg starts and then we won't test + // all the cases. + runtime.Gosched() + + rw2.Close() + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Errorf("write didn't unblock") + break loop + } + } +} + +// This test should panic if concurrent close isn't implemented correctly. +func TestMsgPipeConcurrentClose(t *testing.T) { + rw1, _ := MsgPipe() + for i := 0; i < 10; i++ { + go rw1.Close() + } +} |