diff options
Diffstat (limited to 'swarm/network/forwarding.go')
-rw-r--r-- | swarm/network/forwarding.go | 150 |
1 files changed, 0 insertions, 150 deletions
diff --git a/swarm/network/forwarding.go b/swarm/network/forwarding.go deleted file mode 100644 index 88a82a678..000000000 --- a/swarm/network/forwarding.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package network - -import ( - "fmt" - "math/rand" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -const requesterCount = 3 - -/* -forwarder implements the CloudStore interface (use by storage.NetStore) -and serves as the cloud store backend orchestrating storage/retrieval/delivery -via the native bzz protocol -which uses an MSB logarithmic distance-based semi-permanent Kademlia table for -* recursive forwarding style routing for retrieval -* smart syncronisation -*/ - -type forwarder struct { - hive *Hive -} - -func NewForwarder(hive *Hive) *forwarder { - return &forwarder{hive: hive} -} - -// generate a unique id uint64 -func generateId() uint64 { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - return uint64(r.Int63()) -} - -var searchTimeout = 3 * time.Second - -// forwarding logic -// logic propagating retrieve requests to peers given by the kademlia hive -func (self *forwarder) Retrieve(chunk *storage.Chunk) { - peers := self.hive.getPeers(chunk.Key, 0) - log.Trace(fmt.Sprintf("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers))) -OUT: - for _, p := range peers { - log.Trace(fmt.Sprintf("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p)) - for _, recipients := range chunk.Req.Requesters { - for _, recipient := range recipients { - req := recipient.(*retrieveRequestMsgData) - if req.from.Addr() == p.Addr() { - continue OUT - } - } - } - req := &retrieveRequestMsgData{ - Key: chunk.Key, - Id: generateId(), - } - var err error - if p.swap != nil { - err = p.swap.Add(-1) - } - if err == nil { - p.retrieve(req) - break OUT - } - log.Warn(fmt.Sprintf("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err)) - } -} - -// requests to specific peers given by the kademlia hive -// except for peers that the store request came from (if any) -// delivery queueing taken care of by syncer -func (self *forwarder) Store(chunk *storage.Chunk) { - var n int - msg := &storeRequestMsgData{ - Key: chunk.Key, - SData: chunk.SData, - } - var source *peer - if chunk.Source != nil { - source = chunk.Source.(*peer) - } - for _, p := range self.hive.getPeers(chunk.Key, 0) { - log.Trace(fmt.Sprintf("forwarder.Store: %v %v", p, chunk)) - - if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) { - n++ - Deliver(p, msg, PropagateReq) - } - } - log.Trace(fmt.Sprintf("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk)) -} - -// once a chunk is found deliver it to its requesters unless timed out -func (self *forwarder) Deliver(chunk *storage.Chunk) { - // iterate over request entries - for id, requesters := range chunk.Req.Requesters { - counter := requesterCount - msg := &storeRequestMsgData{ - Key: chunk.Key, - SData: chunk.SData, - } - var n int - var req *retrieveRequestMsgData - // iterate over requesters with the same id - for id, r := range requesters { - req = r.(*retrieveRequestMsgData) - if req.timeout == nil || req.timeout.After(time.Now()) { - log.Trace(fmt.Sprintf("forwarder.Deliver: %v -> %v", req.Id, req.from)) - msg.Id = uint64(id) - Deliver(req.from, msg, DeliverReq) - n++ - counter-- - if counter <= 0 { - break - } - } - } - log.Trace(fmt.Sprintf("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n)) - } -} - -// initiate delivery of a chunk to a particular peer via syncer#addRequest -// depending on syncer mode and priority settings and sync request type -// this either goes via confirmation roundtrip or queued or pushed directly -func Deliver(p *peer, req interface{}, ty int) { - p.syncer.addRequest(req, ty) -} - -// push chunk over to peer -func Push(p *peer, key storage.Key, priority uint) { - p.syncer.doDelivery(key, priority, p.syncer.quit) -} |