aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
authorViktor TrĂ³n <viktor.tron@gmail.com>2018-09-12 17:24:56 +0800
committerBalint Gabor <balint.g@gmail.com>2018-09-12 17:24:56 +0800
commitbfce00385f1c8dab222b7ddab6c336177a5ae731 (patch)
tree8b3d2b2ce30e8b5eaf6db5b89a6c5570c3997cff /swarm/network/stream
parentb06ff563a1f1095407612e04a1707e13d2dc20da (diff)
downloadgo-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.tar.gz
go-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.tar.zst
go-tangerine-bfce00385f1c8dab222b7ddab6c336177a5ae731.zip
Kademlia refactor (#17641)
* swarm/network: simplify kademlia/hive; rid interfaces * swarm, swarm/network/stream, swarm/netork/simulations,, swarm/pss: adapt to new Kad API * swarm/network: minor changes re review; add missing lock to NeighbourhoodDepthC
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/delivery.go12
-rw-r--r--swarm/network/stream/snapshot_sync_test.go9
-rw-r--r--swarm/network/stream/stream.go18
3 files changed, 16 insertions, 23 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 36040339d..627352535 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -47,15 +47,15 @@ var (
type Delivery struct {
db *storage.DBAPI
- overlay network.Overlay
+ kad *network.Kademlia
receiveC chan *ChunkDeliveryMsg
getPeer func(discover.NodeID) *Peer
}
-func NewDelivery(overlay network.Overlay, db *storage.DBAPI) *Delivery {
+func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery {
d := &Delivery{
db: db,
- overlay: overlay,
+ kad: kad,
receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
}
@@ -172,7 +172,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
t := time.NewTimer(10 * time.Minute)
defer t.Stop()
- log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.overlay.BaseAddr()), "created", created)
+ log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created)
start := time.Now()
select {
case <-chunk.ReqC:
@@ -269,8 +269,8 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck
var err error
requestFromPeersCount.Inc(1)
- d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool {
- spId := p.(network.Peer).ID()
+ d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool {
+ spId := p.ID()
for _, p := range peersToSkip {
if p == spId {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 4e1ab09fc..313019d6a 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -457,15 +457,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
//returns the number of subscriptions requested
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
var err error
-
- kad, ok := r.delivery.overlay.(*network.Kademlia)
- if !ok {
- return 0, fmt.Errorf("Not a Kademlia!")
- }
-
+ kad := r.delivery.kad
subCnt := 0
//iterate over each bin and solicit needed subscription to bins
- kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
+ kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool {
//identify begin and start index of the bin(s) we want to subscribe to
histRange := &Range{}
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index cd0580a0c..deffdfc3f 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -130,7 +130,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
// wait for kademlia table to be healthy
time.Sleep(options.SyncUpdateDelay)
- kad := streamer.delivery.overlay.(*network.Kademlia)
+ kad := streamer.delivery.kad
depthC := latestIntC(kad.NeighbourhoodDepthC())
addressBookSizeC := latestIntC(kad.AddrCountC())
@@ -398,9 +398,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
// and they are no longer required after iteration, request to Quit
// them will be send to appropriate peers.
func (r *Registry) updateSyncing() {
- // if overlay in not Kademlia, panic
- kad := r.delivery.overlay.(*network.Kademlia)
-
+ kad := r.delivery.kad
// map of all SYNC streams for all peers
// used at the and of the function to remove servers
// that are not needed anymore
@@ -421,8 +419,7 @@ func (r *Registry) updateSyncing() {
r.peersMu.RUnlock()
// request subscriptions for all nodes and bins
- kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool {
- p := conn.(network.Peer)
+ kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
// bin is always less then 256 and it is safe to convert it to type uint8
@@ -461,10 +458,11 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
- bzzPeer := network.NewBzzTestPeer(peer, r.addr)
- r.delivery.overlay.On(bzzPeer)
- defer r.delivery.overlay.Off(bzzPeer)
- return r.Run(bzzPeer)
+ bp := network.NewBzzPeer(peer, r.addr)
+ np := network.NewPeer(bp, r.delivery.kad)
+ r.delivery.kad.On(np)
+ defer r.delivery.kad.Off(np)
+ return r.Run(bp)
}
// HandleMsg is the message handler that delegates incoming messages