aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2018-08-28 15:08:16 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-28 15:08:16 +0800
commitb69476b372a26679e5bdb33db3d508f2c955e7ff (patch)
tree47757ef2b65302f19aca96327b7a34ad73f652a5 /les
parentc64d72bea207ccaca3f6aded25d8730a4b8696cd (diff)
downloaddexon-b69476b372a26679e5bdb33db3d508f2c955e7ff.tar.gz
dexon-b69476b372a26679e5bdb33db3d508f2c955e7ff.tar.zst
dexon-b69476b372a26679e5bdb33db3d508f2c955e7ff.zip
all: make indexer configurable (#17188)
Diffstat (limited to 'les')
-rw-r--r--les/api_backend.go2
-rw-r--r--les/backend.go13
-rw-r--r--les/bloombits.go4
-rw-r--r--les/commons.go6
-rw-r--r--les/handler.go13
-rw-r--r--les/handler_test.go179
-rw-r--r--les/helper_test.go159
-rw-r--r--les/odr.go15
-rw-r--r--les/odr_requests.go20
-rw-r--r--les/odr_test.go45
-rw-r--r--les/peer.go30
-rw-r--r--les/request_test.go43
-rw-r--r--les/server.go14
13 files changed, 332 insertions, 211 deletions
diff --git a/les/api_backend.go b/les/api_backend.go
index 4232d3ae0..aa748a4ea 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -192,7 +192,7 @@ func (b *LesApiBackend) BloomStatus() (uint64, uint64) {
return 0, 0
}
sections, _, _ := b.eth.bloomIndexer.Sections()
- return light.BloomTrieFrequency, sections
+ return params.BloomBitsBlocksClient, sections
}
func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
diff --git a/les/backend.go b/les/backend.go
index 00025ba63..75049da08 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -95,6 +95,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
lesCommons: lesCommons{
chainDb: chainDb,
config: config,
+ iConfig: light.DefaultClientIndexerConfig,
},
chainConfig: chainConfig,
eventMux: ctx.EventMux,
@@ -105,16 +106,16 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
shutdownChan: make(chan bool),
networkId: config.NetworkId,
bloomRequests: make(chan chan *bloombits.Retrieval),
- bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations),
+ bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
}
leth.relay = NewLesTxRelay(peers, leth.reqDist)
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
- leth.odr = NewLesOdr(chainDb, leth.retriever)
- leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr)
- leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr)
+ leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
+ leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequencyClient, params.HelperTrieConfirmations)
+ leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency)
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
// Note: NewLightChain adds the trusted checkpoint so it needs an ODR with
@@ -135,7 +136,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)
- if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil {
+ if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil {
return nil, err
}
leth.ApiBackend = &LesApiBackend{leth, nil}
@@ -230,8 +231,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *LightEthereum) Start(srvr *p2p.Server) error {
- s.startBloomHandlers()
log.Warn("Light client mode is an experimental feature")
+ s.startBloomHandlers(params.BloomBitsBlocksClient)
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
// clients are searching for the first advertised protocol in the list
protocolVersion := AdvertiseProtocolVersions[0]
diff --git a/les/bloombits.go b/les/bloombits.go
index 2871a9006..aea0fcd5f 100644
--- a/les/bloombits.go
+++ b/les/bloombits.go
@@ -43,7 +43,7 @@ const (
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
-func (eth *LightEthereum) startBloomHandlers() {
+func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
for {
@@ -57,7 +57,7 @@ func (eth *LightEthereum) startBloomHandlers() {
compVectors, err := light.GetBloomBits(task.Context, eth.odr, task.Bit, task.Sections)
if err == nil {
for i := range task.Sections {
- if blob, err := bitutil.DecompressBytes(compVectors[i], int(light.BloomTrieFrequency/8)); err == nil {
+ if blob, err := bitutil.DecompressBytes(compVectors[i], int(sectionSize/8)); err == nil {
task.Bitsets[i] = blob
} else {
task.Error = err
diff --git a/les/commons.go b/les/commons.go
index d8e941295..a97687993 100644
--- a/les/commons.go
+++ b/les/commons.go
@@ -33,6 +33,7 @@ import (
// lesCommons contains fields needed by both server and client.
type lesCommons struct {
config *eth.Config
+ iConfig *light.IndexerConfig
chainDb ethdb.Database
protocolManager *ProtocolManager
chtIndexer, bloomTrieIndexer *core.ChainIndexer
@@ -81,7 +82,7 @@ func (c *lesCommons) nodeInfo() interface{} {
if !c.protocolManager.lightSync {
// convert to client section size if running in server mode
- sections /= light.CHTFrequencyClient / light.CHTFrequencyServer
+ sections /= c.iConfig.PairChtSize / c.iConfig.ChtSize
}
if sections2 < sections {
@@ -94,7 +95,8 @@ func (c *lesCommons) nodeInfo() interface{} {
if c.protocolManager.lightSync {
chtRoot = light.GetChtRoot(c.chainDb, sectionIndex, sectionHead)
} else {
- chtRoot = light.GetChtV2Root(c.chainDb, sectionIndex, sectionHead)
+ idxV2 := (sectionIndex+1)*c.iConfig.PairChtSize/c.iConfig.ChtSize - 1
+ chtRoot = light.GetChtRoot(c.chainDb, idxV2, sectionHead)
}
cht = light.TrustedCheckpoint{
SectionIdx: sectionIndex,
diff --git a/les/handler.go b/les/handler.go
index ca40eaabf..243a6dabd 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -94,6 +94,7 @@ type ProtocolManager struct {
txrelay *LesTxRelay
networkId uint64
chainConfig *params.ChainConfig
+ iConfig *light.IndexerConfig
blockchain BlockChain
chainDb ethdb.Database
odr *LesOdr
@@ -123,13 +124,14 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
eventMux: mux,
blockchain: blockchain,
chainConfig: chainConfig,
+ iConfig: indexerConfig,
chainDb: chainDb,
odr: odr,
networkId: networkId,
@@ -882,7 +884,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
for _, req := range req.Reqs {
if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
- sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*light.CHTFrequencyServer-1)
+ sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1)
if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
trie, err := trie.New(root, trieDb)
if err != nil {
@@ -1137,10 +1139,11 @@ func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.
func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
switch id {
case htCanonical:
- sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.CHTFrequencyClient-1)
- return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
+ idxV1 := (idx+1)*(pm.iConfig.PairChtSize/pm.iConfig.ChtSize) - 1
+ sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idxV1+1)*pm.iConfig.ChtSize-1)
+ return light.GetChtRoot(pm.chainDb, idxV1, sectionHead), light.ChtTablePrefix
case htBloomBits:
- sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
+ sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1)
return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
}
return common.Hash{}, ""
diff --git a/les/handler_test.go b/les/handler_test.go
index 31aad3ed4..43be7f41b 100644
--- a/les/handler_test.go
+++ b/les/handler_test.go
@@ -51,10 +51,9 @@ func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) }
func TestGetBlockHeadersLes2(t *testing.T) { testGetBlockHeaders(t, 2) }
func testGetBlockHeaders(t *testing.T, protocol int) {
- pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, ethdb.NewMemDatabase())
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
+ server, tearDown := newServerEnv(t, downloader.MaxHashFetch+15, protocol, nil)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
// Create a "random" unknown hash for testing
var unknown common.Hash
@@ -167,9 +166,9 @@ func testGetBlockHeaders(t *testing.T, protocol int) {
}
// Send the hash request and verify the response
reqID++
- cost := peer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount))
- sendRequest(peer.app, GetBlockHeadersMsg, reqID, cost, tt.query)
- if err := expectResponse(peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
+ cost := server.tPeer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount))
+ sendRequest(server.tPeer.app, GetBlockHeadersMsg, reqID, cost, tt.query)
+ if err := expectResponse(server.tPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
}
@@ -180,10 +179,9 @@ func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) }
func TestGetBlockBodiesLes2(t *testing.T) { testGetBlockBodies(t, 2) }
func testGetBlockBodies(t *testing.T, protocol int) {
- pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, ethdb.NewMemDatabase())
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
+ server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
// Create a batch of tests for various scenarios
limit := MaxBodyFetch
@@ -243,9 +241,9 @@ func testGetBlockBodies(t *testing.T, protocol int) {
}
reqID++
// Send the hash request and verify the response
- cost := peer.GetRequestCost(GetBlockBodiesMsg, len(hashes))
- sendRequest(peer.app, GetBlockBodiesMsg, reqID, cost, hashes)
- if err := expectResponse(peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
+ cost := server.tPeer.GetRequestCost(GetBlockBodiesMsg, len(hashes))
+ sendRequest(server.tPeer.app, GetBlockBodiesMsg, reqID, cost, hashes)
+ if err := expectResponse(server.tPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
}
@@ -257,10 +255,9 @@ func TestGetCodeLes2(t *testing.T) { testGetCode(t, 2) }
func testGetCode(t *testing.T, protocol int) {
// Assemble the test environment
- pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, ethdb.NewMemDatabase())
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
+ server, tearDown := newServerEnv(t, 4, protocol, nil)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
var codereqs []*CodeReq
var codes [][]byte
@@ -277,9 +274,9 @@ func testGetCode(t *testing.T, protocol int) {
}
}
- cost := peer.GetRequestCost(GetCodeMsg, len(codereqs))
- sendRequest(peer.app, GetCodeMsg, 42, cost, codereqs)
- if err := expectResponse(peer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
+ cost := server.tPeer.GetRequestCost(GetCodeMsg, len(codereqs))
+ sendRequest(server.tPeer.app, GetCodeMsg, 42, cost, codereqs)
+ if err := expectResponse(server.tPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
t.Errorf("codes mismatch: %v", err)
}
}
@@ -290,11 +287,9 @@ func TestGetReceiptLes2(t *testing.T) { testGetReceipt(t, 2) }
func testGetReceipt(t *testing.T, protocol int) {
// Assemble the test environment
- db := ethdb.NewMemDatabase()
- pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
+ server, tearDown := newServerEnv(t, 4, protocol, nil)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
// Collect the hashes to request, and the response to expect
hashes, receipts := []common.Hash{}, []types.Receipts{}
@@ -302,12 +297,12 @@ func testGetReceipt(t *testing.T, protocol int) {
block := bc.GetBlockByNumber(i)
hashes = append(hashes, block.Hash())
- receipts = append(receipts, rawdb.ReadReceipts(db, block.Hash(), block.NumberU64()))
+ receipts = append(receipts, rawdb.ReadReceipts(server.db, block.Hash(), block.NumberU64()))
}
// Send the hash request and verify the response
- cost := peer.GetRequestCost(GetReceiptsMsg, len(hashes))
- sendRequest(peer.app, GetReceiptsMsg, 42, cost, hashes)
- if err := expectResponse(peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
+ cost := server.tPeer.GetRequestCost(GetReceiptsMsg, len(hashes))
+ sendRequest(server.tPeer.app, GetReceiptsMsg, 42, cost, hashes)
+ if err := expectResponse(server.tPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
}
@@ -318,11 +313,9 @@ func TestGetProofsLes2(t *testing.T) { testGetProofs(t, 2) }
func testGetProofs(t *testing.T, protocol int) {
// Assemble the test environment
- db := ethdb.NewMemDatabase()
- pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
+ server, tearDown := newServerEnv(t, 4, protocol, nil)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
var (
proofreqs []ProofReq
@@ -334,7 +327,7 @@ func testGetProofs(t *testing.T, protocol int) {
for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ {
header := bc.GetHeaderByNumber(i)
root := header.Root
- trie, _ := trie.New(root, trie.NewDatabase(db))
+ trie, _ := trie.New(root, trie.NewDatabase(server.db))
for _, acc := range accounts {
req := ProofReq{
@@ -356,15 +349,15 @@ func testGetProofs(t *testing.T, protocol int) {
// Send the proof request and verify the response
switch protocol {
case 1:
- cost := peer.GetRequestCost(GetProofsV1Msg, len(proofreqs))
- sendRequest(peer.app, GetProofsV1Msg, 42, cost, proofreqs)
- if err := expectResponse(peer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil {
+ cost := server.tPeer.GetRequestCost(GetProofsV1Msg, len(proofreqs))
+ sendRequest(server.tPeer.app, GetProofsV1Msg, 42, cost, proofreqs)
+ if err := expectResponse(server.tPeer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
case 2:
- cost := peer.GetRequestCost(GetProofsV2Msg, len(proofreqs))
- sendRequest(peer.app, GetProofsV2Msg, 42, cost, proofreqs)
- if err := expectResponse(peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
+ cost := server.tPeer.GetRequestCost(GetProofsV2Msg, len(proofreqs))
+ sendRequest(server.tPeer.app, GetProofsV2Msg, 42, cost, proofreqs)
+ if err := expectResponse(server.tPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
}
@@ -375,28 +368,33 @@ func TestGetCHTProofsLes1(t *testing.T) { testGetCHTProofs(t, 1) }
func TestGetCHTProofsLes2(t *testing.T) { testGetCHTProofs(t, 2) }
func testGetCHTProofs(t *testing.T, protocol int) {
- // Figure out the client's CHT frequency
- frequency := uint64(light.CHTFrequencyClient)
- if protocol == 1 {
- frequency = uint64(light.CHTFrequencyServer)
+ config := light.TestServerIndexerConfig
+ frequency := config.ChtSize
+ if protocol == 2 {
+ frequency = config.PairChtSize
}
- // Assemble the test environment
- db := ethdb.NewMemDatabase()
- pm := newTestProtocolManagerMust(t, false, int(frequency)+light.HelperTrieProcessConfirmations, testChainGen, nil, nil, db)
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", protocol, pm, true)
- defer peer.close()
- // Wait a while for the CHT indexer to process the new headers
- time.Sleep(100 * time.Millisecond * time.Duration(frequency/light.CHTFrequencyServer)) // Chain indexer throttling
- time.Sleep(250 * time.Millisecond) // CI tester slack
+ waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
+ expectSections := frequency / config.ChtSize
+ for {
+ cs, _, _ := cIndexer.Sections()
+ bs, _, _ := bIndexer.Sections()
+ if cs >= expectSections && bs >= expectSections {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ server, tearDown := newServerEnv(t, int(frequency+config.ChtConfirms), protocol, waitIndexers)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
// Assemble the proofs from the different protocols
- header := bc.GetHeaderByNumber(frequency)
+ header := bc.GetHeaderByNumber(frequency - 1)
rlp, _ := rlp.EncodeToBytes(header)
key := make([]byte, 8)
- binary.BigEndian.PutUint64(key, frequency)
+ binary.BigEndian.PutUint64(key, frequency-1)
proofsV1 := []ChtResp{{
Header: header,
@@ -406,41 +404,41 @@ func testGetCHTProofs(t *testing.T, protocol int) {
}
switch protocol {
case 1:
- root := light.GetChtRoot(db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
- trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix)))
+ root := light.GetChtRoot(server.db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
+ trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix)))
var proof light.NodeList
trie.Prove(key, 0, &proof)
proofsV1[0].Proof = proof
case 2:
- root := light.GetChtV2Root(db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
- trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix)))
+ root := light.GetChtRoot(server.db, (frequency/config.ChtSize)-1, bc.GetHeaderByNumber(frequency-1).Hash())
+ trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix)))
trie.Prove(key, 0, &proofsV2.Proofs)
}
// Assemble the requests for the different protocols
requestsV1 := []ChtReq{{
- ChtNum: 1,
- BlockNum: frequency,
+ ChtNum: frequency / config.ChtSize,
+ BlockNum: frequency - 1,
}}
requestsV2 := []HelperTrieReq{{
Type: htCanonical,
- TrieIdx: 0,
+ TrieIdx: frequency/config.PairChtSize - 1,
Key: key,
AuxReq: auxHeader,
}}
// Send the proof request and verify the response
switch protocol {
case 1:
- cost := peer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1))
- sendRequest(peer.app, GetHeaderProofsMsg, 42, cost, requestsV1)
- if err := expectResponse(peer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil {
+ cost := server.tPeer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1))
+ sendRequest(server.tPeer.app, GetHeaderProofsMsg, 42, cost, requestsV1)
+ if err := expectResponse(server.tPeer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
case 2:
- cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2))
- sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2)
- if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
+ cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2))
+ sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2)
+ if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
}
@@ -448,24 +446,31 @@ func testGetCHTProofs(t *testing.T, protocol int) {
// Tests that bloombits proofs can be correctly retrieved.
func TestGetBloombitsProofs(t *testing.T) {
- // Assemble the test environment
- db := ethdb.NewMemDatabase()
- pm := newTestProtocolManagerMust(t, false, light.BloomTrieFrequency+256, testChainGen, nil, nil, db)
- bc := pm.blockchain.(*core.BlockChain)
- peer, _ := newTestPeer(t, "peer", 2, pm, true)
- defer peer.close()
-
- // Wait a while for the bloombits indexer to process the new headers
- time.Sleep(100 * time.Millisecond * time.Duration(light.BloomTrieFrequency/4096)) // Chain indexer throttling
- time.Sleep(250 * time.Millisecond) // CI tester slack
+ config := light.TestServerIndexerConfig
+
+ waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
+ for {
+ cs, _, _ := cIndexer.Sections()
+ bs, _, _ := bIndexer.Sections()
+ bts, _, _ := btIndexer.Sections()
+ if cs >= 8 && bs >= 8 && bts >= 1 {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), 2, waitIndexers)
+ defer tearDown()
+ bc := server.pm.blockchain.(*core.BlockChain)
// Request and verify each bit of the bloom bits proofs
for bit := 0; bit < 2048; bit++ {
- // Assemble therequest and proofs for the bloombits
+ // Assemble the request and proofs for the bloombits
key := make([]byte, 10)
binary.BigEndian.PutUint16(key[:2], uint16(bit))
- binary.BigEndian.PutUint64(key[2:], uint64(light.BloomTrieFrequency))
+ // Only the first bloom section has data.
+ binary.BigEndian.PutUint64(key[2:], 0)
requests := []HelperTrieReq{{
Type: htBloomBits,
@@ -474,14 +479,14 @@ func TestGetBloombitsProofs(t *testing.T) {
}}
var proofs HelperTrieResps
- root := light.GetBloomTrieRoot(db, 0, bc.GetHeaderByNumber(light.BloomTrieFrequency-1).Hash())
- trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.BloomTrieTablePrefix)))
+ root := light.GetBloomTrieRoot(server.db, 0, bc.GetHeaderByNumber(config.BloomTrieSize-1).Hash())
+ trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.BloomTrieTablePrefix)))
trie.Prove(key, 0, &proofs.Proofs)
// Send the proof request and verify the response
- cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requests))
- sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requests)
- if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
+ cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requests))
+ sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requests)
+ if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
t.Errorf("bit %d: proofs mismatch: %v", bit, err)
}
}
diff --git a/les/helper_test.go b/les/helper_test.go
index 8817c20c7..206ee2d92 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -24,6 +24,7 @@ import (
"math/big"
"sync"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
@@ -123,6 +124,15 @@ func testChainGen(i int, block *core.BlockGen) {
}
}
+// testIndexers creates a set of indexers with specified params for testing purpose.
+func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.IndexerConfig) (*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer) {
+ chtIndexer := light.NewChtIndexer(db, odr, iConfig.ChtSize, iConfig.ChtConfirms)
+ bloomIndexer := eth.NewBloomIndexer(db, iConfig.BloomSize, iConfig.BloomConfirms)
+ bloomTrieIndexer := light.NewBloomTrieIndexer(db, odr, iConfig.BloomSize, iConfig.BloomTrieSize)
+ bloomIndexer.AddChildIndexer(bloomTrieIndexer)
+ return chtIndexer, bloomIndexer, bloomTrieIndexer
+}
+
func testRCL() RequestCostList {
cl := make(RequestCostList, len(reqList))
for i, code := range reqList {
@@ -134,9 +144,9 @@ func testRCL() RequestCostList {
}
// newTestProtocolManager creates a new protocol manager for testing purposes,
-// with the given number of blocks already known, and potential notification
-// channels for different events.
-func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) {
+// with the given number of blocks already known, potential notification
+// channels for different events and relative chain indexers array.
+func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) (*ProtocolManager, error) {
var (
evmux = new(event.TypeMux)
engine = ethash.NewFaker()
@@ -155,16 +165,6 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
chain, _ = light.NewLightChain(odr, gspec.Config, engine)
} else {
blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
-
- chtIndexer := light.NewChtIndexer(db, false, nil)
- chtIndexer.Start(blockchain)
-
- bbtIndexer := light.NewBloomTrieIndexer(db, false, nil)
-
- bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations)
- bloomIndexer.AddChildIndexer(bbtIndexer)
- bloomIndexer.Start(blockchain)
-
gchain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator)
if _, err := blockchain.InsertChain(gchain); err != nil {
panic(err)
@@ -172,7 +172,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
chain = blockchain
}
- pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup))
+ indexConfig := light.TestServerIndexerConfig
+ if lightSync {
+ indexConfig = light.TestClientIndexerConfig
+ }
+ pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup))
if err != nil {
return nil, err
}
@@ -193,11 +197,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
}
// newTestProtocolManagerMust creates a new protocol manager for testing purposes,
-// with the given number of blocks already known, and potential notification
-// channels for different events. In case of an error, the constructor force-
+// with the given number of blocks already known, potential notification
+// channels for different events and relative chain indexers array. In case of an error, the constructor force-
// fails the test.
-func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager {
- pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db)
+func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) *ProtocolManager {
+ pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db)
if err != nil {
t.Fatalf("Failed to create protocol manager: %v", err)
}
@@ -320,3 +324,122 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() {
p.app.Close()
}
+
+// TestEntity represents a network entity for testing with necessary auxiliary fields.
+type TestEntity struct {
+ db ethdb.Database
+ rPeer *peer
+ tPeer *testPeer
+ peers *peerSet
+ pm *ProtocolManager
+ // Indexers
+ chtIndexer *core.ChainIndexer
+ bloomIndexer *core.ChainIndexer
+ bloomTrieIndexer *core.ChainIndexer
+}
+
+// newServerEnv creates a server testing environment with a connected test peer for testing purpose.
+func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)) (*TestEntity, func()) {
+ db := ethdb.NewMemDatabase()
+ cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig)
+
+ pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db)
+ peer, _ := newTestPeer(t, "peer", protocol, pm, true)
+
+ cIndexer.Start(pm.blockchain.(*core.BlockChain))
+ bIndexer.Start(pm.blockchain.(*core.BlockChain))
+
+ // Wait until indexers generate enough index data.
+ if waitIndexers != nil {
+ waitIndexers(cIndexer, bIndexer, btIndexer)
+ }
+
+ return &TestEntity{
+ db: db,
+ tPeer: peer,
+ pm: pm,
+ chtIndexer: cIndexer,
+ bloomIndexer: bIndexer,
+ bloomTrieIndexer: btIndexer,
+ }, func() {
+ peer.close()
+ // Note bloom trie indexer will be closed by it parent recursively.
+ cIndexer.Close()
+ bIndexer.Close()
+ }
+}
+
+// newClientServerEnv creates a client/server arch environment with a connected les server and light client pair
+// for testing purpose.
+func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer), newPeer bool) (*TestEntity, *TestEntity, func()) {
+ db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase()
+ peers, lPeers := newPeerSet(), newPeerSet()
+
+ dist := newRequestDistributor(lPeers, make(chan struct{}))
+ rm := newRetrieveManager(lPeers, dist, nil)
+ odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm)
+
+ cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig)
+ lcIndexer, lbIndexer, lbtIndexer := testIndexers(ldb, odr, light.TestClientIndexerConfig)
+ odr.SetIndexers(lcIndexer, lbtIndexer, lbIndexer)
+
+ pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db)
+ lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb)
+
+ startIndexers := func(clientMode bool, pm *ProtocolManager) {
+ if clientMode {
+ lcIndexer.Start(pm.blockchain.(*light.LightChain))
+ lbIndexer.Start(pm.blockchain.(*light.LightChain))
+ } else {
+ cIndexer.Start(pm.blockchain.(*core.BlockChain))
+ bIndexer.Start(pm.blockchain.(*core.BlockChain))
+ }
+ }
+
+ startIndexers(false, pm)
+ startIndexers(true, lpm)
+
+ // Execute wait until function if it is specified.
+ if waitIndexers != nil {
+ waitIndexers(cIndexer, bIndexer, btIndexer)
+ }
+
+ var (
+ peer, lPeer *peer
+ err1, err2 <-chan error
+ )
+ if newPeer {
+ peer, err1, lPeer, err2 = newTestPeerPair("peer", protocol, pm, lpm)
+ select {
+ case <-time.After(time.Millisecond * 100):
+ case err := <-err1:
+ t.Fatalf("peer 1 handshake error: %v", err)
+ case err := <-err2:
+ t.Fatalf("peer 2 handshake error: %v", err)
+ }
+ }
+
+ return &TestEntity{
+ db: db,
+ pm: pm,
+ rPeer: peer,
+ peers: peers,
+ chtIndexer: cIndexer,
+ bloomIndexer: bIndexer,
+ bloomTrieIndexer: btIndexer,
+ }, &TestEntity{
+ db: ldb,
+ pm: lpm,
+ rPeer: lPeer,
+ peers: lPeers,
+ chtIndexer: lcIndexer,
+ bloomIndexer: lbIndexer,
+ bloomTrieIndexer: lbtIndexer,
+ }, func() {
+ // Note bloom trie indexers will be closed by their parents recursively.
+ cIndexer.Close()
+ bIndexer.Close()
+ lcIndexer.Close()
+ lbIndexer.Close()
+ }
+}
diff --git a/les/odr.go b/les/odr.go
index 2ad28d5d9..9def05a67 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -28,16 +28,18 @@ import (
// LesOdr implements light.OdrBackend
type LesOdr struct {
db ethdb.Database
+ indexerConfig *light.IndexerConfig
chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer
retriever *retrieveManager
stop chan struct{}
}
-func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr {
+func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr {
return &LesOdr{
- db: db,
- retriever: retriever,
- stop: make(chan struct{}),
+ db: db,
+ indexerConfig: config,
+ retriever: retriever,
+ stop: make(chan struct{}),
}
}
@@ -73,6 +75,11 @@ func (odr *LesOdr) BloomIndexer() *core.ChainIndexer {
return odr.bloomIndexer
}
+// IndexerConfig returns the indexer config.
+func (odr *LesOdr) IndexerConfig() *light.IndexerConfig {
+ return odr.indexerConfig
+}
+
const (
MsgBlockBodies = iota
MsgCode
diff --git a/les/odr_requests.go b/les/odr_requests.go
index 075fcd92c..9e9b2673f 100644
--- a/les/odr_requests.go
+++ b/les/odr_requests.go
@@ -365,7 +365,7 @@ func (r *ChtRequest) CanSend(peer *peer) bool {
peer.lock.RLock()
defer peer.lock.RUnlock()
- return peer.headInfo.Number >= light.HelperTrieConfirmations && r.ChtNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.CHTFrequencyClient
+ return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize
}
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
@@ -379,7 +379,21 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error {
Key: encNum[:],
AuxReq: auxHeader,
}
- return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req})
+ switch peer.version {
+ case lpv1:
+ var reqsV1 ChtReq
+ if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
+ return fmt.Errorf("Request invalid in LES/1 mode")
+ }
+ blockNum := binary.BigEndian.Uint64(req.Key)
+ // convert HelperTrie request to old CHT request
+ reqsV1 = ChtReq{ChtNum: (req.TrieIdx + 1) * (r.Config.ChtSize / r.Config.PairChtSize), BlockNum: blockNum, FromLevel: req.FromLevel}
+ return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []ChtReq{reqsV1})
+ case lpv2:
+ return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req})
+ default:
+ panic(nil)
+ }
}
// Valid processes an ODR request reply message from the LES network
@@ -484,7 +498,7 @@ func (r *BloomRequest) CanSend(peer *peer) bool {
if peer.version < lpv2 {
return false
}
- return peer.headInfo.Number >= light.HelperTrieConfirmations && r.BloomTrieNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.BloomTrieFrequency
+ return peer.headInfo.Number >= r.Config.BloomTrieConfirms && r.BloomTrieNum <= (peer.headInfo.Number-r.Config.BloomTrieConfirms)/r.Config.BloomTrieSize
}
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
diff --git a/les/odr_test.go b/les/odr_test.go
index c7c25cbe4..e6458adf5 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
- "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/params"
@@ -160,36 +159,21 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
return res
}
+// testOdr tests odr requests whose validation guaranteed by block headers.
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
// Assemble the test environment
- peers := newPeerSet()
- dist := newRequestDistributor(peers, make(chan struct{}))
- rm := newRetrieveManager(peers, dist, nil)
- db := ethdb.NewMemDatabase()
- ldb := ethdb.NewMemDatabase()
- odr := NewLesOdr(ldb, rm)
- odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
- pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
- lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
- _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- select {
- case <-time.After(time.Millisecond * 100):
- case err := <-err1:
- t.Fatalf("peer 1 handshake error: %v", err)
- case err := <-err2:
- t.Fatalf("peer 1 handshake error: %v", err)
- }
-
- lpm.synchronise(lpeer)
+ server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
+ defer tearDown()
+ client.pm.synchronise(client.rPeer)
test := func(expFail uint64) {
- for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
- bhash := rawdb.ReadCanonicalHash(db, i)
- b1 := fn(light.NoOdr, db, pm.chainConfig, pm.blockchain.(*core.BlockChain), nil, bhash)
+ for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
+ bhash := rawdb.ReadCanonicalHash(server.db, i)
+ b1 := fn(light.NoOdr, server.db, server.pm.chainConfig, server.pm.blockchain.(*core.BlockChain), nil, bhash)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
- b2 := fn(ctx, ldb, lpm.chainConfig, nil, lpm.blockchain.(*light.LightChain), bhash)
+ b2 := fn(ctx, client.db, client.pm.chainConfig, nil, client.pm.blockchain.(*light.LightChain), bhash)
eq := bytes.Equal(b1, b2)
exp := i < expFail
@@ -201,21 +185,20 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
}
}
}
-
// temporarily remove peer to test odr fails
// expect retrievals to fail (except genesis block) without a les peer
- peers.Unregister(lpeer.id)
+ client.peers.Unregister(client.rPeer.id)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(expFail)
// expect all retrievals to pass
- peers.Register(lpeer)
+ client.peers.Register(client.rPeer)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
- lpeer.lock.Lock()
- lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
- lpeer.lock.Unlock()
+ client.peers.lock.Lock()
+ client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true }
+ client.peers.lock.Unlock()
test(5)
// still expect all retrievals to pass, now data should be cached locally
- peers.Unregister(lpeer.id)
+ client.peers.Unregister(client.rPeer.id)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
test(5)
}
diff --git a/les/peer.go b/les/peer.go
index eb7452e27..70c863c2f 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -19,7 +19,6 @@ package les
import (
"crypto/ecdsa"
- "encoding/binary"
"errors"
"fmt"
"math/big"
@@ -36,9 +35,10 @@ import (
)
var (
- errClosed = errors.New("peer set is closed")
- errAlreadyRegistered = errors.New("peer is already registered")
- errNotRegistered = errors.New("peer is not registered")
+ errClosed = errors.New("peer set is closed")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+ errInvalidHelpTrieReq = errors.New("invalid help trie request")
)
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
@@ -284,21 +284,21 @@ func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
}
// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
-func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
- p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
+func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, data interface{}) error {
switch p.version {
case lpv1:
- reqsV1 := make([]ChtReq, len(reqs))
- for i, req := range reqs {
- if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
- return fmt.Errorf("Request invalid in LES/1 mode")
- }
- blockNum := binary.BigEndian.Uint64(req.Key)
- // convert HelperTrie request to old CHT request
- reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx + 1) * (light.CHTFrequencyClient / light.CHTFrequencyServer), BlockNum: blockNum, FromLevel: req.FromLevel}
+ reqs, ok := data.([]ChtReq)
+ if !ok {
+ return errInvalidHelpTrieReq
}
- return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1)
+ p.Log().Debug("Fetching batch of header proofs", "count", len(reqs))
+ return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
case lpv2:
+ reqs, ok := data.([]HelperTrieReq)
+ if !ok {
+ return errInvalidHelpTrieReq
+ }
+ p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
default:
panic(nil)
diff --git a/les/request_test.go b/les/request_test.go
index db576798b..f02c2a3d7 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
)
@@ -84,35 +83,17 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq
func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment
- peers := newPeerSet()
- dist := newRequestDistributor(peers, make(chan struct{}))
- rm := newRetrieveManager(peers, dist, nil)
- db := ethdb.NewMemDatabase()
- ldb := ethdb.NewMemDatabase()
- odr := NewLesOdr(ldb, rm)
- odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
-
- pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
- lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
- _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- select {
- case <-time.After(time.Millisecond * 100):
- case err := <-err1:
- t.Fatalf("peer 1 handshake error: %v", err)
- case err := <-err2:
- t.Fatalf("peer 1 handshake error: %v", err)
- }
-
- lpm.synchronise(lpeer)
+ server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
+ defer tearDown()
+ client.pm.synchronise(client.rPeer)
test := func(expFail uint64) {
- for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
- bhash := rawdb.ReadCanonicalHash(db, i)
- if req := fn(ldb, bhash, i); req != nil {
+ for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
+ bhash := rawdb.ReadCanonicalHash(server.db, i)
+ if req := fn(client.db, bhash, i); req != nil {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
-
- err := odr.Retrieve(ctx, req)
+ err := client.pm.odr.Retrieve(ctx, req)
got := err == nil
exp := i < expFail
if exp && !got {
@@ -126,16 +107,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
}
// temporarily remove peer to test odr fails
- peers.Unregister(lpeer.id)
+ client.peers.Unregister(client.rPeer.id)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
// expect retrievals to fail (except genesis block) without a les peer
test(0)
- peers.Register(lpeer)
+ client.peers.Register(client.rPeer)
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
- lpeer.lock.Lock()
- lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
- lpeer.lock.Unlock()
+ client.rPeer.lock.Lock()
+ client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true }
+ client.rPeer.lock.Unlock()
// expect all retrievals to pass
test(5)
}
diff --git a/les/server.go b/les/server.go
index df98d1e3a..2fa0456d6 100644
--- a/les/server.go
+++ b/les/server.go
@@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
+ "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -50,7 +51,7 @@ type LesServer struct {
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
quitSync := make(chan struct{})
- pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
+ pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
if err != nil {
return nil, err
}
@@ -64,8 +65,9 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
lesCommons: lesCommons{
config: config,
chainDb: eth.ChainDb(),
- chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil),
- bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil),
+ iConfig: light.DefaultServerIndexerConfig,
+ chtIndexer: light.NewChtIndexer(eth.ChainDb(), nil, params.CHTFrequencyServer, params.HelperTrieProcessConfirmations),
+ bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
protocolManager: pm,
},
quitSync: quitSync,
@@ -75,14 +77,14 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
logger := log.New()
chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
- chtV2SectionCount := chtV1SectionCount / (light.CHTFrequencyClient / light.CHTFrequencyServer)
+ chtV2SectionCount := chtV1SectionCount / (params.CHTFrequencyClient / params.CHTFrequencyServer)
if chtV2SectionCount != 0 {
// convert to LES/2 section
chtLastSection := chtV2SectionCount - 1
// convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
- chtLastSectionV1 := (chtLastSection+1)*(light.CHTFrequencyClient/light.CHTFrequencyServer) - 1
+ chtLastSectionV1 := (chtLastSection+1)*(params.CHTFrequencyClient/params.CHTFrequencyServer) - 1
chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
- chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead)
+ chtRoot := light.GetChtRoot(pm.chainDb, chtLastSectionV1, chtSectionHead)
logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
}
bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()