diff options
Diffstat (limited to 'whisper/peer_test.go')
-rw-r--r-- | whisper/peer_test.go | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/whisper/peer_test.go b/whisper/peer_test.go new file mode 100644 index 000000000..9008cdc59 --- /dev/null +++ b/whisper/peer_test.go @@ -0,0 +1,242 @@ +package whisper + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type testPeer struct { + client *Whisper + stream *p2p.MsgPipeRW + termed chan struct{} +} + +func startTestPeer() *testPeer { + // Create a simulated P2P remote peer and data streams to it + remote := p2p.NewPeer(discover.NodeID{}, "", nil) + tester, tested := p2p.MsgPipe() + + // Create a whisper client and connect with it to the tester peer + client := New() + client.Start() + + termed := make(chan struct{}) + go func() { + defer client.Stop() + defer close(termed) + defer tested.Close() + + client.handlePeer(remote, tested) + }() + + return &testPeer{ + client: client, + stream: tester, + termed: termed, + } +} + +func startTestPeerInited() (*testPeer, error) { + peer := startTestPeer() + + if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil { + peer.stream.Close() + return nil, err + } + if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil { + peer.stream.Close() + return nil, err + } + return peer, nil +} + +func TestPeerStatusMessage(t *testing.T) { + tester := startTestPeer() + + // Wait for the handshake status message and check it + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Terminate the node + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerHandshakeFail(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send an invalid handshake status and verify disconnect + if err := p2p.SendItems(tester.stream, messagesCode); err != nil { + t.Fatalf("failed to send malformed status: %v", err) + } + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("remote close timed out") + } +} + +func TestPeerHandshakeSuccess(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send a valid handshake status and make sure connection stays live + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + select { + case <-tester.termed: + t.Fatalf("valid handshake disconnected") + + case <-time.After(100 * time.Millisecond): + } + // Clean up the test + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerSend(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Construct a message and inject into the tester + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + // Check that the message is eventually forwarded + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Make sure that even with a re-insert, an empty batch is received + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } +} + +func TestPeerDeliver(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Watch for all inbound messages + arrived := make(chan struct{}, 1) + tester.client.Watch(Filter{ + Fn: func(message *Message) { + arrived <- struct{}{} + }, + }) + // Construct a message and deliver it to the tester peer + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + // Check that the message is delivered upstream + select { + case <-arrived: + case <-time.After(time.Second): + t.Fatalf("message delivery timeout") + } + // Check that a resend is not delivered + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + select { + case <-time.After(2 * transmissionCycle): + case <-arrived: + t.Fatalf("repeating message arrived") + } +} + +func TestPeerMessageExpiration(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Fetch the peer instance for later inspection + tester.client.peerMu.RLock() + if peers := len(tester.client.peers); peers != 1 { + t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1) + } + var peer *peer + for peer, _ = range tester.client.peers { + break + } + tester.client.peerMu.RUnlock() + + // Construct a message and pass it through the tester + message := NewMessage([]byte("peer test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Check that the message is inside the cache + if !peer.known.Has(envelope.Hash()) { + t.Fatalf("message not found in cache") + } + // Discard messages until expiration and check cache again + exp := time.Now().Add(time.Second + expirationCycle) + for time.Now().Before(exp) { + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } + } + if peer.known.Has(envelope.Hash()) { + t.Fatalf("message not expired from cache") + } +} |