aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/handler.go216
-rw-r--r--dex/handler_test.go445
-rw-r--r--dex/helper_test.go78
-rw-r--r--dex/network.go17
-rw-r--r--dex/nodetable.go79
-rw-r--r--dex/nodetable_test.go93
-rw-r--r--dex/notaryset.go203
-rw-r--r--dex/peer.go296
-rw-r--r--dex/peer_test.go628
-rw-r--r--dex/protocol.go42
-rw-r--r--dex/protocol_test.go82
-rw-r--r--dex/sync.go95
12 files changed, 1825 insertions, 449 deletions
diff --git a/dex/handler.go b/dex/handler.go
index bc932fb28..5348b06f2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -49,6 +49,8 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ metaChanSize = 10240
)
var (
@@ -59,11 +61,6 @@ var (
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
-type newNotarySetEvent struct {
- Round uint64
- Set map[string]struct{}
-}
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -75,32 +72,35 @@ type ProtocolManager struct {
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
+ nodeTable *nodeTable
+ gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
- notarySetManager *notarySetManager
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
+ metasyncCh chan *metasync
quitSync chan struct{}
noMorePeers chan struct{}
- // channels for notarySetLoop
- newNotarySetCh chan newNotarySetEvent
- newRoundCh chan uint64
- newNotaryNodeInfoCh chan *notaryNodeInfo
+ // channels for peerSetLoop
+ crsCh chan core.NewCRSEvent
+ crsSub event.Subscription
srvr p2pServer
@@ -111,24 +111,26 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(
+ config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
+ mux *event.TypeMux, txpool txPool, engine consensus.Engine,
+ blockchain *core.BlockChain, chaindb ethdb.Database,
+ gov governance) (*ProtocolManager, error) {
+ tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chainconfig: config,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
- newNotarySetCh: make(chan newNotarySetEvent),
- newRoundCh: make(chan uint64),
- newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
- }
- manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
@@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
pm.srvr = srvr
+ pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable)
+
+ // if our self in node set build the node info
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ // broadcast node metas
+ pm.metasCh = make(chan newMetasEvent, metaChanSize)
+ pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
+ go pm.metaBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
- // run the notary set loop
- go pm.notarySetLoop()
+ // run the peer set loop
+ pm.crsCh = make(chan core.NewCRSEvent)
+ pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh)
+ go pm.peerSetLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
+ go pm.metasyncLoop()
}
func (pm *ProtocolManager) Stop() {
@@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
+ pm.syncNodeMetas(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
@@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txpool.AddRemotes(txs)
- case msg.Code == NotaryNodeInfoMsg:
- var info notaryNodeInfo
- if err := msg.Decode(&info); err != nil {
+ case msg.Code == MetaMsg:
+ var metas []*NodeMeta
+ if err := msg.Decode(&metas); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
-
- // TODO(sonic): validate signature
- p.MarkNotaryNodeInfo(info.Hash())
- // Broadcast to peers
- pm.BroadcastNotaryNodeInfo(&info)
-
- pm.notarySetManager.TryAddInfo(&info)
+ for i, meta := range metas {
+ if meta == nil {
+ return errResp(ErrDecode, "node meta %d is nil", i)
+ }
+ p.MarkNodeMeta(meta.Hash())
+ }
+ pm.nodeTable.Add(metas)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
-func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
- peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
- for _, peer := range peers {
- peer.AsyncSendNotaryNodeInfo(info)
+// BroadcastMetas will propagate node metas to its peers.
+func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
+ var metaset = make(map[*peer][]*NodeMeta)
+
+ for _, meta := range metas {
+ peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, peer := range peers {
+ metaset[peer] = append(metaset[peer], meta)
+ }
+ log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ }
+
+ for peer, metas := range metaset {
+ peer.AsyncSendNodeMetas(metas)
}
}
@@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
-// a loop keep building and maintaining peers in notary set.
-func (pm *ProtocolManager) notarySetLoop() {
- // TODO:
- // * pm need to know current round, and whether we are in notary set.
- // * (done) able to handle node info in the near future round.
- // * check peer limit.
- // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
-
- // If we are in notary set and we are synced with network.
- // events:
- // * new notary set is determined, and we are in notary set.
- // (TBD: subscribe the event or polling govornance)
- // - advance round
- // - build new notary set
- // - remove old notary set, remove old notary set from static
-
- // * current round node info changed.
-
- self := pm.srvr.Self()
- var round uint64
-
+func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
- case event := <-pm.newNotarySetCh:
- // new notary set is determined.
- if _, ok := event.Set[self.ID.String()]; ok {
- // initialize the new notary set and add it to pm.notarySetManager
- s := newNotarySet(event.Round, event.Set)
- pm.notarySetManager.Register(event.Round, s)
-
- // TODO: handle signature
- pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
- ID: self.ID,
- IP: self.IP,
- UDP: self.UDP,
- TCP: self.TCP,
- Round: event.Round,
- Timestamp: time.Now().Unix(),
- })
- }
-
- case r := <-pm.newRoundCh:
- // move to new round.
- round = r
+ case event := <-pm.metasCh:
+ pm.BroadcastMetas(event.Metas)
- // TODO: revisit this to make sure how many previous round's
- // notary set we need to keep.
-
- // rmove notary set before current round, they are useless
- for _, s := range pm.notarySetManager.Before(round) {
- pm.removeNotarySetFromStatic(s)
- }
-
- // prepare notary set connections for new round.
- if pm.isInNotarySet(round + 1) {
- // we assume the notary set for round + 1 is added to
- // pm.notarySetManager before this step.
- if n, ok := pm.notarySetManager.Round(round + 1); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.newNotaryNodeInfoCh:
- // some node info in "current round" is updated.
- // try to connect them by the new info.
- if pm.isInNotarySet(round) {
- if n, ok := pm.notarySetManager.Round(round); ok {
- pm.addNotarySetToStatic(n)
- }
- }
-
- case <-pm.quitSync:
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.metasSub.Err():
return
}
}
}
-func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
- return false
-}
-
-func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
- for _, node := range n.NodesToAdd() {
- pm.srvr.AddNotaryPeer(node)
- n.MarkAdded(node.ID.String())
- }
-}
-
-func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
- for _, node := range n.Nodes() {
- pm.srvr.RemoveNotaryPeer(node)
+// a loop keep building and maintaining peers in notary set.
+// TODO: finish this
+func (pm *ProtocolManager) peerSetLoop() {
+ for {
+ select {
+ case event := <-pm.crsCh:
+ pm.peers.BuildNotaryConn(event.Round)
+ pm.peers.BuildDKGConn(event.Round)
+ pm.peers.ForgetNotaryConn(event.Round - 1)
+ pm.peers.ForgetDKGConn(event.Round - 1)
+ case <-pm.quitSync:
+ return
+ }
}
}
diff --git a/dex/handler_test.go b/dex/handler_test.go
new file mode 100644
index 000000000..981362bb5
--- /dev/null
+++ b/dex/handler_test.go
@@ -0,0 +1,445 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package dex
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "testing"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/core/state"
+ "github.com/dexon-foundation/dexon/core/types"
+ "github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/eth/downloader"
+ "github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/params"
+)
+
+// Tests that protocol versions and modes of operations are matched up properly.
+func TestProtocolCompatibility(t *testing.T) {
+ // Define the compatibility chart
+ tests := []struct {
+ version uint
+ mode downloader.SyncMode
+ compatible bool
+ }{
+ {61, downloader.FullSync, true}, {62, downloader.FullSync, true}, {63, downloader.FullSync, true},
+ {61, downloader.FastSync, true}, {62, downloader.FastSync, true}, {63, downloader.FastSync, true},
+ }
+ // Make sure anything we screw up is restored
+ backup := ProtocolVersions
+ defer func() { ProtocolVersions = backup }()
+
+ // Try all available compatibility configs and check for errors
+ for i, tt := range tests {
+ ProtocolVersions = []uint{tt.version}
+
+ pm, _, err := newTestProtocolManager(tt.mode, 0, nil, nil)
+ if pm != nil {
+ defer pm.Stop()
+ }
+ if (err == nil && !tt.compatible) || (err != nil && tt.compatible) {
+ t.Errorf("test %d: compatibility mismatch: have error %v, want compatibility %v", i, err, tt.compatible)
+ }
+ }
+}
+
+// Tests that block headers can be retrieved from a remote chain based on user queries.
+func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) }
+func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) }
+
+func testGetBlockHeaders(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxHashFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a "random" unknown hash for testing
+ var unknown common.Hash
+ for i := range unknown {
+ unknown[i] = byte(i)
+ }
+ // Create a batch of tests for various scenarios
+ limit := uint64(downloader.MaxHeaderFetch)
+ tests := []struct {
+ query *getBlockHeadersData // The query to execute for header retrieval
+ expect []common.Hash // The hashes of the block whose headers are expected
+ }{
+ // A single random block should be retrievable by hash and number too
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(limit / 2).Hash()}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(limit / 2).Hash()},
+ },
+ // Multiple headers should be retrievable in both directions
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 2).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 1).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 2).Hash(),
+ },
+ },
+ // Multiple headers with skip lists should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 + 8).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(limit / 2).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(limit/2 - 8).Hash(),
+ },
+ },
+ // The chain endpoints should be retrievable
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1},
+ []common.Hash{pm.blockchain.GetBlockByNumber(0).Hash()},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64()}, Amount: 1},
+ []common.Hash{pm.blockchain.CurrentBlock().Hash()},
+ },
+ // Ensure protocol limits are honored
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true},
+ pm.blockchain.GetBlockHashesFromHash(pm.blockchain.CurrentBlock().Hash(), limit),
+ },
+ // Check that requesting more than available is handled gracefully
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64()).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check that requesting more than available is handled gracefully, even if mid skip
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 4).Hash(),
+ pm.blockchain.GetBlockByNumber(pm.blockchain.CurrentBlock().NumberU64() - 1).Hash(),
+ },
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(4).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check a corner case where requesting more can iterate past the endpoints
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: 2}, Amount: 5, Reverse: true},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(2).Hash(),
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ pm.blockchain.GetBlockByNumber(0).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back into the chain start
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(3).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64 - 1},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(3).Hash(),
+ },
+ },
+ // Check a corner case where skipping overflow loops back to the same header
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: pm.blockchain.GetBlockByNumber(1).Hash()}, Amount: 2, Reverse: false, Skip: math.MaxUint64},
+ []common.Hash{
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ },
+ },
+ // Check that non existing headers aren't returned
+ {
+ &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1},
+ []common.Hash{},
+ }, {
+ &getBlockHeadersData{Origin: hashOrNumber{Number: pm.blockchain.CurrentBlock().NumberU64() + 1}, Amount: 1},
+ []common.Hash{},
+ },
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the headers to expect in the response
+ headers := []*types.Header{}
+ for _, hash := range tt.expect {
+ headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header())
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ // If the test used number origins, repeat with hashes as the too
+ if tt.query.Origin.Hash == (common.Hash{}) {
+ if origin := pm.blockchain.GetBlockByNumber(tt.query.Origin.Number); origin != nil {
+ tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0
+
+ p2p.Send(peer.app, 0x03, tt.query)
+ if err := p2p.ExpectMsg(peer.app, 0x04, headers); err != nil {
+ t.Errorf("test %d: headers mismatch: %v", i, err)
+ }
+ }
+ }
+ }
+}
+
+// Tests that block contents can be retrieved from a remote chain based on their hashes.
+func TestGetBlockBodies62(t *testing.T) { testGetBlockBodies(t, 62) }
+func TestGetBlockBodies63(t *testing.T) { testGetBlockBodies(t, 63) }
+
+func testGetBlockBodies(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, downloader.MaxBlockFetch+15, nil, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Create a batch of tests for various scenarios
+ limit := downloader.MaxBlockFetch
+ tests := []struct {
+ random int // Number of blocks to fetch randomly from the chain
+ explicit []common.Hash // Explicitly requested blocks
+ available []bool // Availability of explicitly requested blocks
+ expected int // Total number of existing blocks to expect
+ }{
+ {1, nil, nil, 1}, // A single random block should be retrievable
+ {10, nil, nil, 10}, // Multiple random blocks should be retrievable
+ {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable
+ {limit + 1, nil, nil, limit}, // No more than the possible block count should be returned
+ {0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable
+ {0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable
+ {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned
+
+ // Existing and non-existing blocks interleaved should not cause problems
+ {0, []common.Hash{
+ {},
+ pm.blockchain.GetBlockByNumber(1).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(10).Hash(),
+ {},
+ pm.blockchain.GetBlockByNumber(100).Hash(),
+ {},
+ }, []bool{false, true, false, true, false, true, false}, 3},
+ }
+ // Run each of the tests and verify the results against the chain
+ for i, tt := range tests {
+ // Collect the hashes to request, and the response to expect
+ hashes, seen := []common.Hash{}, make(map[int64]bool)
+ bodies := []*blockBody{}
+
+ for j := 0; j < tt.random; j++ {
+ for {
+ num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64()))
+ if !seen[num] {
+ seen[num] = true
+
+ block := pm.blockchain.GetBlockByNumber(uint64(num))
+ hashes = append(hashes, block.Hash())
+ if len(bodies) < tt.expected {
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ break
+ }
+ }
+ }
+ for j, hash := range tt.explicit {
+ hashes = append(hashes, hash)
+ if tt.available[j] && len(bodies) < tt.expected {
+ block := pm.blockchain.GetBlockByHash(hash)
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ }
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x05, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x06, bodies); err != nil {
+ t.Errorf("test %d: bodies mismatch: %v", i, err)
+ }
+ }
+}
+
+// Tests that the node state database can be retrieved based on hashes.
+func TestGetNodeData63(t *testing.T) { testGetNodeData(t, 63) }
+
+func testGetNodeData(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, db := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Fetch for now the entire chain db
+ hashes := []common.Hash{}
+ for _, key := range db.Keys() {
+ if len(key) == len(common.Hash{}) {
+ hashes = append(hashes, common.BytesToHash(key))
+ }
+ }
+ p2p.Send(peer.app, 0x0d, hashes)
+ msg, err := peer.app.ReadMsg()
+ if err != nil {
+ t.Fatalf("failed to read node data response: %v", err)
+ }
+ if msg.Code != 0x0e {
+ t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, 0x0c)
+ }
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ t.Fatalf("failed to decode response node data: %v", err)
+ }
+ // Verify that all hashes correspond to the requested data, and reconstruct a state tree
+ for i, want := range hashes {
+ if hash := crypto.Keccak256Hash(data[i]); hash != want {
+ t.Errorf("data hash mismatch: have %x, want %x", hash, want)
+ }
+ }
+ statedb := ethdb.NewMemDatabase()
+ for i := 0; i < len(data); i++ {
+ statedb.Put(hashes[i].Bytes(), data[i])
+ }
+ accounts := []common.Address{testBank, acc1Addr, acc2Addr}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ trie, _ := state.New(pm.blockchain.GetBlockByNumber(i).Root(), state.NewDatabase(statedb))
+
+ for j, acc := range accounts {
+ state, _ := pm.blockchain.State()
+ bw := state.GetBalance(acc)
+ bh := trie.GetBalance(acc)
+
+ if (bw != nil && bh == nil) || (bw == nil && bh != nil) {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ if bw != nil && bh != nil && bw.Cmp(bw) != 0 {
+ t.Errorf("test %d, account %d: balance mismatch: have %v, want %v", i, j, bh, bw)
+ }
+ }
+ }
+}
+
+// Tests that the transaction receipts can be retrieved based on hashes.
+func TestGetReceipt63(t *testing.T) { testGetReceipt(t, 63) }
+
+func testGetReceipt(t *testing.T, protocol int) {
+ // Define three accounts to simulate transactions with
+ acc1Key, _ := crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
+ acc2Key, _ := crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee")
+ acc1Addr := crypto.PubkeyToAddress(acc1Key.PublicKey)
+ acc2Addr := crypto.PubkeyToAddress(acc2Key.PublicKey)
+
+ signer := types.HomesteadSigner{}
+ // Create a chain generator with some simple transactions (blatantly stolen from @fjl/chain_markets_test)
+ generator := func(i int, block *core.BlockGen) {
+ switch i {
+ case 0:
+ // In block 1, the test bank sends account #1 some ether.
+ tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
+ block.AddTx(tx)
+ case 1:
+ // In block 2, the test bank sends some more ether to account #1.
+ // acc1Addr passes it on to account #2.
+ tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBank), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey)
+ tx2, _ := types.SignTx(types.NewTransaction(block.TxNonce(acc1Addr), acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key)
+ block.AddTx(tx1)
+ block.AddTx(tx2)
+ case 2:
+ // Block 3 is empty but was mined by account #2.
+ block.SetCoinbase(acc2Addr)
+ block.SetExtra([]byte("yeehaw"))
+ case 3:
+ // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data).
+ b2 := block.PrevBlock(1).Header()
+ b2.Extra = []byte("foo")
+ block.AddUncle(b2)
+ b3 := block.PrevBlock(2).Header()
+ b3.Extra = []byte("foo")
+ block.AddUncle(b3)
+ }
+ }
+ // Assemble the test environment
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 4, generator, nil)
+ peer, _ := newTestPeer("peer", protocol, pm, true)
+ defer peer.close()
+
+ // Collect the hashes to request, and the response to expect
+ hashes, receipts := []common.Hash{}, []types.Receipts{}
+ for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
+ block := pm.blockchain.GetBlockByNumber(i)
+
+ hashes = append(hashes, block.Hash())
+ receipts = append(receipts, pm.blockchain.GetReceiptsByHash(block.Hash()))
+ }
+ // Send the hash request and verify the response
+ p2p.Send(peer.app, 0x0f, hashes)
+ if err := p2p.ExpectMsg(peer.app, 0x10, receipts); err != nil {
+ t.Errorf("receipts mismatch: %v", err)
+ }
+}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 7e3479958..dcda6f4d2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -37,7 +37,7 @@ import (
"github.com/dexon-foundation/dexon/ethdb"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/p2p"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/params"
)
@@ -48,24 +48,47 @@ var (
// testP2PServer is a fake, helper p2p server for testing purposes.
type testP2PServer struct {
- added chan *discover.Node
- removed chan *discover.Node
+ mu sync.Mutex
+ self *enode.Node
+ direct map[enode.NodeID]*enode.Node
+ group map[string][]*enode.Node
}
-func (s *testP2PServer) Self() *discover.Node {
- return &discover.Node{}
+func newTestP2PServer(self *enode.Node) *testP2PServer {
+ return &testP2PServer{
+ self: self,
+ direct: make(map[enode.NodeID]*enode.Node),
+ group: make(map[string][]*enode.Node),
+ }
}
-func (s *testP2PServer) AddNotaryPeer(node *discover.Node) {
- if s.added != nil {
- s.added <- node
- }
+func (s *testP2PServer) Self() *enode.Node {
+ return s.self
}
-func (s *testP2PServer) RemoveNotaryPeer(node *discover.Node) {
- if s.removed != nil {
- s.removed <- node
- }
+func (s *testP2PServer) AddDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.direct[node.ID] = node
+}
+
+func (s *testP2PServer) RemoveDirectPeer(node *enode.Node) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.direct, node.ID)
+}
+
+func (s *testP2PServer) AddGroup(
+ name string, nodes []*enode.Node, num uint64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.group[name] = nodes
+}
+
+func (s *testP2PServer) RemoveGroup(name string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.group, name)
}
// newTestProtocolManager creates a new protocol manager for testing purposes,
@@ -88,11 +111,11 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, &testGovernance{})
if err != nil {
return nil, nil, err
}
- pm.Start(&testP2PServer{}, 1000)
+ pm.Start(newTestP2PServer(&enode.Node{}), 1000)
return pm, db, nil
}
@@ -157,6 +180,29 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ
return tx
}
+// testGovernance is a fake, helper governance for testing purposes
+type testGovernance struct {
+ getChainNumFunc func(uint64) uint32
+ getNotarySetFunc func(uint32, uint64) map[string]struct{}
+ getDKGSetFunc func(uint64) map[string]struct{}
+}
+
+func (g *testGovernance) GetChainNum(round uint64) uint32 {
+ return g.getChainNumFunc(round)
+}
+
+func (g *testGovernance) GetNotarySet(chainID uint32, round uint64) map[string]struct{} {
+ return g.getNotarySetFunc(chainID, round)
+}
+
+func (g *testGovernance) GetDKGSet(round uint64) map[string]struct{} {
+ return g.getDKGSetFunc(round)
+}
+
+func (g *testGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription {
+ return nil
+}
+
// testPeer is a simulated peer to allow testing direct network calls.
type testPeer struct {
net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
@@ -170,7 +216,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
- var id discover.NodeID
+ var id enode.NodeID
rand.Read(id[:])
peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
diff --git a/dex/network.go b/dex/network.go
index c3a0ef792..b19b46147 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -23,14 +23,6 @@ func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
func (n *DexconNetwork) BroadcastBlock(block *types.Block) {
}
-// BroadcastRandomnessRequest broadcasts rand request to DKG set.
-func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
-}
-
-// BroadcastRandomnessResult broadcasts rand request to Notary set.
-func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
-}
-
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
func (n *DexconNetwork) SendDKGPrivateShare(
pub crypto.PublicKey, prvShare *types.DKGPrivateShare) {
@@ -47,6 +39,15 @@ func (n *DexconNetwork) BroadcastDKGPartialSignature(
psig *types.DKGPartialSignature) {
}
+// BroadcastRandomnessRequest broadcasts rand request to DKG set.
+func (n *DexconNetwork) BroadcastRandomnessRequest(randRequest *types.AgreementResult) {
+
+}
+
+// BroadcastRandomnessResult broadcasts rand request to Notary set.
+func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
+}
+
// ReceiveChan returns a channel to receive messages from DEXON network.
func (n *DexconNetwork) ReceiveChan() <-chan interface{} {
return n.receiveChan
diff --git a/dex/nodetable.go b/dex/nodetable.go
new file mode 100644
index 000000000..929b168a8
--- /dev/null
+++ b/dex/nodetable.go
@@ -0,0 +1,79 @@
+package dex
+
+import (
+ "net"
+ "sync"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/crypto/sha3"
+ "github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+type NodeMeta struct {
+ ID enode.ID
+ IP net.IP
+ UDP int
+ TCP int
+ Timestamp uint64
+ Sig []byte
+}
+
+func (n *NodeMeta) Hash() (h common.Hash) {
+ hw := sha3.NewKeccak256()
+ rlp.Encode(hw, n)
+ hw.Sum(h[:0])
+ return h
+}
+
+type newMetasEvent struct{ Metas []*NodeMeta }
+
+type nodeTable struct {
+ mu sync.RWMutex
+ entry map[enode.ID]*NodeMeta
+ feed event.Feed
+}
+
+func newNodeTable() *nodeTable {
+ return &nodeTable{
+ entry: make(map[enode.ID]*NodeMeta),
+ }
+}
+
+func (t *nodeTable) Get(id enode.ID) *NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ return t.entry[id]
+}
+
+func (t *nodeTable) Add(metas []*NodeMeta) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ var newMetas []*NodeMeta
+ for _, meta := range metas {
+ // TODO: validate the meta
+ if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp {
+ continue
+ }
+ t.entry[meta.ID] = meta
+ newMetas = append(newMetas, meta)
+ }
+ t.feed.Send(newMetasEvent{newMetas})
+}
+
+func (t *nodeTable) Metas() []*NodeMeta {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ metas := make([]*NodeMeta, 0, len(t.entry))
+ for _, meta := range t.entry {
+ metas = append(metas, meta)
+ }
+ return metas
+}
+
+func (t *nodeTable) SubscribeNewMetasEvent(
+ ch chan<- newMetasEvent) event.Subscription {
+ return t.feed.Subscribe(ch)
+}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
new file mode 100644
index 000000000..5b3f7de57
--- /dev/null
+++ b/dex/nodetable_test.go
@@ -0,0 +1,93 @@
+package dex
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestNodeTable(t *testing.T) {
+ table := newNodeTable()
+ ch := make(chan newMetasEvent)
+ table.SubscribeNewMetasEvent(ch)
+
+ metas1 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ metas2 := []*NodeMeta{
+ &NodeMeta{ID: randomID()},
+ &NodeMeta{ID: randomID()},
+ }
+
+ go table.Add(metas1)
+
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas1))
+ }
+
+ for _, meta := range metas1 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ go table.Add(metas2)
+ select {
+ case newMetas := <-ch:
+ m := map[common.Hash]struct{}{}
+ for _, meta := range newMetas.Metas {
+ m[meta.Hash()] = struct{}{}
+ }
+
+ if len(m) != len(metas1) {
+ t.Errorf("len mismatch: got %d, want: %d",
+ len(m), len(metas2))
+ }
+
+ for _, meta := range metas2 {
+ if _, ok := m[meta.Hash()]; !ok {
+ t.Errorf("expected meta (%s) not exists", meta.Hash())
+ }
+ }
+ case <-time.After(1 * time.Second):
+ t.Error("did not receive new metas event within one second")
+ }
+
+ var metas []*NodeMeta
+ metas = append(metas, metas1...)
+ metas = append(metas, metas2...)
+ allMetas := table.Metas()
+ if len(allMetas) != len(metas) {
+ t.Errorf("all metas num mismatch: got %d, want %d",
+ len(metas), len(allMetas))
+ }
+
+ for _, m := range metas {
+ if m.Hash() != table.Get(m.ID).Hash() {
+ t.Errorf("meta (%s) mismatch", m.ID.String())
+ }
+ }
+}
+
+func randomID() (id enode.ID) {
+ for i := range id {
+ id[i] = byte(rand.Intn(255))
+ }
+ return id
+}
diff --git a/dex/notaryset.go b/dex/notaryset.go
deleted file mode 100644
index 74d259314..000000000
--- a/dex/notaryset.go
+++ /dev/null
@@ -1,203 +0,0 @@
-package dex
-
-import (
- "fmt"
- "sync"
-
- "github.com/dexon-foundation/dexon/p2p/discover"
-)
-
-type nodeInfo struct {
- info *notaryNodeInfo
- added bool
-}
-
-func (n *nodeInfo) NewNode() *discover.Node {
- return discover.NewNode(n.info.ID, n.info.IP, n.info.UDP, n.info.TCP)
-}
-
-type notarySet struct {
- round uint64
- m map[string]*nodeInfo
- lock sync.RWMutex
-}
-
-func newNotarySet(round uint64, s map[string]struct{}) *notarySet {
- m := make(map[string]*nodeInfo)
- for nodeID := range s {
- m[nodeID] = &nodeInfo{}
- }
-
- return &notarySet{
- round: round,
- m: m,
- }
-}
-
-// Call this function when the notaryNodeInfoMsg is received.
-func (n *notarySet) AddInfo(info *notaryNodeInfo) error {
- n.lock.Lock()
- defer n.lock.Unlock()
-
- // check round
- if info.Round != n.round {
- return fmt.Errorf("invalid round")
- }
-
- nInfo, ok := n.m[info.ID.String()]
- if !ok {
- return fmt.Errorf("not in notary set")
- }
-
- // if the info exists check timstamp
- if nInfo.info != nil {
- if nInfo.info.Timestamp > info.Timestamp {
- return fmt.Errorf("old msg")
- }
- }
-
- nInfo.info = info
- return nil
-}
-
-// MarkAdded mark the notary node as added
-// to prevent duplcate addition in the future.
-func (n *notarySet) MarkAdded(nodeID string) {
- if info, ok := n.m[nodeID]; ok {
- info.added = true
- }
-}
-
-// Return all nodes
-func (n *notarySet) Nodes() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- list := make([]*discover.Node, 0, len(n.m))
- for _, info := range n.m {
- list = append(list, info.NewNode())
- }
- return list
-}
-
-// Return nodes that need to be added to p2p server as notary node.
-func (n *notarySet) NodesToAdd() []*discover.Node {
- n.lock.RLock()
- defer n.lock.RUnlock()
-
- var list []*discover.Node
- for _, info := range n.m {
- // craete a new discover.Node
- if !info.added {
- list = append(list, info.NewNode())
- }
- }
- return list
-}
-
-type notarySetManager struct {
- m map[uint64]*notarySet
- lock sync.RWMutex
- queued map[uint64]map[string]*notaryNodeInfo
- round uint64 // biggest round of managed notary sets
- newNotaryNodeInfoCh chan *notaryNodeInfo
-}
-
-func newNotarySetManager(
- newNotaryNodeInfoCh chan *notaryNodeInfo) *notarySetManager {
- return &notarySetManager{
- m: make(map[uint64]*notarySet),
- queued: make(map[uint64]map[string]*notaryNodeInfo),
- newNotaryNodeInfoCh: newNotaryNodeInfoCh,
- }
-}
-
-// Register injects a new notary set into the manager and
-// processes the queued info.
-func (n *notarySetManager) Register(r uint64, s *notarySet) {
- n.lock.Lock()
- defer n.lock.Unlock()
- if r > n.round {
- n.round = r
- }
- n.m[r] = s
- n.processQueuedInfo()
-}
-
-// Unregister removes the notary set of the given round.
-func (n *notarySetManager) Unregister(r uint64) {
- n.lock.Lock()
- defer n.lock.Unlock()
- delete(n.m, r)
-}
-
-// Round returns the notary set of the given round.
-func (n *notarySetManager) Round(r uint64) (*notarySet, bool) {
- n.lock.RLock()
- defer n.lock.RUnlock()
- s, ok := n.m[r]
- return s, ok
-}
-
-// Before returns all the notary sets that before the given round.
-func (n *notarySetManager) Before(r uint64) []*notarySet {
- n.lock.RLock()
- defer n.lock.RUnlock()
- var list []*notarySet
- for round, s := range n.m {
- if round < r {
- list = append(list, s)
- }
- }
- return list
-}
-
-// TryAddInfo associates the given info to the notary set if the notary set is
-// managed by the manager.
-// If the notary node info is belong to future notary set, queue the info.
-func (n *notarySetManager) TryAddInfo(info *notaryNodeInfo) {
- n.lock.Lock()
- defer n.lock.Unlock()
- n.tryAddInfo(info)
-}
-
-// This function is extract for calling without lock.
-// Make sure the caller already accquired the lock.
-func (n *notarySetManager) tryAddInfo(info *notaryNodeInfo) {
- if info.Round > n.round {
- if q, ok := n.queued[info.Round]; ok {
- q[info.Hash().String()] = info
- return
- }
- n.queued[info.Round] = map[string]*notaryNodeInfo{
- info.Hash().String(): info,
- }
- return
- }
-
- s, ok := n.Round(info.Round)
- if !ok {
- return
- }
- s.AddInfo(info)
-
- // TODO(sonic): handle timeout
- n.newNotaryNodeInfoCh <- info
-}
-
-func (n *notarySetManager) processQueuedInfo() {
- n.lock.Lock()
- defer n.lock.Unlock()
- if q, ok := n.queued[n.round]; ok {
- for _, info := range q {
- n.tryAddInfo(info)
- }
- }
-
- // Clear queue infos before current round.
- for round := range n.queued {
- if round <= n.round {
- delete(n.queued, round)
- }
- }
-}
diff --git a/dex/peer.go b/dex/peer.go
index f1a4335d1..8c218f1d9 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -34,19 +35,20 @@ var (
errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
- errInvalidRound = errors.New("invalid round")
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
+ maxQueuedMetas = 512
+
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -57,9 +59,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
- maxQueuedInfos = 1024
-
handshakeTimeout = 5 * time.Second
+
+ groupNodeNum = 3
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -76,13 +78,27 @@ type propEvent struct {
td *big.Int
}
+type setType uint32
+
+const (
+ dkgset = iota
+ notaryset
+)
+
+type peerLabel struct {
+ set setType
+ chainID uint32
+ round uint64
+}
+
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
- version int // Protocol version negotiated
+ version int // Protocol version negotiated
+ labels mapset.Set
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
@@ -90,12 +106,12 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
Peer: p,
rw: rw,
version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node infos broadcasts into the remote peer.
+// transaction and notary node metas broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
@@ -128,6 +145,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
+ case metas := <-p.queuedMetas:
+ if err := p.SendNodeMetas(metas); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast node metas", "count", len(metas))
+
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
@@ -140,12 +163,6 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case info := <-p.queuedInfos:
- if err := p.SendNotaryNodeInfo(info); err != nil {
- return
- }
- p.Log().Trace("Broadcast notary node info")
-
case <-p.term:
return
}
@@ -157,6 +174,14 @@ func (p *peer) close() {
close(p.term)
}
+func (p *peer) addLabel(label peerLabel) {
+ p.labels.Add(label)
+}
+
+func (p *peer) removeLabel(label peerLabel) {
+ p.labels.Remove(label)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
@@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
- for p.knownInfos.Cardinality() >= maxKnownInfos {
- p.knownInfos.Pop()
+func (p *peer) MarkNodeMeta(hash common.Hash) {
+ for p.knownMetas.Cardinality() >= maxKnownMetas {
+ p.knownMetas.Pop()
}
- p.knownInfos.Add(hash)
+ p.knownMetas.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNotaryNodeInfo sends the info to the peer and includes the hashes
-// in its info hash set for future reference.
-func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
- return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+// SendNodeMetas sends the metas to the peer and includes the hashes
+// in its metas hash set for future reference.
+func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
+ return p2p.Send(p.rw, MetaMsg, metas)
}
-// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// AsyncSendNodeMeta queues list of notary node meta propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
select {
- case p.queuedInfos <- info:
- p.knownInfos.Add(info.Hash())
+ case p.queuedMetas <- metas:
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
default:
- p.Log().Debug("Dropping notary node info propagation")
+ p.Log().Debug("Dropping node meta propagation", "count", len(metas))
}
}
@@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("eth/%2d", p.version),
+ fmt.Sprintf("dex/%2d", p.version),
)
}
@@ -441,18 +471,25 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
+ tab *nodeTable
- // TODO(sonic): revist this map after dexon core SDK is finalized.
- // use types.ValidatorID? or implement Stringer for types.ValidatorID
- notaryPeers map[uint64]map[string]*peer
- round uint64
+ srvr p2pServer
+ gov governance
+ peerLabels map[string]map[peerLabel]struct{}
+ notaryHistory map[uint64]struct{}
+ dkgHistory map[uint64]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
+func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- notaryPeers: make(map[uint64]map[string]*peer),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ peerLabels: make(map[string]map[peerLabel]struct{}),
+ notaryHistory: make(map[uint64]struct{}),
+ dkgHistory: make(map[uint64]struct{}),
}
}
@@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
-// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a
-// given info in their set of known hashes.
-func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer {
+// PeersWithoutNodeMeta retrieves a list of peers that do not have a
+// given meta in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if p.knownInfos.Contains(hash) {
+ if !p.knownMetas.Contains(hash) {
list = append(list, p)
}
}
@@ -580,44 +617,171 @@ func (ps *peerSet) Close() {
ps.closed = true
}
-// AddNotaryPeer adds a peer into notary peer of the given round.
-// Caller of this function need to make sure that the peer is acutally in
-// notary set.
-func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error {
+func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- // TODO(sonic): revisit this round check after dexon core SDK is finalized.
- if round < ps.round || round > ps.round+2 {
- return errInvalidRound
+ if _, ok := ps.notaryHistory[round]; ok {
+ return
}
- if _, ok := ps.peers[p.id]; !ok {
- return errNotRegistered
+ ps.notaryHistory[round] = struct{}{}
+
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+
+ // not in notary set, add group
+ if _, ok := s[selfID]; !ok {
+ var nodes []*discover.Node
+ for id := range s {
+ nodes = append(nodes, ps.newNode(id))
+ }
+ ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
+ continue
+ }
+
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, label)
+ }
}
+}
- ps.notaryPeers[round][p.id] = p
- return nil
+func (ps *peerSet) ForgetNotaryConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.notaryHistory {
+ if r <= round {
+ ps.forgetNotaryConn(r)
+ delete(ps.notaryHistory, r)
+ }
+ }
}
-// NotaryPeers return peers in notary set of the given round.
-func (ps *peerSet) NotaryPeers(round uint64) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) forgetNotaryConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+ if _, ok := s[selfID]; !ok {
+ ps.srvr.RemoveGroup(notarySetName(chainID, round))
+ continue
+ }
- list := make([]*peer, 0, len(ps.notaryPeers[round]))
- for _, p := range ps.notaryPeers[round] {
- if _, ok := ps.peers[p.id]; ok {
- list = append(list, p)
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.removeDirectPeer(id, label)
}
}
- return list
}
-// NextRound moves notary peer set to next round.
-func (ps *peerSet) NextRound() {
+func notarySetName(chainID uint32, round uint64) string {
+ return fmt.Sprintf("%d-%d-notaryset", chainID, round)
+}
+
+func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- delete(ps.notaryPeers, ps.round)
- ps.round = ps.round + 1
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+ ps.dkgHistory[round] = struct{}{}
+
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, peerLabel{
+ set: dkgset,
+ round: round,
+ })
+ }
+}
+
+func (ps *peerSet) ForgetDKGConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.dkgHistory {
+ if r <= round {
+ ps.forgetDKGConn(r)
+ delete(ps.dkgHistory, r)
+ }
+ }
+}
+
+func (ps *peerSet) forgetDKGConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+
+ delete(s, selfID)
+ label := peerLabel{
+ set: dkgset,
+ round: round,
+ }
+ for id := range s {
+ ps.removeDirectPeer(id, label)
+ }
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ // if the peer exists add the label
+ if p, ok := ps.peers[id]; ok {
+ p.addLabel(label)
+ }
+
+ if _, ok := ps.peerLabels[id]; !ok {
+ ps.peerLabels[id] = make(map[peerLabel]struct{})
+ }
+
+ ps.peerLabels[id][label] = struct{}{}
+ ps.srvr.AddDirectPeer(ps.newNode(id))
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if p, ok := ps.peers[id]; ok {
+ p.removeLabel(label)
+ }
+
+ delete(ps.peerLabels[id], label)
+
+ if len(ps.peerLabels[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.newNode(id))
+ delete(ps.peerLabels, id)
+ }
+}
+
+func (ps *peerSet) newNode(id string) *enode.Node {
+ nodeID := enode.HexID(id)
+ meta := ps.tab.Get(enode.HexID(id))
+
+ var r enr.Record
+ r.Set(enr.ID(nodeID.String()))
+ r.Set(enr.IP(meta.IP))
+ r.Set(enr.TCP(meta.TCP))
+ r.Set(enr.UDP(meta.UDP))
+
+ n, err := enode.New(enode.ValidSchemes, &r)
+ if err != nil {
+ panic(err)
+ }
+ return n
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
new file mode 100644
index 000000000..bac6ed5ec
--- /dev/null
+++ b/dex/peer_test.go
@@ -0,0 +1,628 @@
+package dex
+
+import (
+ "fmt"
+ "math/big"
+ "testing"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
+)
+
+func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{
+ getChainNumFunc: func(uint64) uint32 {
+ return 3
+ },
+ }
+
+ round10 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ []enode.ID{nodeID(1), nodeID(3)},
+ []enode.ID{nodeID(2), nodeID(4)},
+ }
+ round11 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(1), nodeID(5)},
+ []enode.ID{nodeID(5), nodeID(6)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(4)},
+ }
+ round12 := [][]enode.ID{
+ []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ []enode.ID{nodeID(0), nodeID(7), nodeID(8)},
+ []enode.ID{nodeID(0), nodeID(2), nodeID(6)},
+ }
+
+ gov.getNotarySetFunc = func(cid uint32, round uint64) map[string]struct{} {
+ m := map[uint64][][]enode.ID{
+ 10: round10,
+ 11: round11,
+ 12: round12,
+ }
+ return newTestNodeSet(m[round][cid])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildNotaryConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(4), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildNotaryConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 0, 11},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 0, 10},
+ peerLabel{notaryset, 2, 11},
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(4).String(): []peerLabel{
+ peerLabel{notaryset, 2, 11},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 11},
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 11, 12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(4),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{
+ notarySetName(1, 10),
+ notarySetName(2, 10),
+ notarySetName(1, 11),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetNotaryConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(2).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{notaryset, 0, 12},
+ },
+ nodeID(6).String(): []peerLabel{
+ peerLabel{notaryset, 2, 12},
+ },
+ nodeID(7).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ nodeID(8).String(): []peerLabel{
+ peerLabel{notaryset, 1, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(2), nodeID(3),
+ nodeID(5), nodeID(6), nodeID(7), nodeID(8),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetNotaryConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, notaryset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkGroup(server, []string{})
+ if err != nil {
+ t.Error(err)
+ }
+
+}
+
+func TestPeerSetBuildDKGConn(t *testing.T) {
+ self := discover.Node{ID: nodeID(0)}
+ server := newTestP2PServer(&self)
+ table := newNodeTable()
+
+ gov := &testGovernance{}
+
+ gov.getDKGSetFunc = func(round uint64) map[string]struct{} {
+ m := map[uint64][]enode.ID{
+ 10: []enode.ID{nodeID(0), nodeID(1), nodeID(2)},
+ 11: []enode.ID{nodeID(1), nodeID(2), nodeID(5)},
+ 12: []enode.ID{nodeID(0), nodeID(3), nodeID(5)},
+ }
+ return newTestNodeSet(m[round])
+ }
+
+ ps := newPeerSet(gov, server, table)
+ peer1 := newDummyPeer(nodeID(1))
+ peer2 := newDummyPeer(nodeID(2))
+ var err error
+ err = ps.Register(peer1)
+ if err != nil {
+ t.Error(err)
+ }
+ err = ps.Register(peer2)
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 10
+ ps.BuildDKGConn(10)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 11
+ ps.BuildDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // build round 12
+ ps.BuildDKGConn(12)
+
+ err = checkLabels(peer1, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(1).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(2).String(): []peerLabel{
+ peerLabel{dkgset, 0, 10},
+ },
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{10, 12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(1), nodeID(2), nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 11
+ ps.ForgetDKGConn(11)
+
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{
+ nodeID(3).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ nodeID(5).String(): []peerLabel{
+ peerLabel{dkgset, 0, 12},
+ },
+ })
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{12}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{
+ nodeID(3), nodeID(5),
+ })
+ if err != nil {
+ t.Error(err)
+ }
+
+ // forget round 12
+ ps.ForgetDKGConn(12)
+ err = checkLabels(peer1, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkLabels(peer2, []peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerLabels(ps, map[string][]peerLabel{})
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkPeerSetHistory(ps, []uint64{}, dkgset)
+ if err != nil {
+ t.Error(err)
+ }
+ err = checkDirectPeer(server, []enode.ID{})
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func checkLabels(p *peer, want []peerLabel) error {
+ if p.labels.Cardinality() != len(want) {
+ return fmt.Errorf("num of labels mismatch: got %d, want %d",
+ p.labels.Cardinality(), len(want))
+ }
+
+ for _, label := range want {
+ if !p.labels.Contains(label) {
+ return fmt.Errorf("label %+v not exist", label)
+ }
+ }
+ return nil
+}
+
+func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error {
+ if len(ps.peerLabels) != len(want) {
+ return fmt.Errorf("peer num mismatch: got %d, want %d",
+ len(ps.peerLabels), len(want))
+ }
+
+ for peerID, gotLabels := range ps.peerLabels {
+ wantLabels, ok := want[peerID]
+ if !ok {
+ return fmt.Errorf("peer id %s not exists", peerID)
+ }
+
+ if len(gotLabels) != len(wantLabels) {
+ return fmt.Errorf(
+ "num of labels of peer id %s mismatch: got %d, want %d",
+ peerID, len(gotLabels), len(wantLabels))
+ }
+
+ for _, label := range wantLabels {
+ if _, ok := gotLabels[label]; !ok {
+ fmt.Errorf("label: %+v not exists", label)
+ }
+ }
+ }
+ return nil
+}
+
+func checkPeerSetHistory(ps *peerSet, want []uint64, set setType) error {
+ var history map[uint64]struct{}
+ switch set {
+ case notaryset:
+ history = ps.notaryHistory
+ case dkgset:
+ history = ps.dkgHistory
+ default:
+ return fmt.Errorf("invalid set: %d", set)
+ }
+
+ if len(history) != len(want) {
+ return fmt.Errorf("num of history mismatch: got %d, want %d",
+ len(history), len(want))
+ }
+
+ for _, r := range want {
+ if _, ok := history[r]; !ok {
+ return fmt.Errorf("round %d not exists", r)
+ }
+ }
+ return nil
+}
+
+func checkDirectPeer(srvr *testP2PServer, want []enode.ID) error {
+ if len(srvr.direct) != len(want) {
+ return fmt.Errorf("num of direct peer mismatch: got %d, want %d",
+ len(srvr.direct), len(want))
+ }
+
+ for _, id := range want {
+ if _, ok := srvr.direct[id]; !ok {
+ return fmt.Errorf("direct peer %s not exists", id.String())
+ }
+ }
+ return nil
+}
+func checkGroup(srvr *testP2PServer, want []string) error {
+ if len(srvr.group) != len(want) {
+ return fmt.Errorf("num of group mismatch: got %d, want %d",
+ len(srvr.group), len(want))
+ }
+
+ for _, name := range want {
+ if _, ok := srvr.group[name]; !ok {
+ return fmt.Errorf("group %s not exists", name)
+ }
+ }
+ return nil
+}
+
+func nodeID(n int64) enode.ID {
+ b := big.NewInt(n).Bytes()
+ var id enode.ID
+ copy(id[len(id)-len(b):], b)
+ return id
+}
+
+func newTestNodeSet(nodes []enode.ID) map[string]struct{} {
+ m := make(map[string]struct{})
+ for _, node := range nodes {
+ m[node.String()] = struct{}{}
+ }
+ return m
+}
+
+func newDummyPeer(id enode.ID) *peer {
+ return &peer{
+ labels: mapset.NewSet(),
+ id: id.String(),
+ }
+}
diff --git a/dex/protocol.go b/dex/protocol.go
index 0111edf18..7b01217ff 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -20,14 +20,11 @@ import (
"fmt"
"io"
"math/big"
- "net"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
- "github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/event"
- "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -67,7 +64,7 @@ const (
ReceiptsMsg = 0x10
// Protocol messages belonging to dex/64
- NotaryNodeInfoMsg = 0x11
+ MetaMsg = 0x11
)
type errCode int
@@ -114,12 +111,26 @@ type txPool interface {
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
+type governance interface {
+ GetChainNum(uint64) uint32
+
+ GetNotarySet(uint32, uint64) map[string]struct{}
+
+ GetDKGSet(uint64) map[string]struct{}
+
+ SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
- AddNotaryPeer(*discover.Node)
+ AddDirectPeer(*enode.Node)
+
+ RemoveDirectPeer(*enode.Node)
- RemoveNotaryPeer(*discover.Node)
+ AddGroup(string, []*enode.Node, uint64)
+
+ RemoveGroup(string)
}
// statusData is the network packet for the status message.
@@ -195,22 +206,3 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody
-
-// TODO(sonic): revisit this msg when dexon core SDK is finalized.
-// notartyNodeInfo is the network packet for notary node ip info.
-type notaryNodeInfo struct {
- ID discover.NodeID
- IP net.IP
- UDP uint16
- TCP uint16
- Round uint64
- Sig []byte
- Timestamp int64
-}
-
-func (n *notaryNodeInfo) Hash() (h common.Hash) {
- hw := sha3.NewKeccak256()
- rlp.Encode(hw, n)
- hw.Sum(h[:0])
- return h
-}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index c1b6efcfc..8c7638b2b 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -221,3 +221,85 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
}
}
}
+
+func TestRecvNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ meta := NodeMeta{
+ ID: nodeID(1),
+ }
+
+ ch := make(chan newMetasEvent)
+ pm.nodeTable.SubscribeNewMetasEvent(ch)
+
+ if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ select {
+ case event := <-ch:
+ metas := event.Metas
+ if len(metas) != 1 {
+ t.Errorf("wrong number of new metas: got %d, want 1", len(metas))
+ } else if metas[0].Hash() != meta.Hash() {
+ t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash())
+ }
+ case <-time.After(3 * time.Second):
+ t.Errorf("no newMetasEvent received within 3 seconds")
+ }
+}
+
+func TestSendNodeMetas(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ defer pm.Stop()
+
+ allmetas := make([]*NodeMeta, 100)
+ for nonce := range allmetas {
+ allmetas[nonce] = &NodeMeta{ID: nodeID(int64(nonce))}
+ }
+
+ // Connect several peers. They should all receive the pending transactions.
+ var wg sync.WaitGroup
+ checkmetas := func(p *testPeer) {
+ defer wg.Done()
+ defer p.close()
+ seen := make(map[common.Hash]bool)
+ for _, meta := range allmetas {
+ seen[meta.Hash()] = false
+ }
+ for n := 0; n < len(allmetas) && !t.Failed(); {
+ var metas []*NodeMeta
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != MetaMsg {
+ t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&metas); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ for _, meta := range metas {
+ hash := meta.Hash()
+ seenmeta, want := seen[hash]
+ if seenmeta {
+ t.Errorf("%v: got meta more than once: %x", p.Peer, hash)
+ }
+ if !want {
+ t.Errorf("%v: got unexpected meta: %x", p.Peer, hash)
+ }
+ seen[hash] = true
+ n++
+ }
+ }
+ }
+ for i := 0; i < 3; i++ {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
+ wg.Add(1)
+ go checkmetas(p)
+ }
+ pm.nodeTable.Add(allmetas)
+ wg.Wait()
+}
diff --git a/dex/sync.go b/dex/sync.go
index d7fe748bc..5af6076bc 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -25,7 +25,7 @@ import (
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/eth/downloader"
"github.com/dexon-foundation/dexon/log"
- "github.com/dexon-foundation/dexon/p2p/discover"
+ "github.com/dexon-foundation/dexon/p2p/enode"
)
const (
@@ -35,6 +35,9 @@ const (
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
+
+ // This is the target number for the packs of metas sent by metasyncLoop.
+ metasyncPackNum = 1024
)
type txsync struct {
@@ -64,7 +67,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
var (
- pending = make(map[discover.NodeID]*txsync)
+ pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
@@ -129,6 +132,94 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
+type metasync struct {
+ p *peer
+ metas []*NodeMeta
+}
+
+// syncNodeMetas starts sending all node metas to the given peer.
+func (pm *ProtocolManager) syncNodeMetas(p *peer) {
+ metas := pm.nodeTable.Metas()
+ if len(metas) == 0 {
+ return
+ }
+ select {
+ case pm.metasyncCh <- &metasync{p, metas}:
+ case <-pm.quitSync:
+ }
+}
+
+// metasyncLoop takes care of the initial node meta sync for each new
+// connection. When a new peer appears, we relay all currently node metas.
+// In order to minimise egress bandwidth usage, we send
+// the metas in small packs to one peer at a time.
+func (pm *ProtocolManager) metasyncLoop() {
+ var (
+ pending = make(map[enode.ID]*metasync)
+ sending = false // whether a send is active
+ pack = new(metasync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *metasync) {
+ // Fill pack with node metas up to the target num.
+ var num int
+ pack.p = s.p
+ pack.metas = pack.metas[:0]
+ for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
+ pack.metas = append(pack.metas, s.metas[i])
+ num += 1
+ }
+ // Remove the metas that will be sent.
+ s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
+ if len(s.metas) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num)
+ sending = true
+ go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *metasync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.metasyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("NodeMeta send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {