diff options
-rw-r--r-- | dex/blockproposer.go | 4 | ||||
-rw-r--r-- | dex/handler.go | 34 | ||||
-rw-r--r-- | dex/peer.go | 43 | ||||
-rw-r--r-- | p2p/peer.go | 4 | ||||
-rw-r--r-- | p2p/server.go | 1 |
5 files changed, 68 insertions, 18 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go index 58e6eafee..b51c9d10b 100644 --- a/dex/blockproposer.go +++ b/dex/blockproposer.go @@ -156,6 +156,7 @@ Loop: blocks := blocksToSync(coreHeight, currentBlock.NumberU64()) if len(blocks) == 0 { + log.Debug("No new block to sync", "current", currentBlock.NumberU64()) break Loop } b.watchCat.Feed(blocks[len(blocks)-1].Position) @@ -164,6 +165,7 @@ Loop: "first", blocks[0].Finalization.Height, "last", blocks[len(blocks)-1].Finalization.Height) if _, err := consensusSync.SyncBlocks(blocks, false); err != nil { + log.Debug("SyncBlocks fail", "err", err) return nil, err } coreHeight = blocks[len(blocks)-1].Finalization.Height @@ -182,6 +184,8 @@ Loop: sub := b.dex.blockchain.SubscribeChainHeadEvent(ch) defer sub.Unsubscribe() + log.Debug("Listen chain head event until synced") + // Listen chain head event until synced. ListenLoop: for { diff --git a/dex/handler.go b/dex/handler.go index e887d54d9..6cbd62a8f 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -259,13 +259,17 @@ func (pm *ProtocolManager) removePeer(id string) { // Unregister the peer from the downloader and Ethereum peer set pm.downloader.UnregisterPeer(id) + log.Debug("after downloader unregister peer", "id", id) if err := pm.peers.Unregister(id); err != nil { log.Error("Peer removal failed", "peer", id, "err", err) } + log.Debug("after unregister peer", "id", id) // Hard disconnect at the networking layer if peer != nil { + log.Debug("removePeer: peer disconnect") peer.Peer.Disconnect(p2p.DiscUselessPeer) } + log.Debug("peer removed", "id", id) } func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { @@ -392,16 +396,46 @@ func (pm *ProtocolManager) handle(p *peer) error { // handleMsg is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func (pm *ProtocolManager) handleMsg(p *peer) error { + ch := make(chan struct{}) + defer close(ch) + + go func() { + n := 0 + for { + select { + case <-time.After(time.Second): + p.Log().Debug("no msg more than 1s", "n", n) + n++ + case <-ch: + return + } + } + }() + // Read the next message from the remote peer, and ensure it's fully consumed msg, err := p.rw.ReadMsg() if err != nil { return err } + ch <- struct{}{} if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } defer msg.Discard() + go func() { + n := 0 + for { + select { + case <-time.After(100 * time.Millisecond): + p.Log().Debug("handle msg more than 100ms", "n", n, "code", msg.Code) + n++ + case <-ch: + return + } + } + }() + // Handle the message depending on its contents switch { case msg.Code == StatusMsg: diff --git a/dex/peer.go b/dex/peer.go index e015ed9e5..f92fce130 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -380,13 +380,20 @@ func (p *peer) MarkDKGPrivateShares(hash common.Hash) { p.knownDKGPrivateShares.Add(hash) } +func (p *peer) logSend(err error, code uint64) error { + if err != nil { + p.Log().Error("Failed to send peer message", "code", code, "err", err) + } + return err +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { for _, tx := range txs { p.knownTxs.Add(tx.Hash()) } - return p2p.Send(p.rw, TxMsg, txs) + return p.logSend(p2p.Send(p.rw, TxMsg, txs), TxMsg) } // AsyncSendTransactions queues list of transactions propagation to a remote @@ -408,7 +415,7 @@ func (p *peer) SendNodeRecords(records []*enr.Record) error { for _, record := range records { p.knownRecords.Add(rlpHash(record)) } - return p2p.Send(p.rw, RecordMsg, records) + return p.logSend(p2p.Send(p.rw, RecordMsg, records), RecordMsg) } // AsyncSendNodeRecord queues list of notary node records propagation to a @@ -436,7 +443,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error request[i].Hash = hashes[i] request[i].Number = numbers[i] } - return p2p.Send(p.rw, NewBlockHashesMsg, request) + return p.logSend(p2p.Send(p.rw, NewBlockHashesMsg, request), NewBlockHashesMsg) } // AsyncSendNewBlockHash queues the availability of a block for propagation to a @@ -454,7 +461,7 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) { // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block) error { p.knownBlocks.Add(block.Hash()) - return p2p.Send(p.rw, NewBlockMsg, block) + return p.logSend(p2p.Send(p.rw, NewBlockMsg, block), NewBlockMsg) } // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If @@ -469,7 +476,7 @@ func (p *peer) AsyncSendNewBlock(block *types.Block) { } func (p *peer) SendCoreBlocks(blocks []*coreTypes.Block) error { - return p2p.Send(p.rw, CoreBlockMsg, blocks) + return p.logSend(p2p.Send(p.rw, CoreBlockMsg, blocks), CoreBlockMsg) } func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) { @@ -481,7 +488,7 @@ func (p *peer) AsyncSendCoreBlocks(blocks []*coreTypes.Block) { } func (p *peer) SendVotes(votes []*coreTypes.Vote) error { - return p2p.Send(p.rw, VoteMsg, votes) + return p.logSend(p2p.Send(p.rw, VoteMsg, votes), VoteMsg) } func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) { @@ -494,7 +501,7 @@ func (p *peer) AsyncSendVotes(votes []*coreTypes.Vote) { func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error { p.knownAgreements.Add(rlpHash(agreement)) - return p2p.Send(p.rw, AgreementMsg, agreement) + return p.logSend(p2p.Send(p.rw, AgreementMsg, agreement), AgreementMsg) } func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { @@ -510,7 +517,7 @@ func (p *peer) SendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) for _, randomness := range randomnesses { p.knownRandomnesses.Add(rlpHash(randomness)) } - return p2p.Send(p.rw, RandomnessMsg, randomnesses) + return p.logSend(p2p.Send(p.rw, RandomnessMsg, randomnesses), RandomnessMsg) } func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessResult) { @@ -526,7 +533,7 @@ func (p *peer) AsyncSendRandomnesses(randomnesses []*coreTypes.BlockRandomnessRe func (p *peer) SendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) error { p.knownDKGPrivateShares.Add(rlpHash(privateShare)) - return p2p.Send(p.rw, DKGPrivateShareMsg, privateShare) + return p.logSend(p2p.Send(p.rw, DKGPrivateShareMsg, privateShare), DKGPrivateShareMsg) } func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) { @@ -539,7 +546,7 @@ func (p *peer) AsyncSendDKGPrivateShare(privateShare *dkgTypes.PrivateShare) { } func (p *peer) SendDKGPartialSignature(psig *dkgTypes.PartialSignature) error { - return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) + return p.logSend(p2p.Send(p.rw, DKGPartialSignatureMsg, psig), DKGPartialSignatureMsg) } func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) { @@ -551,7 +558,7 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) { } func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error { - return p2p.Send(p.rw, PullBlocksMsg, hashes) + return p.logSend(p2p.Send(p.rw, PullBlocksMsg, hashes), PullBlocksMsg) } func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) { @@ -563,7 +570,7 @@ func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) { } func (p *peer) SendPullVotes(pos coreTypes.Position) error { - return p2p.Send(p.rw, PullVotesMsg, pos) + return p.logSend(p2p.Send(p.rw, PullVotesMsg, pos), PullVotesMsg) } func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { @@ -575,7 +582,7 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error { - return p2p.Send(p.rw, PullRandomnessMsg, hashes) + return p.logSend(p2p.Send(p.rw, PullRandomnessMsg, hashes), PullRandomnessMsg) } func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { @@ -588,29 +595,29 @@ func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(flag uint8, headers []*types.HeaderWithGovState) error { - return p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}) + return p.logSend(p2p.Send(p.rw, BlockHeadersMsg, headersData{Flag: flag, Headers: headers}), BlockHeadersMsg) } // SendBlockBodiesRLP sends a batch of block contents to the remote peer from // an already RLP encoded format. func (p *peer) SendBlockBodiesRLP(flag uint8, bodies []rlp.RawValue) error { - return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies}) + return p.logSend(p2p.Send(p.rw, BlockBodiesMsg, blockBodiesDataRLP{Flag: flag, Bodies: bodies}), BlockBodiesMsg) } // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *peer) SendNodeData(data [][]byte) error { - return p2p.Send(p.rw, NodeDataMsg, data) + return p.logSend(p2p.Send(p.rw, NodeDataMsg, data), NodeDataMsg) } // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the // ones requested from an already RLP encoded format. func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { - return p2p.Send(p.rw, ReceiptsMsg, receipts) + return p.logSend(p2p.Send(p.rw, ReceiptsMsg, receipts), ReceiptsMsg) } func (p *peer) SendGovState(govState *types.GovState) error { - return p2p.Send(p.rw, GovStateMsg, govState) + return p.logSend(p2p.Send(p.rw, GovStateMsg, govState), GovStateMsg) } // RequestOneHeader is a wrapper around the header query functions to fetch a diff --git a/p2p/peer.go b/p2p/peer.go index 9183082a1..cb6a7e484 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -249,6 +249,7 @@ loop: close(p.closed) p.rw.close(reason) + p.Log().Debug("wait for close") p.wg.Wait() return remoteRequested, err } @@ -257,6 +258,7 @@ func (p *Peer) pingLoop() { ping := time.NewTimer(pingInterval) defer p.wg.Done() defer ping.Stop() + defer p.Log().Debug("pingLoop stopped") for { select { case <-ping.C: @@ -273,6 +275,7 @@ func (p *Peer) pingLoop() { func (p *Peer) readLoop(errc chan<- error) { defer p.wg.Done() + defer p.Log().Debug("readLoop stopped") for { msg, err := p.rw.ReadMsg() if err != nil { @@ -317,6 +320,7 @@ func (p *Peer) handle(msg Msg) error { case proto.in <- msg: return nil case <-p.closed: + p.Log().Debug("peer handle closed return EOF") return io.EOF } } diff --git a/p2p/server.go b/p2p/server.go index c1cf0f9fe..bfd991b1b 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -1087,6 +1087,7 @@ func (srv *Server) runPeer(p *Peer) { Error: err.Error(), }) + p.Log().Debug("send peer drop", "req", remoteRequested, "err", err) // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} |