aboutsummaryrefslogtreecommitdiffstats
path: root/eth/fetcher
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-08-25 18:57:49 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-08-25 22:48:47 +0800
commit17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d (patch)
tree946f0e2478d8002194c09705e638c74b9d3a8ec5 /eth/fetcher
parent47a7fe5d22fe2a6be783f6576070814fe951eaaf (diff)
downloaddexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.gz
dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.tar.zst
dexon-17f65cd1e5e0fea6e4f7b96c60767aaa0ada366d.zip
eth: update metrics collection to handle eth/62 algos
Diffstat (limited to 'eth/fetcher')
-rw-r--r--eth/fetcher/fetcher.go35
-rw-r--r--eth/fetcher/metrics.go26
2 files changed, 46 insertions, 15 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index f54256788..b8ec1fc55 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -347,18 +347,19 @@ func (f *Fetcher) loop() {
case notification := <-f.notify:
// A block was announced, make sure the peer isn't DOSing us
- announceMeter.Mark(1)
+ propAnnounceInMeter.Mark(1)
count := f.announces[notification.origin] + 1
if count > hashLimit {
glog.V(logger.Debug).Infof("Peer %s: exceeded outstanding announces (%d)", notification.origin, hashLimit)
+ propAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("[eth/62] Peer %s: discarded announcement #%d [%x…], distance %d", notification.origin, notification.number, notification.hash[:4], dist)
- discardMeter.Mark(1)
+ propAnnounceDropMeter.Mark(1)
break
}
}
@@ -377,7 +378,7 @@ func (f *Fetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
- broadcastMeter.Mark(1)
+ propBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
case hash := <-f.done:
@@ -425,10 +426,12 @@ func (f *Fetcher) loop() {
}
if fetchBlocks != nil {
// Use old eth/61 protocol to retrieve whole blocks
+ blockFetchMeter.Mark(int64(len(hashes)))
fetchBlocks(hashes)
} else {
// Use new eth/62 protocol to retrieve headers first
for _, hash := range hashes {
+ headerFetchMeter.Mark(1)
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
}
}
@@ -467,6 +470,7 @@ func (f *Fetcher) loop() {
if f.completingHook != nil {
f.completingHook(hashes)
}
+ bodyFetchMeter.Mark(int64(len(hashes)))
go f.completing[hashes[0]].fetchBodies(hashes)
}
// Schedule the next fetch if blocks are still pending
@@ -480,6 +484,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ blockFilterInMeter.Mark(int64(len(blocks)))
explicit, download := []*types.Block{}, []*types.Block{}
for _, block := range blocks {
@@ -498,6 +503,7 @@ func (f *Fetcher) loop() {
}
}
+ blockFilterOutMeter.Mark(int64(len(download)))
select {
case filter <- download:
case <-f.quit:
@@ -520,6 +526,8 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ headerFilterInMeter.Mark(int64(len(task.headers)))
+
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
@@ -544,7 +552,10 @@ func (f *Fetcher) loop() {
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
glog.V(logger.Detail).Infof("[eth/62] Peer %s: block #%d [%x…] empty, skipping body retrieval", announce.origin, header.Number.Uint64(), header.Hash().Bytes()[:4])
- complete = append(complete, types.NewBlockWithHeader(header))
+ block := types.NewBlockWithHeader(header)
+ block.ReceivedAt = task.time
+
+ complete = append(complete, block)
f.completing[hash] = announce
continue
}
@@ -559,6 +570,7 @@ func (f *Fetcher) loop() {
unknown = append(unknown, header)
}
}
+ headerFilterOutMeter.Mark(int64(len(unknown)))
select {
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
case <-f.quit:
@@ -590,6 +602,7 @@ func (f *Fetcher) loop() {
case <-f.quit:
return
}
+ bodyFilterInMeter.Mark(int64(len(task.transactions)))
blocks := []*types.Block{}
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
@@ -606,7 +619,10 @@ func (f *Fetcher) loop() {
matched = true
if f.getBlock(hash) == nil {
- blocks = append(blocks, types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]))
+ block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
+ block.ReceivedAt = task.time
+
+ blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
@@ -621,6 +637,7 @@ func (f *Fetcher) loop() {
}
}
+ bodyFilterOutMeter.Mark(int64(len(task.transactions)))
select {
case filter <- task:
case <-f.quit:
@@ -677,13 +694,14 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
count := f.queues[peer] + 1
if count > blockLimit {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], exceeded allowance (%d)", peer, block.NumberU64(), hash.Bytes()[:4], blockLimit)
+ propBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
glog.V(logger.Debug).Infof("Peer %s: discarded block #%d [%x…], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist)
- discardMeter.Mark(1)
+ propBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
@@ -724,11 +742,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
switch err := f.validateBlock(block, parent); err {
case nil:
// All ok, quickly propagate to our peers
- broadcastTimer.UpdateSince(block.ReceivedAt)
+ propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
case core.BlockFutureErr:
- futureMeter.Mark(1)
// Weird future block, don't fail, but neither propagate
default:
@@ -743,7 +760,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
return
}
// If import succeeded, broadcast the block
- announceTimer.UpdateSince(block.ReceivedAt)
+ propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
// Invoke the testing hook if needed
diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go
index 76cc49226..b82d3ca01 100644
--- a/eth/fetcher/metrics.go
+++ b/eth/fetcher/metrics.go
@@ -23,10 +23,24 @@ import (
)
var (
- announceMeter = metrics.NewMeter("eth/sync/RemoteAnnounces")
- announceTimer = metrics.NewTimer("eth/sync/LocalAnnounces")
- broadcastMeter = metrics.NewMeter("eth/sync/RemoteBroadcasts")
- broadcastTimer = metrics.NewTimer("eth/sync/LocalBroadcasts")
- discardMeter = metrics.NewMeter("eth/sync/DiscardedBlocks")
- futureMeter = metrics.NewMeter("eth/sync/FutureBlocks")
+ propAnnounceInMeter = metrics.NewMeter("eth/fetcher/prop/announces/in")
+ propAnnounceOutTimer = metrics.NewTimer("eth/fetcher/prop/announces/out")
+ propAnnounceDropMeter = metrics.NewMeter("eth/fetcher/prop/announces/drop")
+ propAnnounceDOSMeter = metrics.NewMeter("eth/fetcher/prop/announces/dos")
+
+ propBroadcastInMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/in")
+ propBroadcastOutTimer = metrics.NewTimer("eth/fetcher/prop/broadcasts/out")
+ propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop")
+ propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos")
+
+ blockFetchMeter = metrics.NewMeter("eth/fetcher/fetch/blocks")
+ headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers")
+ bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies")
+
+ blockFilterInMeter = metrics.NewMeter("eth/fetcher/filter/blocks/in")
+ blockFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/blocks/out")
+ headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in")
+ headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out")
+ bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in")
+ bodyFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/bodies/out")
)