diff options
-rw-r--r-- | dex/downloader/downloader.go | 32 | ||||
-rw-r--r-- | dex/downloader/downloader_test.go | 105 | ||||
-rw-r--r-- | dex/downloader/peer.go | 5 | ||||
-rw-r--r-- | dex/downloader/testchain_test.go | 12 | ||||
-rw-r--r-- | dex/handler.go | 60 | ||||
-rw-r--r-- | dex/helper_test.go | 8 | ||||
-rw-r--r-- | dex/peer.go | 71 | ||||
-rw-r--r-- | dex/protocol.go | 4 | ||||
-rw-r--r-- | dex/protocol_test.go | 8 | ||||
-rw-r--r-- | dex/sync.go | 15 |
10 files changed, 143 insertions, 177 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index 07c72262c..809fe7e4a 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -19,7 +19,6 @@ package downloader import ( "errors" "fmt" - "math/big" "sync" "sync/atomic" "time" @@ -177,9 +176,6 @@ type LightChain interface { GetGovStateByNumber(number uint64) (*types.GovState, error) - // GetTd returns the total difficulty of a local block. - GetTd(common.Hash, uint64) *big.Int - // InsertDexonHeaderChain inserts a batch of headers into the local chain. InsertDexonHeaderChain([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error) @@ -330,8 +326,8 @@ func (d *Downloader) UnregisterPeer(id string) error { // Synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { - err := d.synchronise(id, head, td, mode) +func (d *Downloader) Synchronise(id string, head common.Hash, number uint64, mode SyncMode) error { + err := d.synchronise(id, head, number, mode) switch err { case nil: case errBusy: @@ -354,9 +350,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode } // synchronise will select the peer and use it for synchronising. If an empty string is given -// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the +// it will use the best peer possible and synchronize if its number is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { +func (d *Downloader) synchronise(id string, hash common.Hash, number uint64, mode SyncMode) error { // Mock out the synchronisation if testing if d.synchroniseMock != nil { return d.synchroniseMock(id, hash) @@ -413,12 +409,12 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if p == nil { return errUnknownPeer } - return d.syncWithPeer(p, hash, td) + return d.syncWithPeer(p, hash, number) } // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, number uint64) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -432,7 +428,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I return errTooOld } - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode) + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "number", number, "mode", d.mode) defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) }(time.Now()) @@ -510,7 +506,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.processHeaders(origin+1, pivot, td) }, + func() error { return d.processHeaders(origin+1, pivot, number) }, } if d.mode == FastSync { fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) @@ -1305,7 +1301,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { +func (d *Downloader) processHeaders(origin uint64, pivot uint64, number uint64) error { // Keep a count of uncertain headers to roll back rollback := []*types.Header{} defer func() { @@ -1351,21 +1347,21 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er case <-d.cancelCh: } } - // If no headers were retrieved at all, the peer violated its TD promise that it had a + // If no headers were retrieved at all, the peer violated its number promise that it had a // better chain compared to ours. The only exception is if its promised blocks were // already imported by other means (e.g. fecher): // // R <remote peer>, L <local node>: Both at block 10 // R: Mine block 11, and propagate it to L // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync + // L: Notice that R's head and number increased compared to ours, start sync // L: Import of block 11 finishes // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) + // L: Request new headers up from 11 (R's number was higher, it must have something) // R: Nothing to give if d.mode != LightSync { head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { + if !gotHeaders && number > head.NumberU64() { return errStallingPeer } } @@ -1378,7 +1374,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // peer gave us something useful, we're already happy/progressed (above check). if d.mode == FastSync || d.mode == LightSync { head := d.lightchain.CurrentHeader() - if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + if number > head.Number.Uint64() { return errStallingPeer } } diff --git a/dex/downloader/downloader_test.go b/dex/downloader/downloader_test.go index c5e1451f5..0d92ad97f 100644 --- a/dex/downloader/downloader_test.go +++ b/dex/downloader/downloader_test.go @@ -19,7 +19,6 @@ package downloader import ( "errors" "fmt" - "math/big" "sync" "sync/atomic" "testing" @@ -56,7 +55,6 @@ type downloadTester struct { ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester - ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain lock sync.RWMutex } @@ -71,7 +69,6 @@ func newTester() *downloadTester { ownHeaders: map[common.Hash]*types.Header{testGenesis.Hash(): testGenesis.Header()}, ownBlocks: map[common.Hash]*types.Block{testGenesis.Hash(): testGenesis}, ownReceipts: map[common.Hash]types.Receipts{testGenesis.Hash(): nil}, - ownChainTd: map[common.Hash]*big.Int{testGenesis.Hash(): testGenesis.Difficulty()}, } tester.stateDb = ethdb.NewMemDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) @@ -86,17 +83,17 @@ func (dl *downloadTester) terminate() { } // sync starts synchronizing with a remote peer, blocking until it completes. -func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { +func (dl *downloadTester) sync(id string, number uint64, mode SyncMode) error { dl.lock.RLock() hash := dl.peers[id].chain.headBlock().Hash() - // If no particular TD was requested, load from the peer's blockchain - if td == nil { - td = dl.peers[id].chain.td(hash) + // If no particular number was requested, load from the peer's blockchain + if number == 0 { + number, _ = dl.peers[id].chain.hashToNumber(hash) } dl.lock.RUnlock() // Synchronise with the chosen peer and ensure proper cleanup afterwards - err := dl.downloader.synchronise(id, hash, td, mode) + err := dl.downloader.synchronise(id, hash, number, mode) select { case <-dl.downloader.cancelCh: // Ok, downloader fully cancelled after sync cycle @@ -202,14 +199,6 @@ func (dl *downloadTester) FastSyncCommitHead(hash common.Hash) error { return fmt.Errorf("non existent block: %x", hash[:4]) } -// GetTd retrieves the block's total difficulty from the canonical chain. -func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { - dl.lock.RLock() - defer dl.lock.RUnlock() - - return dl.ownChainTd[hash] -} - // InsertDexonHeaderChain injects a new batch of headers into the simulated chain. func (dl *downloadTester) InsertDexonHeaderChain(headers []*types.HeaderWithGovState, verifierCache *dexCore.TSigVerifierCache) (i int, err error) { dl.lock.Lock() @@ -234,7 +223,6 @@ func (dl *downloadTester) InsertDexonHeaderChain(headers []*types.HeaderWithGovS } dl.ownHashes = append(dl.ownHashes, header.Hash()) dl.ownHeaders[header.Hash()] = header.Header - dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) } return len(headers), nil } @@ -256,7 +244,6 @@ func (dl *downloadTester) InsertDexonChain(blocks types.Blocks) (i int, err erro } dl.ownBlocks[block.Hash()] = block dl.stateDb.Put(block.Root().Bytes(), []byte{0x00}) - dl.ownChainTd[block.Hash()] = new(big.Int).Add(dl.ownChainTd[block.ParentHash()], block.Difficulty()) } return len(blocks), nil } @@ -288,7 +275,6 @@ func (dl *downloadTester) Rollback(hashes []common.Hash) { if dl.ownHashes[len(dl.ownHashes)-1] == hashes[i] { dl.ownHashes = dl.ownHashes[:len(dl.ownHashes)-1] } - delete(dl.ownChainTd, hashes[i]) delete(dl.ownHeaders, hashes[i]) delete(dl.ownReceipts, hashes[i]) delete(dl.ownBlocks, hashes[i]) @@ -324,9 +310,10 @@ type downloadTesterPeer struct { // Head constructs a function to retrieve a peer's current head hash // and total difficulty. -func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { +func (dlp *downloadTesterPeer) Head() (common.Hash, uint64) { b := dlp.chain.headBlock() - return b.Hash(), dlp.chain.td(b.Hash()) + number, _ := dlp.chain.hashToNumber(b.Hash()) + return b.Hash(), number } // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed @@ -457,7 +444,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("peer", protocol, chain) // Synchronise with the peer and make sure all relevant data was retrieved - if err := tester.sync("peer", nil, mode); err != nil { + if err := tester.sync("peer", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -489,7 +476,7 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { // Start a synchronisation concurrently errc := make(chan error) go func() { - errc <- tester.sync("peer", nil, mode) + errc <- tester.sync("peer", 0, mode) }() // Iteratively take some blocks, always checking the retrieval count for { @@ -604,7 +591,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { t.Errorf("download queue not idle") } // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer", nil, mode); err != nil { + if err := tester.sync("peer", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } tester.downloader.Cancel() @@ -635,7 +622,7 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { id := fmt.Sprintf("peer #%d", i) tester.newPeer(id, protocol, chain.shorten(chain.len()/(i+1))) } - if err := tester.sync("peer #0", nil, mode); err != nil { + if err := tester.sync("peer #0", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -665,7 +652,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("peer 64", 64, chain) // Synchronise with the requested peer and make sure all blocks were retrieved - if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { + if err := tester.sync(fmt.Sprintf("peer %d", protocol), 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -707,7 +694,7 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { atomic.AddInt32(&receiptsHave, int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("peer", nil, mode); err != nil { + if err := tester.sync("peer", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -752,12 +739,12 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) tester.newPeer("attack", protocol, brokenChain) - if err := tester.sync("attack", nil, mode); err == nil { + if err := tester.sync("attack", 0, mode); err == nil { t.Fatalf("succeeded attacker synchronisation") } // Synchronise with the valid peer and make sure sync succeeds tester.newPeer("valid", protocol, chain) - if err := tester.sync("valid", nil, mode); err != nil { + if err := tester.sync("valid", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -786,13 +773,13 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { delete(brokenChain.blockm, brokenChain.chain[1]) delete(brokenChain.receiptm, brokenChain.chain[1]) tester.newPeer("attack", protocol, brokenChain) - if err := tester.sync("attack", nil, mode); err == nil { + if err := tester.sync("attack", 0, mode); err == nil { t.Fatalf("succeeded attacker synchronisation") } // Synchronise with the valid peer and make sure sync succeeds tester.newPeer("valid", protocol, chain) - if err := tester.sync("valid", nil, mode); err != nil { + if err := tester.sync("valid", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } assertOwnChain(t, tester, chain.len()) @@ -822,7 +809,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { delete(fastAttackChain.headerm, fastAttackChain.chain[missing]) tester.newPeer("fast-attack", protocol, fastAttackChain) - if err := tester.sync("fast-attack", nil, mode); err == nil { + if err := tester.sync("fast-attack", 0, mode); err == nil { t.Fatalf("succeeded fast attacker synchronisation") } if head := tester.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch { @@ -838,7 +825,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { delete(blockAttackChain.headerm, blockAttackChain.chain[missing]) tester.newPeer("block-attack", protocol, blockAttackChain) - if err := tester.sync("block-attack", nil, mode); err == nil { + if err := tester.sync("block-attack", 0, mode); err == nil { t.Fatalf("succeeded block attacker synchronisation") } if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { @@ -861,7 +848,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { } tester.downloader.syncInitHook = nil } - if err := tester.sync("withhold-attack", nil, mode); err == nil { + if err := tester.sync("withhold-attack", 0, mode); err == nil { t.Fatalf("succeeded withholding attacker synchronisation") } if head := tester.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch { @@ -878,7 +865,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { // sync. Note, we can't assert anything about the receipts since we won't purge the // database of them, hence we can't use assertOwnChain. tester.newPeer("valid", protocol, chain) - if err := tester.sync("valid", nil, mode); err != nil { + if err := tester.sync("valid", 0, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if hs := len(tester.ownHeaders); hs != chain.len() { @@ -891,16 +878,26 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { } } -// Tests that a peer advertising an high TD doesn't get to stall the downloader +// Tests that a peer advertising an high number doesn't get to stall the downloader // afterwards by not sending any useful hashes. -func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) } -func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) } -func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) } -func TestHighTDStarvationAttack64Full(t *testing.T) { testHighTDStarvationAttack(t, 64, FullSync) } -func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttack(t, 64, FastSync) } -func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } - -func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { +func TestHighNumberStarvationAttack62(t *testing.T) { testHighNumberStarvationAttack(t, 62, FullSync) } +func TestHighNumberStarvationAttack63Full(t *testing.T) { + testHighNumberStarvationAttack(t, 63, FullSync) +} +func TestHighNumberStarvationAttack63Fast(t *testing.T) { + testHighNumberStarvationAttack(t, 63, FastSync) +} +func TestHighNumberStarvationAttack64Full(t *testing.T) { + testHighNumberStarvationAttack(t, 64, FullSync) +} +func TestHighNumberStarvationAttack64Fast(t *testing.T) { + testHighNumberStarvationAttack(t, 64, FastSync) +} +func TestHighNumberStarvationAttack64Light(t *testing.T) { + testHighNumberStarvationAttack(t, 64, LightSync) +} + +func testHighNumberStarvationAttack(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() @@ -908,7 +905,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { chain := testChainBase.shorten(1) tester.newPeer("attack", protocol, chain) - if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { + if err := tester.sync("attack", 1000000, mode); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -964,7 +961,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - tester.downloader.Synchronise(id, tester.genesis.Hash(), big.NewInt(1000), FullSync) + tester.downloader.Synchronise(id, tester.genesis.Hash(), 1000, FullSync) if _, ok := tester.peers[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } @@ -1004,7 +1001,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() - if err := tester.sync("peer-half", nil, mode); err != nil { + if err := tester.sync("peer-half", 0, mode); err != nil { panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() @@ -1020,7 +1017,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { pending.Add(1) go func() { defer pending.Done() - if err := tester.sync("peer-full", nil, mode); err != nil { + if err := tester.sync("peer-full", 0, mode); err != nil { panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() @@ -1090,7 +1087,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { pending.Add(1) go func() { defer pending.Done() - if err := tester.sync("faulty", nil, mode); err == nil { + if err := tester.sync("faulty", 0, mode); err == nil { panic("succeeded faulty synchronisation") } }() @@ -1108,7 +1105,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { pending.Add(1) go func() { defer pending.Done() - if err := tester.sync("valid", nil, mode); err != nil { + if err := tester.sync("valid", 0, mode); err != nil { panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() @@ -1161,7 +1158,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { pending.Add(1) go func() { defer pending.Done() - if err := tester.sync("attack", nil, mode); err == nil { + if err := tester.sync("attack", 0, mode); err == nil { panic("succeeded attacker synchronisation") } }() @@ -1181,7 +1178,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() - if err := tester.sync("valid", nil, mode); err != nil { + if err := tester.sync("valid", 0, mode); err != nil { panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() @@ -1240,7 +1237,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { peer: tester.downloader.peers.peers["peer"].peer, tester: tester, } - if err := tester.sync("peer", nil, mode); err != nil { + if err := tester.sync("peer", 0, mode); err != nil { t.Errorf("test %d: sync failed: %v", i, err) } tester.terminate() @@ -1252,7 +1249,7 @@ type floodingTestPeer struct { tester *downloadTester } -func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() } +func (ftp *floodingTestPeer) Head() (common.Hash, uint64) { return ftp.peer.Head() } func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse, withGov bool) error { return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse, withGov) } diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go index b6f2936b7..25c355df1 100644 --- a/dex/downloader/peer.go +++ b/dex/downloader/peer.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "math" - "math/big" "sort" "sync" "sync/atomic" @@ -77,7 +76,7 @@ type peerConnection struct { // LightPeer encapsulates the methods required to synchronise with a remote light peer. type LightPeer interface { - Head() (common.Hash, *big.Int) + Head() (common.Hash, uint64) RequestHeadersByHash(common.Hash, int, int, bool, bool) error RequestHeadersByNumber(uint64, int, int, bool, bool) error RequestGovStateByHash(common.Hash) error @@ -96,7 +95,7 @@ type lightPeerWrapper struct { peer LightPeer } -func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } +func (w *lightPeerWrapper) Head() (common.Hash, uint64) { return w.peer.Head() } func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse, withGov bool) error { return w.peer.RequestHeadersByHash(h, amount, skip, reverse, withGov) } diff --git a/dex/downloader/testchain_test.go b/dex/downloader/testchain_test.go index 5dcc1f403..810fb41b3 100644 --- a/dex/downloader/testchain_test.go +++ b/dex/downloader/testchain_test.go @@ -96,7 +96,6 @@ type testChain struct { headerm map[common.Hash]*types.Header blockm map[common.Hash]*types.Block receiptm map[common.Hash][]*types.Receipt - tdm map[common.Hash]*big.Int } // newTestChain creates a blockchain of the given length. @@ -105,7 +104,6 @@ func newTestChain(length int, genesis *types.Block) *testChain { tc.genesis = genesis tc.chain = append(tc.chain, genesis.Hash()) tc.headerm[tc.genesis.Hash()] = tc.genesis.Header() - tc.tdm[tc.genesis.Hash()] = tc.genesis.Difficulty() tc.blockm[tc.genesis.Hash()] = tc.genesis tc.generate(length-1, 0, genesis, false) return tc @@ -133,12 +131,10 @@ func (tc *testChain) copy(newlen int) *testChain { headerm: make(map[common.Hash]*types.Header, newlen), blockm: make(map[common.Hash]*types.Block, newlen), receiptm: make(map[common.Hash][]*types.Receipt, newlen), - tdm: make(map[common.Hash]*big.Int, newlen), } for i := 0; i < len(tc.chain) && i < newlen; i++ { hash := tc.chain[i] cpy.chain = append(cpy.chain, tc.chain[i]) - cpy.tdm[hash] = tc.tdm[hash] cpy.blockm[hash] = tc.blockm[hash] cpy.headerm[hash] = tc.headerm[hash] cpy.receiptm[hash] = tc.receiptm[hash] @@ -179,15 +175,12 @@ func (tc *testChain) generate(n int, seed byte, parent *types.Block, heavy bool) }) // Convert the block-chain into a hash-chain and header/block maps - td := new(big.Int).Set(tc.td(parent.Hash())) for i, b := range blocks { - td := td.Add(td, b.Difficulty()) hash := b.Hash() tc.chain = append(tc.chain, hash) tc.blockm[hash] = b tc.headerm[hash] = b.Header() tc.receiptm[hash] = receipts[i] - tc.tdm[hash] = new(big.Int).Set(td) } } @@ -201,11 +194,6 @@ func (tc *testChain) headBlock() *types.Block { return tc.blockm[tc.chain[len(tc.chain)-1]] } -// td returns the total difficulty of the given block. -func (tc *testChain) td(hash common.Hash) *big.Int { - return tc.tdm[hash] -} - // headersByHash returns headers in ascending order from the given hash. func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.HeaderWithGovState { num, _ := tc.hashToNumber(origin) diff --git a/dex/handler.go b/dex/handler.go index c64da2c87..5753a7cd8 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -38,7 +38,6 @@ import ( "errors" "fmt" "math" - "math/big" "net" "sync" "sync/atomic" @@ -350,9 +349,8 @@ func (pm *ProtocolManager) handle(p *peer) error { head = pm.blockchain.CurrentHeader() hash = head.Hash() number = head.Number.Uint64() - td = pm.blockchain.GetTd(hash, number) ) - if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { + if err := p.Handshake(pm.networkID, number, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } @@ -727,32 +725,32 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { break } // Retrieve and decode the propagated block - var request newBlockData - if err := msg.Decode(&request); err != nil { + var block types.Block + if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - request.Block.ReceivedAt = msg.ReceivedAt - request.Block.ReceivedFrom = p + block.ReceivedAt = msg.ReceivedAt + block.ReceivedFrom = p // Mark the peer as owning the block and schedule it for import - p.MarkBlock(request.Block.Hash()) - pm.fetcher.Enqueue(p.id, request.Block) + p.MarkBlock(block.Hash()) + pm.fetcher.Enqueue(p.id, &block) // Assuming the block is importable by the peer, but possibly not yet done so, - // calculate the head hash and TD that the peer truly must have. + // calculate the head hash and number that the peer truly must have. var ( - trueHead = request.Block.ParentHash() - trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + trueHead = block.ParentHash() + trueNumber = block.NumberU64() - 1 ) - // Update the peers total difficulty if better than the previous - if _, td := p.Head(); trueTD.Cmp(td) > 0 { - p.SetHead(trueHead, trueTD) + // Update the peers number if better than the previous + if _, number := p.Head(); trueNumber > number { + p.SetHead(trueHead, trueNumber) // Schedule a sync if above ours. Note, this will not fire a sync for a gap of - // a singe block (as the true TD is below the propagated block), however this + // a singe block (as the true number is below the propagated block), however this // scenario should easily be covered by the fetcher. currentBlock := pm.blockchain.CurrentBlock() - if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { + if trueNumber > currentBlock.NumberU64() { go pm.synchronise(p) } } @@ -917,18 +915,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // If propagation is requested, send to a subset of the peer if propagate { - // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) - var td *big.Int - if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { - td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) - } else { + if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent == nil { log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { - peer.AsyncSendNewBlock(block, td) + peer.AsyncSendNewBlock(block) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return @@ -1201,21 +1195,21 @@ func (pm *ProtocolManager) peerSetLoop() { // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { - Network uint64 `json:"network"` // DEXON network ID (237=Mainnet, 238=Taiwan, 239=Taipei) - Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain - Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block - Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules - Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + Network uint64 `json:"network"` // DEXON network ID (237=Mainnet, 238=Taiwan, 239=Taipei) + Number uint64 `json:"number"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block } // NodeInfo retrieves some protocol metadata about the running host node. func (pm *ProtocolManager) NodeInfo() *NodeInfo { currentBlock := pm.blockchain.CurrentBlock() return &NodeInfo{ - Network: pm.networkID, - Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()), - Genesis: pm.blockchain.Genesis().Hash(), - Config: pm.blockchain.Config(), - Head: currentBlock.Hash(), + Network: pm.networkID, + Number: currentBlock.NumberU64(), + Genesis: pm.blockchain.Genesis().Hash(), + Config: pm.blockchain.Config(), + Head: currentBlock.Hash(), } } diff --git a/dex/helper_test.go b/dex/helper_test.go index abe0f6b89..62dd6c5f4 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -264,20 +264,20 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te var ( genesis = pm.blockchain.Genesis() head = pm.blockchain.CurrentHeader() - td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + number = head.Number.Uint64() ) - tp.handshake(nil, td, head.Hash(), genesis.Hash()) + tp.handshake(nil, number, head.Hash(), genesis.Hash()) } return tp, errc } // handshake simulates a trivial handshake that expects the same state from the // remote side as we are simulating locally. -func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesis common.Hash) { +func (p *testPeer) handshake(t *testing.T, number uint64, head common.Hash, genesis common.Hash) { msg := &statusData{ ProtocolVersion: uint32(p.version), NetworkId: DefaultConfig.NetworkId, - TD: td, + Number: number, CurrentBlock: head, GenesisBlock: genesis, } diff --git a/dex/peer.go b/dex/peer.go index f60145f0d..49a9b64f8 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -37,7 +37,6 @@ import ( "encoding/hex" "errors" "fmt" - "math/big" "net" "sync" "time" @@ -110,15 +109,9 @@ const ( // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known // about a connected peer. type PeerInfo struct { - Version int `json:"version"` // Ethereum protocol version negotiated - Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain - Head string `json:"head"` // SHA3 hash of the peer's best owned block -} - -// propEvent is a block propagation, waiting for its turn in the broadcast queue. -type propEvent struct { - block *types.Block - td *big.Int + Version int `json:"version"` // Ethereum protocol version negotiated + Number uint64 `json:"number"` // Number the peer's blockchain + Head string `json:"head"` // SHA3 hash of the peer's best owned block } type setType uint32 @@ -142,9 +135,9 @@ type peer struct { version int // Protocol version negotiated - head common.Hash - td *big.Int - lock sync.RWMutex + head common.Hash + number uint64 + lock sync.RWMutex knownTxs mapset.Set // Set of transaction hashes known to be known by this peer knownMetas mapset.Set // Set of node metas known to be known by this peer @@ -157,7 +150,7 @@ type peer struct { knownDKGPartialSignatures mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedProps chan *types.Block // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer queuedLatticeBlocks chan *coreTypes.Block queuedVotes chan *coreTypes.Vote @@ -187,7 +180,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { knownDKGPartialSignatures: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), - queuedProps: make(chan *propEvent, maxQueuedProps), + queuedProps: make(chan *types.Block, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks), queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes), @@ -213,11 +206,11 @@ func (p *peer) broadcast() { } p.Log().Trace("Broadcast node metas", "count", len(metas)) - case prop := <-p.queuedProps: - if err := p.SendNewBlock(prop.block, prop.td); err != nil { + case block := <-p.queuedProps: + if err := p.SendNewBlock(block); err != nil { return } - p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + p.Log().Trace("Propagated block", "number", block.Number(), "hash", block.Hash()) case block := <-p.queuedAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { @@ -286,32 +279,32 @@ func (p *peer) close() { // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { - hash, td := p.Head() + hash, number := p.Head() return &PeerInfo{ - Version: p.version, - Difficulty: td, - Head: hash.Hex(), + Version: p.version, + Number: number, + Head: hash.Hex(), } } -// Head retrieves a copy of the current head hash and total difficulty of the +// Head retrieves a copy of the current head hash and number of the // peer. -func (p *peer) Head() (hash common.Hash, td *big.Int) { +func (p *peer) Head() (hash common.Hash, number uint64) { p.lock.RLock() defer p.lock.RUnlock() copy(hash[:], p.head[:]) - return hash, new(big.Int).Set(p.td) + return hash, p.number } -// SetHead updates the head hash and total difficulty of the peer. -func (p *peer) SetHead(hash common.Hash, td *big.Int) { +// SetHead updates the head hash and number of the peer. +func (p *peer) SetHead(hash common.Hash, number uint64) { p.lock.Lock() defer p.lock.Unlock() copy(p.head[:], hash[:]) - p.td.Set(td) + p.number = number } // MarkBlock marks a block as known for the peer, ensuring that the block will @@ -413,16 +406,16 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) { } // SendNewBlock propagates an entire block to a remote peer. -func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { +func (p *peer) SendNewBlock(block *types.Block) error { p.knownBlocks.Add(block.Hash()) - return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) + return p2p.Send(p.rw, NewBlockMsg, block) } // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. -func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { +func (p *peer) AsyncSendNewBlock(block *types.Block) { select { - case p.queuedProps <- &propEvent{block: block, td: td}: + case p.queuedProps <- block: p.knownBlocks.Add(block.Hash()) default: p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) @@ -617,7 +610,7 @@ func (p *peer) RequestReceipts(hashes []common.Hash) error { // Handshake executes the eth protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. -func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error { +func (p *peer) Handshake(network uint64, number uint64, head common.Hash, genesis common.Hash) error { // Send out own handshake in a new thread errc := make(chan error, 2) var status statusData // safe to read after two values have been received from errc @@ -626,7 +619,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis errc <- p2p.Send(p.rw, StatusMsg, &statusData{ ProtocolVersion: uint32(p.version), NetworkId: network, - TD: td, + Number: number, CurrentBlock: head, GenesisBlock: genesis, }) @@ -646,7 +639,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis return p2p.DiscReadTimeout } } - p.td, p.head = status.TD, status.CurrentBlock + p.number, p.head = status.Number, status.CurrentBlock return nil } @@ -906,12 +899,12 @@ func (ps *peerSet) BestPeer() *peer { defer ps.lock.RUnlock() var ( - bestPeer *peer - bestTd *big.Int + bestPeer *peer + bestNumber uint64 ) for _, p := range ps.peers { - if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { - bestPeer, bestTd = p, td + if _, number := p.Head(); bestPeer == nil || number > bestNumber { + bestPeer, bestNumber = p, number } } return bestPeer diff --git a/dex/protocol.go b/dex/protocol.go index 21168e917..49bd0cc20 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -37,7 +37,6 @@ import ( "crypto/ecdsa" "fmt" "io" - "math/big" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" @@ -172,7 +171,7 @@ type p2pServer interface { type statusData struct { ProtocolVersion uint32 NetworkId uint64 - TD *big.Int + Number uint64 CurrentBlock common.Hash GenesisBlock common.Hash } @@ -232,7 +231,6 @@ func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error { // newBlockData is the network packet for the block propagation message. type newBlockData struct { Block *types.Block - TD *big.Int } // blockBody represents the data content of a single block. diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 1b380cac7..686ba372f 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -54,7 +54,7 @@ func testStatusMsgErrors(t *testing.T, protocol int) { var ( genesis = pm.blockchain.Genesis() head = pm.blockchain.CurrentHeader() - td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + number = head.Number.Uint64() ) defer pm.Stop() @@ -68,15 +68,15 @@ func testStatusMsgErrors(t *testing.T, protocol int) { wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), }, { - code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, td, head.Hash(), genesis.Hash()}, + code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, number, head.Hash(), genesis.Hash()}, wantError: errResp(ErrProtocolVersionMismatch, "10 (!= %d)", protocol), }, { - code: StatusMsg, data: statusData{uint32(protocol), 999, td, head.Hash(), genesis.Hash()}, + code: StatusMsg, data: statusData{uint32(protocol), 999, number, head.Hash(), genesis.Hash()}, wantError: errResp(ErrNetworkIdMismatch, "999 (!= 237)"), }, { - code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, td, head.Hash(), common.Hash{3}}, + code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, number, head.Hash(), common.Hash{3}}, wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000 (!= %x)", genesis.Hash().Bytes()[:8]), }, } diff --git a/dex/sync.go b/dex/sync.go index b6a8035d4..43f1291ff 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -257,12 +257,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if peer == nil { return } - // Make sure the peer's TD is higher than our own + // Make sure the peer's number is higher than our own currentBlock := pm.blockchain.CurrentBlock() - td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + number := currentBlock.NumberU64() - pHead, pTd := peer.Head() - if pTd.Cmp(td) <= 0 { + pHead, pNumber := peer.Head() + + if pNumber <= number { return } // Otherwise try to sync with the downloader @@ -282,13 +283,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if mode == downloader.FastSync { // Make sure the peer's total difficulty we are synchronizing is higher. - if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { + if pm.blockchain.CurrentFastBlock().NumberU64() >= pNumber { return } } // Run the sync cycle, and disable fast sync if we've went past the pivot block - if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { + if err := pm.downloader.Synchronise(peer.id, pHead, pNumber, mode); err != nil { return } if atomic.LoadUint32(&pm.fastSync) == 1 { @@ -302,7 +303,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // all its out-of-date peers of the availability of a new block. This failure // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to - // more reliably update peers or the local TD state. + // more reliably update peers or the local number state. go pm.BroadcastBlock(head, false) } } |