aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dex/blockproposer.go4
-rw-r--r--dex/handler.go34
-rw-r--r--dex/peer.go43
-rw-r--r--p2p/peer.go4
-rw-r--r--p2p/server.go1
5 files changed, 68 insertions, 18 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index 58e6eafee..b51c9d10b 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -156,6 +156,7 @@ Loop:
blocks := blocksToSync(coreHeight, currentBlock.NumberU64())
if len(blocks) == 0 {
+ log.Debug("No new block to sync", "current", currentBlock.NumberU64())
break Loop
}
b.watchCat.Feed(blocks[len(blocks)-1].Position)
@@ -164,6 +165,7 @@ Loop:
"first", blocks[0].Finalization.Height,
"last", blocks[len(blocks)-1].Finalization.Height)
if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
+ log.Debug("SyncBlocks fail", "err", err)
return nil, err
}
coreHeight = blocks[len(blocks)-1].Finalization.Height
@@ -182,6 +184,8 @@ Loop:
sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
defer sub.Unsubscribe()
+ log.Debug("Listen chain head event until synced")
+
// Listen chain head event until synced.
ListenLoop:
for {
diff --git a/dex/handler.go b/dex/handler.go
index e887d54d9..6cbd62a8f 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -259,13 +259,17 @@ func (pm *ProtocolManager) removePeer(id string) {
// Unregister the peer from the downloader and Ethereum peer set
pm.downloader.UnregisterPeer(id)
+ log.Debug("after downloader unregister peer", "id", id)
if err := pm.peers.Unregister(id); err != nil {
log.Error("Peer removal failed", "peer", id, "err", err)
}
+ log.Debug("after unregister peer", "id", id)
// Hard disconnect at the networking layer
if peer != nil {
+ log.Debug("removePeer: peer disconnect")
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}
+ log.Debug("peer removed", "id", id)
}
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
@@ -392,16 +396,46 @@ func (pm *ProtocolManager) handle(p *peer) error {
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
+ ch := make(chan struct{})
+ defer close(ch)
+
+ go func() {
+ n := 0
+ for {
+ select {
+ case <-time.After(time.Second):
+ p.Log().Debug("no msg more than 1s", "n", n)
+ n++
+ case <-ch:
+ return
+ }
+ }
+ }()
+
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
+ ch <- struct{}{}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()
+ go func() {
+ n := 0
+ for {
+ select {
+ case <-time.After(100 * time.Millisecond):
+ p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code)
+ n++
+ case <-ch:
+ return
+ }
+ }
+ }()
+
// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
diff --git a/dex/peer.go b/dex/peer.go
index e015ed9e5..f92fce130 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -380,13 +380,20 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) {
p.knownDKGPrivateShares.Add(hash)
}
+func (p *peer) logSend(err error, code uint64) error {
+ if err != nil {
+ p.Log().Error("Failed to send peer message", "code", code, "err", err)
+ }
+ return err
+}
+
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
- return p2p.Send(p.rw, TxMsg, txs)
+ return p.logSend(p2p.Send(p.rw, TxMsg, txs), TxMsg)
}
// AsyncSendTransactions queues list of transactions propagation to a remote
@@ -408,7 +415,7 @@ func (p *peer) SendNodeRecords(records []*enr.Record) error {
for _, record := range records {
p.knownRecords.Add(rlpHash(record))
}
- return p2p.Send(p.rw, RecordMsg, records)
+ return p.logSend(p2p.Send(p.rw, RecordMsg, records), RecordMsg)
}
// AsyncSendNodeRecord queues list of notary node records propagation to a
@@ -436,7 +443,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
request[i].Hash = hashes[i]
request[i].Number = numbers[i]
}
- return p2p.Send(p.rw, NewBlockHashesMsg, request)
+ return p.logSend(p2p.Send(p.rw, NewBlockHashesMsg, request), NewBlockHashesMsg)
}
// AsyncSendNewBlockHash queues the availability of a block for propagation to a
@@ -454,7 +461,7 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block) error {
p.knownBlocks.Add(block.Hash())
- return p2p.Send(p.rw, NewBlockMsg, block)
+ return p.logSend(p2p.Send(p.rw, NewBlockMsg, block), NewBlockMsg)
}
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
@@ -469,7 +476,7 @@ func (p *peer) AsyncSendNewBlock(block *types.Block) {
}
func (p *peer) SendCoreBlocks(blocks []*coreTypes.Block) error {
- return p2p.Send(p.rw, CoreBlockMsg, blocks)
+ return p.logSend(p2p.Send(p.rw, CoreBlockMsg, blocks), CoreBlockMsg)
}
func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) {
@@ -481,7 +488,7 @@ func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) {
}
func (p *peer) SendVotes(votes []*coreTypes.Vote) error {
- return p2p.Send(p.rw, VoteMsg, votes)
+ return p.logSend(p2p.Send(p.rw, VoteMsg, votes), VoteMsg)
}
func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) {
@@ -494,7 +501,7 @@ func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) {
func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
p.knownAgreements.Add(rlpHash(agreement))
- return p2p.Send(p.rw, AgreementMsg, agreement)
+ return p.logSend(p2p.Send(p.rw, AgreementMsg, agreement), AgreementMsg)
}
func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
@@ -510,7 +517,7 @@ func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult)
for _, randomness := range randomnesses {
p.knownRandomnesses.Add(rlpHash(randomness))
}
- return p2p.Send(p.rw, RandomnessMsg, randomnesses)
+ return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg)
}
func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) {
@@ -526,7 +533,7 @@ func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessRe
func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error {
p.knownDKGPrivateShares.Add(rlpHash(privateShare))
- return p2p.Send(p.rw, DKGPrivateShareMsg, privateShare)
+ return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg)
}
func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) {
@@ -539,7 +546,7 @@ func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) {
}
func (p *peer) SendDKGPartialSignature(psig *dkgTypes.PartialSignature) error {
- return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
+ return p.logSend(p2p.Send(p.rw, DKGPartialSignatureMsg, psig), DKGPartialSignatureMsg)
}
func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
@@ -551,7 +558,7 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
}
func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error {
- return p2p.Send(p.rw, PullBlocksMsg, hashes)
+ return p.logSend(p2p.Send(p.rw, PullBlocksMsg, hashes), PullBlocksMsg)
}
func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
@@ -563,7 +570,7 @@ func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
}
func (p *peer) SendPullVotes(pos coreTypes.Position) error {
- return p2p.Send(p.rw, PullVotesMsg, pos)
+ return p.logSend(p2p.Send(p.rw, PullVotesMsg, pos), PullVotesMsg)
}
func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
@@ -575,7 +582,7 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
- return p2p.Send(p.rw, PullRandomnessMsg, hashes)
+ return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg)
}
func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
@@ -588,29 +595,29 @@ func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error {
- return p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers})
+ return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg)
}
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
// an already RLP encoded format.
func (p *peer) SendBlockBodiesRLP(flag uint8, bodies []rlp.RawValue) error {
- return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies})
+ return p.logSend(p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies}), BlockBodiesMsg)
}
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *peer) SendNodeData(data [][]byte) error {
- return p2p.Send(p.rw, NodeDataMsg, data)
+ return p.logSend(p2p.Send(p.rw, NodeDataMsg, data), NodeDataMsg)
}
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
- return p2p.Send(p.rw, ReceiptsMsg, receipts)
+ return p.logSend(p2p.Send(p.rw, ReceiptsMsg, receipts), ReceiptsMsg)
}
func (p *peer) SendGovState(govState *types.GovState) error {
- return p2p.Send(p.rw, GovStateMsg, govState)
+ return p.logSend(p2p.Send(p.rw, GovStateMsg, govState), GovStateMsg)
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
diff --git a/p2p/peer.go b/p2p/peer.go
index 9183082a1..cb6a7e484 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -249,6 +249,7 @@ loop:
close(p.closed)
p.rw.close(reason)
+ p.Log().Debug("wait for close")
p.wg.Wait()
return remoteRequested, err
}
@@ -257,6 +258,7 @@ func (p *Peer) pingLoop() {
ping := time.NewTimer(pingInterval)
defer p.wg.Done()
defer ping.Stop()
+ defer p.Log().Debug("pingLoop stopped")
for {
select {
case <-ping.C:
@@ -273,6 +275,7 @@ func (p *Peer) pingLoop() {
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done()
+ defer p.Log().Debug("readLoop stopped")
for {
msg, err := p.rw.ReadMsg()
if err != nil {
@@ -317,6 +320,7 @@ func (p *Peer) handle(msg Msg) error {
case proto.in <- msg:
return nil
case <-p.closed:
+ p.Log().Debug("peer handle closed return EOF")
return io.EOF
}
}
diff --git a/p2p/server.go b/p2p/server.go
index c1cf0f9fe..bfd991b1b 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -1087,6 +1087,7 @@ func (srv *Server) runPeer(p *Peer) {
Error: err.Error(),
})
+ p.Log().Debug("send peer drop", "req", remoteRequested, "err", err)
// Note: run waits for existing peers to be sent on srv.delpeer
// before returning, so this send should not select on srv.quit.
srv.delpeer <- peerDrop{p, err, remoteRequested}