diff options
author | ΞTHΞЯSPHΞЯΞ <{viktor.tron,nagydani,zsfelfoldi}@gmail.com> | 2016-08-30 03:18:00 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-08-31 22:19:40 +0800 |
commit | 4d300e4dece56535f56ccc32330340ce89e42581 (patch) | |
tree | 135838bfae03437eb2a50c6d66de4d66b20c3220 /swarm/network/hive.go | |
parent | 1f58b2d084b65eaec9aa2c2ecb1d3aae50d894b3 (diff) | |
download | dexon-4d300e4dece56535f56ccc32330340ce89e42581.tar.gz dexon-4d300e4dece56535f56ccc32330340ce89e42581.tar.zst dexon-4d300e4dece56535f56ccc32330340ce89e42581.zip |
swarm: plan bee for content storage and distribution on web3
This change imports the Swarm protocol codebase. Compared to the 'swarm'
branch, a few mostly cosmetic changes had to be made:
* The various redundant log message prefixes are gone.
* All files now have LGPLv3 license headers.
* Minor code changes were needed to please go vet and make the tests
pass on Windows.
* Further changes were required to adapt to the go-ethereum develop
branch and its new Go APIs.
Some code has not (yet) been brought over:
* swarm/cmd/bzzhash: will reappear as cmd/bzzhash later
* swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup
* swarm/cmd/makegenesis: will reappear somehow
* swarm/examples/album: will move to a separate repository
* swarm/examples/filemanager: ditto
* swarm/examples/files: will not be merged
* swarm/test/*: will not be merged
* swarm/services/swear: will reappear as contracts/swear when needed
Diffstat (limited to 'swarm/network/hive.go')
-rw-r--r-- | swarm/network/hive.go | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/swarm/network/hive.go b/swarm/network/hive.go new file mode 100644 index 000000000..f5ebdd008 --- /dev/null +++ b/swarm/network/hive.go @@ -0,0 +1,383 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "fmt" + "math/rand" + "path/filepath" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/swarm/network/kademlia" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// Hive is the logistic manager of the swarm +// it uses a generic kademlia nodetable to find best peer list +// for any target +// this is used by the netstore to search for content in the swarm +// the bzz protocol peersMsgData exchange is relayed to Kademlia +// for db storage and filtering +// connections and disconnections are reported and relayed +// to keep the nodetable uptodate + +type Hive struct { + listenAddr func() string + callInterval uint64 + id discover.NodeID + addr kademlia.Address + kad *kademlia.Kademlia + path string + quit chan bool + toggle chan bool + more chan bool + + // for testing only + swapEnabled bool + syncEnabled bool + blockRead bool + blockWrite bool +} + +const ( + callInterval = 3000000000 + // bucketSize = 3 + // maxProx = 8 + // proxBinSize = 4 +) + +type HiveParams struct { + CallInterval uint64 + KadDbPath string + *kademlia.KadParams +} + +func NewHiveParams(path string) *HiveParams { + kad := kademlia.NewKadParams() + // kad.BucketSize = bucketSize + // kad.MaxProx = maxProx + // kad.ProxBinSize = proxBinSize + + return &HiveParams{ + CallInterval: callInterval, + KadDbPath: filepath.Join(path, "bzz-peers.json"), + KadParams: kad, + } +} + +func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive { + kad := kademlia.New(kademlia.Address(addr), params.KadParams) + return &Hive{ + callInterval: params.CallInterval, + kad: kad, + addr: kad.Addr(), + path: params.KadDbPath, + swapEnabled: swapEnabled, + syncEnabled: syncEnabled, + } +} + +func (self *Hive) SyncEnabled(on bool) { + self.syncEnabled = on +} + +func (self *Hive) SwapEnabled(on bool) { + self.swapEnabled = on +} + +func (self *Hive) BlockNetworkRead(on bool) { + self.blockRead = on +} + +func (self *Hive) BlockNetworkWrite(on bool) { + self.blockWrite = on +} + +// public accessor to the hive base address +func (self *Hive) Addr() kademlia.Address { + return self.addr +} + +// Start receives network info only at startup +// listedAddr is a function to retrieve listening address to advertise to peers +// connectPeer is a function to connect to a peer based on its NodeID or enode URL +// there are called on the p2p.Server which runs on the node +func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) { + self.toggle = make(chan bool) + self.more = make(chan bool) + self.quit = make(chan bool) + self.id = id + self.listenAddr = listenAddr + err = self.kad.Load(self.path, nil) + if err != nil { + glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err) + err = nil + } + // this loop is doing bootstrapping and maintains a healthy table + go self.keepAlive() + go func() { + // whenever toggled ask kademlia about most preferred peer + for alive := range self.more { + if !alive { + // receiving false closes the loop while allowing parallel routines + // to attempt to write to more (remove Peer when shutting down) + return + } + node, need, proxLimit := self.kad.Suggest() + + if node != nil && len(node.Url) > 0 { + glog.V(logger.Detail).Infof("call known bee %v", node.Url) + // enode or any lower level connection address is unnecessary in future + // discovery table is used to look it up. + connectPeer(node.Url) + } + if need { + // a random peer is taken from the table + peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1) + if len(peers) > 0 { + // a random address at prox bin 0 is sent for lookup + randAddr := kademlia.RandomAddressAt(self.addr, proxLimit) + req := &retrieveRequestMsgData{ + Key: storage.Key(randAddr[:]), + } + glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]) + peers[0].(*peer).retrieve(req) + } else { + glog.V(logger.Warn).Infof("no peer") + } + glog.V(logger.Detail).Infof("buzz kept alive") + } else { + glog.V(logger.Info).Infof("no need for more bees") + } + select { + case self.toggle <- need: + case <-self.quit: + return + } + glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()) + } + }() + return +} + +// keepAlive is a forever loop +// in its awake state it periodically triggers connection attempts +// by writing to self.more until Kademlia Table is saturated +// wake state is toggled by writing to self.toggle +// it restarts if the table becomes non-full again due to disconnections +func (self *Hive) keepAlive() { + alarm := time.NewTicker(time.Duration(self.callInterval)).C + for { + select { + case <-alarm: + if self.kad.DBCount() > 0 { + select { + case self.more <- true: + glog.V(logger.Debug).Infof("buzz wakeup") + default: + } + } + case need := <-self.toggle: + if alarm == nil && need { + alarm = time.NewTicker(time.Duration(self.callInterval)).C + } + if alarm != nil && !need { + alarm = nil + + } + case <-self.quit: + return + } + } +} + +func (self *Hive) Stop() error { + // closing toggle channel quits the updateloop + close(self.quit) + return self.kad.Save(self.path, saveSync) +} + +// called at the end of a successful protocol handshake +func (self *Hive) addPeer(p *peer) error { + defer func() { + select { + case self.more <- true: + default: + } + }() + glog.V(logger.Detail).Infof("hi new bee %v", p) + err := self.kad.On(p, loadSync) + if err != nil { + return err + } + // self lookup (can be encoded as nil/zero key since peers addr known) + no id () + // the most common way of saying hi in bzz is initiation of gossip + // let me know about anyone new from my hood , here is the storageradius + // to send the 6 byte self lookup + // we do not record as request or forward it, just reply with peers + p.retrieve(&retrieveRequestMsgData{}) + glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p) + + return nil +} + +// called after peer disconnected +func (self *Hive) removePeer(p *peer) { + glog.V(logger.Debug).Infof("bee %v removed", p) + self.kad.Off(p, saveSync) + select { + case self.more <- true: + default: + } + if self.kad.Count() == 0 { + glog.V(logger.Debug).Infof("empty, all bees gone") + } +} + +// Retrieve a list of live peers that are closer to target than us +func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) { + var addr kademlia.Address + copy(addr[:], target[:]) + for _, node := range self.kad.FindClosest(addr, max) { + peers = append(peers, node.(*peer)) + } + return +} + +// disconnects all the peers +func (self *Hive) DropAll() { + glog.V(logger.Info).Infof("dropping all bees") + for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) { + node.Drop() + } +} + +// contructor for kademlia.NodeRecord based on peer address alone +// TODO: should go away and only addr passed to kademlia +func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord { + now := time.Now() + return &kademlia.NodeRecord{ + Addr: addr.Addr, + Url: addr.String(), + Seen: now, + After: now, + } +} + +// called by the protocol when receiving peerset (for target address) +// peersMsgData is converted to a slice of NodeRecords for Kademlia +// this is to store all thats needed +func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) { + var nrs []*kademlia.NodeRecord + for _, p := range req.Peers { + nrs = append(nrs, newNodeRecord(p)) + } + self.kad.Add(nrs) +} + +// peer wraps the protocol instance to represent a connected peer +// it implements kademlia.Node interface +type peer struct { + *bzz // protocol instance running on peer connection +} + +// protocol instance implements kademlia.Node interface (embedded peer) +func (self *peer) Addr() kademlia.Address { + return self.remoteAddr.Addr +} + +func (self *peer) Url() string { + return self.remoteAddr.String() +} + +// TODO take into account traffic +func (self *peer) LastActive() time.Time { + return self.lastActive +} + +// reads the serialised form of sync state persisted as the 'Meta' attribute +// and sets the decoded syncState on the online node +func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error { + p, ok := node.(*peer) + if !ok { + return fmt.Errorf("invalid type") + } + if record.Meta == nil { + glog.V(logger.Debug).Infof("no sync state for node record %v setting default", record) + p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}} + return nil + } + state, err := decodeSync(record.Meta) + if err != nil { + return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err) + } + glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))) + p.syncState = state + return err +} + +// callback when saving a sync state +func saveSync(record *kademlia.NodeRecord, node kademlia.Node) { + if p, ok := node.(*peer); ok { + meta, err := encodeSync(p.syncState) + if err != nil { + glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err) + return + } + glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta)) + record.Meta = meta + } +} + +// the immediate response to a retrieve request, +// sends relevant peer data given by the kademlia hive to the requester +// TODO: remember peers sent for duration of the session, only new peers sent +func (self *Hive) peers(req *retrieveRequestMsgData) { + if req != nil && req.MaxPeers >= 0 { + var addrs []*peerAddr + if req.timeout == nil || time.Now().Before(*(req.timeout)) { + key := req.Key + // self lookup from remote peer + if storage.IsZeroKey(key) { + addr := req.from.Addr() + key = storage.Key(addr[:]) + req.Key = nil + } + // get peer addresses from hive + for _, peer := range self.getPeers(key, int(req.MaxPeers)) { + addrs = append(addrs, peer.remoteAddr) + } + glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()) + + peersData := &peersMsgData{ + Peers: addrs, + Key: req.Key, + Id: req.Id, + } + peersData.setTimeout(req.timeout) + req.from.peers(peersData) + } + } +} + +func (self *Hive) String() string { + return self.kad.String() +} |