author | ΞTHΞЯSPHΞЯΞ <{viktor.tron,nagydani,zsfelfoldi}@gmail.com> | 2016-08-30 03:18:00 +0800
---|---|---
committer | Felix Lange <fjl@twurst.com> | 2016-08-31 22:19:40 +0800
commit | 4d300e4dece56535f56ccc32330340ce89e42581 (patch) |
tree | 135838bfae03437eb2a50c6d66de4d66b20c3220 /swarm/network |
parent | 1f58b2d084b65eaec9aa2c2ecb1d3aae50d894b3 (diff) |
swarm: plan bee for content storage and distribution on web3
This change imports the Swarm protocol codebase. Compared to the 'swarm'
branch, a few mostly cosmetic changes had to be made:
* The various redundant log message prefixes are gone.
* All files now have LGPLv3 license headers.
* Minor code changes were needed to please go vet and make the tests
pass on Windows.
* Further changes were required to adapt to the go-ethereum develop
branch and its new Go APIs.
Some code has not (yet) been brought over:
* swarm/cmd/bzzhash: will reappear as cmd/bzzhash later
* swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup
* swarm/cmd/makegenesis: will reappear somehow
* swarm/examples/album: will move to a separate repository
* swarm/examples/filemanager: ditto
* swarm/examples/files: will not be merged
* swarm/test/*: will not be merged
* swarm/services/swear: will reappear as contracts/swear when needed
Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/depo.go | 211
-rw-r--r-- | swarm/network/forwarding.go | 150
-rw-r--r-- | swarm/network/hive.go | 383
-rw-r--r-- | swarm/network/kademlia/address.go | 173
-rw-r--r-- | swarm/network/kademlia/address_test.go | 96
-rw-r--r-- | swarm/network/kademlia/kaddb.go | 351
-rw-r--r-- | swarm/network/kademlia/kademlia.go | 429
-rw-r--r-- | swarm/network/kademlia/kademlia_test.go | 392
-rw-r--r-- | swarm/network/messages.go | 317
-rw-r--r-- | swarm/network/protocol.go | 554
-rw-r--r-- | swarm/network/protocol_test.go | 17
-rw-r--r-- | swarm/network/syncdb.go | 390
-rw-r--r-- | swarm/network/syncdb_test.go | 221
-rw-r--r-- | swarm/network/syncer.go | 778
14 files changed, 4462 insertions(+), 0 deletions(-)
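
Two mechanisms in this diff are worth illustrating before the code. The first is the sync roundtrip handled by `Depo` in swarm/network/depo.go: a peer advertises unsynced keys, the receiver answers with a delivery request for the ones it is missing, and the chunks then arrive as store requests. The toy model below (not part of the commit; `Key` and the map-based store are stand-ins for the real `storage.Key`/`storage.ChunkStore` machinery) sketches that filtering step:

```go
package main

import "fmt"

// Toy model of the bzz sync roundtrip handled by Depo (depo.go).
type Key string

func main() {
	// the receiving peer's local store already holds chunk "a"
	local := map[Key][]byte{"a": []byte("alpha")}

	// 1. remote peer advertises its unsynced keys (unsyncedKeysMsg)
	unsynced := []Key{"a", "b", "c"}

	// 2. HandleUnsyncedKeysMsg: keep only the keys not found locally
	//    and send them back as a deliveryRequest
	var missing []Key
	for _, k := range unsynced {
		if _, ok := local[k]; !ok {
			missing = append(missing, k)
		}
	}
	fmt.Println(missing) // [b c]

	// 3. the remote peer answers with a storeRequestMsg per missing key;
	//    HandleStoreRequestMsg verifies the chunk hash (omitted here)
	//    before handing the chunk to the NetStore
	for _, k := range missing {
		local[k] = []byte("chunk data for " + string(k))
	}
	fmt.Println(len(local)) // 3
}
```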
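The second is the proximity order that drives routing in swarm/network/kademlia. The sketch below restates the `proximity` function from address.go as a standalone program (the example addresses are illustrative) and shows how the count of shared leading bits maps onto the 0 (farthest) to 256 (self) scale documented there:

```go
package main

import "fmt"

type Address [32]byte

// proximity counts the leading bits shared by one and other, as in
// swarm/network/kademlia/address.go: 0 means the most significant
// bit already differs (farthest), 256 means identical (self).
func proximity(one, other Address) int {
	for i := 0; i < len(one); i++ {
		oxo := one[i] ^ other[i]
		for j := 0; j < 8; j++ {
			if (oxo>>uint(7-j))&0x01 != 0 {
				return i*8 + j
			}
		}
	}
	return len(one) * 8
}

func main() {
	var a, b Address
	a[0] = 0x01 // 00000001...
	b[0] = 0x81 // 10000001... first difference at bit 0
	fmt.Println(proximity(a, b)) // 0

	b[0] = 0x01
	b[1] = 0x40 // first difference at bit 9
	fmt.Println(proximity(a, b)) // 9

	fmt.Println(proximity(a, a)) // 256
}
```

Because every extra shared bit halves the remaining distance, forwarding a request to a peer in a higher proximity bin always makes progress, which is why (as the kademlia.go comments note) routing completes in a bounded number of hops.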
diff --git a/swarm/network/depo.go b/swarm/network/depo.go new file mode 100644 index 000000000..79987cc6b --- /dev/null +++ b/swarm/network/depo.go @@ -0,0 +1,211 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "bytes" + "encoding/binary" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// Handler for storage/retrieval related protocol requests +// implements the StorageHandler interface used by the bzz protocol +type Depo struct { + hashfunc storage.Hasher + localStore storage.ChunkStore + netStore storage.ChunkStore +} + +func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo { + return &Depo{ + hashfunc: hash, + localStore: localStore, + netStore: remoteStore, // entrypoint internal + } +} + +// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state +// * the remote sync state is just stored and handled in protocol +// * filters through the new syncRequests and send the ones missing +// * back immediately as a deliveryRequest message +// * empty message just pings back for more (is this needed?) +// * strict signed sync states may be needed. +func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error { + unsynced := req.Unsynced + var missing []*syncRequest + var chunk *storage.Chunk + var err error + for _, req := range unsynced { + // skip keys that are found, + chunk, err = self.localStore.Get(storage.Key(req.Key[:])) + if err != nil || chunk.SData == nil { + missing = append(missing, req) + } + } + glog.V(logger.Debug).Infof("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. 
new state: %v", len(unsynced), len(missing), req.State) + glog.V(logger.Detail).Infof("Depo.HandleUnsyncedKeysMsg: received %v", unsynced) + // send delivery request with missing keys + err = p.deliveryRequest(missing) + if err != nil { + return err + } + // set peers state to persist + p.syncState = req.State + return nil +} + +// Handles deliveryRequestMsg +// * serves actual chunks asked by the remote peer +// by pushing to the delivery queue (sync db) of the correct priority +// (remote peer is free to reprioritize) +// * the message implies remote peer wants more, so trigger for +// * new outgoing unsynced keys message is fired +func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error { + deliver := req.Deliver + // queue the actual delivery of a chunk () + glog.V(logger.Detail).Infof("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver) + for _, sreq := range deliver { + // TODO: look up in cache here or in deliveries + // priorities are taken from the message so the remote party can + // reprioritise to at their leisure + // r = self.pullCached(sreq.Key) // pulls and deletes from cache + Push(p, sreq.Key, sreq.Priority) + } + + // sends it out as unsyncedKeysMsg + p.syncer.sendUnsyncedKeys() + return nil +} + +// the entrypoint for store requests coming from the bzz wire protocol +// if key found locally, return. otherwise +// remote is untrusted, so hash is verified and chunk passed on to NetStore +func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { + req.from = p + chunk, err := self.localStore.Get(req.Key) + switch { + case err != nil: + glog.V(logger.Detail).Infof("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key) + // not found in memory cache, ie., a genuine store request + // create chunk + chunk = storage.NewChunk(req.Key, nil) + + case chunk.SData == nil: + // found chunk in memory store, needs the data, validate now + hasher := self.hashfunc() + hasher.Write(req.SData) + if !bytes.Equal(hasher.Sum(nil), req.Key) { + // data does not validate, ignore + // TODO: peer should be penalised/dropped? + glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req) + return + } + glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req) + + default: + // data is found, store request ignored + // this should update access count? + glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. 
ignore.", req) + return + } + + // update chunk with size and data + chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data) + chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8])) + glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p) + chunk.Source = p + self.netStore.Put(chunk) +} + +// entrypoint for retrieve requests coming from the bzz wire protocol +// checks swap balance - return if peer has no credit +func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) { + req.from = p + // swap - record credit for 1 request + // note that only charge actual reqsearches + var err error + if p.swap != nil { + err = p.swap.Add(1) + } + if err != nil { + glog.V(logger.Warn).Infof("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err) + return + } + + // call storage.NetStore#Get which + // blocks until local retrieval finished + // launches cloud retrieval + chunk, _ := self.netStore.Get(req.Key) + req = self.strategyUpdateRequest(chunk.Req, req) + // check if we can immediately deliver + if chunk.SData != nil { + glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()) + + if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size { + sreq := &storeRequestMsgData{ + Id: req.Id, + Key: chunk.Key, + SData: chunk.SData, + requestTimeout: req.timeout, // + } + p.syncer.addRequest(sreq, DeliverReq) + } else { + glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()) + } + } else { + glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()) + } +} + +// add peer request the chunk and decides the timeout for the response if still searching +func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) { + glog.V(logger.Detail).Infof("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()) + // we do not create an alternative one + req = origReq + if rs != nil { + self.addRequester(rs, req) + req.setTimeout(self.searchTimeout(rs, req)) + } + return +} + +// decides the timeout promise sent with the immediate peers response to a retrieve request +// if timeout is explicitly set and expired +func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) { + reqt := req.getTimeout() + t := time.Now().Add(searchTimeout) + if reqt != nil && reqt.Before(t) { + return reqt + } else { + return &t + } +} + +/* +adds a new peer to an existing open request +only add if less than requesterCount peers forwarded the same request id so far +note this is done irrespective of status (searching or found) +*/ +func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) { + glog.V(logger.Detail).Infof("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id) + list := rs.Requesters[req.Id] + rs.Requesters[req.Id] = append(list, req) +} diff --git a/swarm/network/forwarding.go b/swarm/network/forwarding.go new file mode 100644 index 000000000..fef79c70b --- /dev/null +++ b/swarm/network/forwarding.go @@ -0,0 +1,150 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "math/rand" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +const requesterCount = 3 + +/* +forwarder implements the CloudStore interface (use by storage.NetStore) +and serves as the cloud store backend orchestrating storage/retrieval/delivery +via the native bzz protocol +which uses an MSB logarithmic distance-based semi-permanent Kademlia table for +* recursive forwarding style routing for retrieval +* smart syncronisation +*/ + +type forwarder struct { + hive *Hive +} + +func NewForwarder(hive *Hive) *forwarder { + return &forwarder{hive: hive} +} + +// generate a unique id uint64 +func generateId() uint64 { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return uint64(r.Int63()) +} + +var searchTimeout = 3 * time.Second + +// forwarding logic +// logic propagating retrieve requests to peers given by the kademlia hive +func (self *forwarder) Retrieve(chunk *storage.Chunk) { + peers := self.hive.getPeers(chunk.Key, 0) + glog.V(logger.Detail).Infof("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers)) +OUT: + for _, p := range peers { + glog.V(logger.Detail).Infof("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p) + for _, recipients := range chunk.Req.Requesters { + for _, recipient := range recipients { + req := recipient.(*retrieveRequestMsgData) + if req.from.Addr() == p.Addr() { + continue OUT + } + } + } + req := &retrieveRequestMsgData{ + Key: chunk.Key, + Id: generateId(), + } + var err error + if p.swap != nil { + err = p.swap.Add(-1) + } + if err == nil { + p.retrieve(req) + break OUT + } + glog.V(logger.Warn).Infof("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err) + } +} + +// requests to specific peers given by the kademlia hive +// except for peers that the store request came from (if any) +// delivery queueing taken care of by syncer +func (self *forwarder) Store(chunk *storage.Chunk) { + var n int + msg := &storeRequestMsgData{ + Key: chunk.Key, + SData: chunk.SData, + } + var source *peer + if chunk.Source != nil { + source = chunk.Source.(*peer) + } + for _, p := range self.hive.getPeers(chunk.Key, 0) { + glog.V(logger.Detail).Infof("forwarder.Store: %v %v", p, chunk) + + if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) { + n++ + Deliver(p, msg, PropagateReq) + } + } + glog.V(logger.Detail).Infof("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk) +} + +// once a chunk is found deliver it to its requesters unless timed out +func (self *forwarder) Deliver(chunk *storage.Chunk) { + // iterate over request entries + for id, requesters := range chunk.Req.Requesters { + counter := 
requesterCount + msg := &storeRequestMsgData{ + Key: chunk.Key, + SData: chunk.SData, + } + var n int + var req *retrieveRequestMsgData + // iterate over requesters with the same id + for id, r := range requesters { + req = r.(*retrieveRequestMsgData) + if req.timeout == nil || req.timeout.After(time.Now()) { + glog.V(logger.Detail).Infof("forwarder.Deliver: %v -> %v", req.Id, req.from) + msg.Id = uint64(id) + Deliver(req.from, msg, DeliverReq) + n++ + counter-- + if counter <= 0 { + break + } + } + } + glog.V(logger.Detail).Infof("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n) + } +} + +// initiate delivery of a chunk to a particular peer via syncer#addRequest +// depending on syncer mode and priority settings and sync request type +// this either goes via confirmation roundtrip or queued or pushed directly +func Deliver(p *peer, req interface{}, ty int) { + p.syncer.addRequest(req, ty) +} + +// push chunk over to peer +func Push(p *peer, key storage.Key, priority uint) { + p.syncer.doDelivery(key, priority, p.syncer.quit) +} diff --git a/swarm/network/hive.go b/swarm/network/hive.go new file mode 100644 index 000000000..f5ebdd008 --- /dev/null +++ b/swarm/network/hive.go @@ -0,0 +1,383 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
+ +package network + +import ( + "fmt" + "math/rand" + "path/filepath" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/swarm/network/kademlia" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// Hive is the logistic manager of the swarm +// it uses a generic kademlia nodetable to find best peer list +// for any target +// this is used by the netstore to search for content in the swarm +// the bzz protocol peersMsgData exchange is relayed to Kademlia +// for db storage and filtering +// connections and disconnections are reported and relayed +// to keep the nodetable uptodate + +type Hive struct { + listenAddr func() string + callInterval uint64 + id discover.NodeID + addr kademlia.Address + kad *kademlia.Kademlia + path string + quit chan bool + toggle chan bool + more chan bool + + // for testing only + swapEnabled bool + syncEnabled bool + blockRead bool + blockWrite bool +} + +const ( + callInterval = 3000000000 + // bucketSize = 3 + // maxProx = 8 + // proxBinSize = 4 +) + +type HiveParams struct { + CallInterval uint64 + KadDbPath string + *kademlia.KadParams +} + +func NewHiveParams(path string) *HiveParams { + kad := kademlia.NewKadParams() + // kad.BucketSize = bucketSize + // kad.MaxProx = maxProx + // kad.ProxBinSize = proxBinSize + + return &HiveParams{ + CallInterval: callInterval, + KadDbPath: filepath.Join(path, "bzz-peers.json"), + KadParams: kad, + } +} + +func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive { + kad := kademlia.New(kademlia.Address(addr), params.KadParams) + return &Hive{ + callInterval: params.CallInterval, + kad: kad, + addr: kad.Addr(), + path: params.KadDbPath, + swapEnabled: swapEnabled, + syncEnabled: syncEnabled, + } +} + +func (self *Hive) SyncEnabled(on bool) { + self.syncEnabled = on +} + +func (self *Hive) SwapEnabled(on bool) { + self.swapEnabled = on +} + +func (self *Hive) BlockNetworkRead(on bool) { + self.blockRead = on +} + +func (self *Hive) BlockNetworkWrite(on bool) { + self.blockWrite = on +} + +// public accessor to the hive base address +func (self *Hive) Addr() kademlia.Address { + return self.addr +} + +// Start receives network info only at startup +// listedAddr is a function to retrieve listening address to advertise to peers +// connectPeer is a function to connect to a peer based on its NodeID or enode URL +// there are called on the p2p.Server which runs on the node +func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) { + self.toggle = make(chan bool) + self.more = make(chan bool) + self.quit = make(chan bool) + self.id = id + self.listenAddr = listenAddr + err = self.kad.Load(self.path, nil) + if err != nil { + glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err) + err = nil + } + // this loop is doing bootstrapping and maintains a healthy table + go self.keepAlive() + go func() { + // whenever toggled ask kademlia about most preferred peer + for alive := range self.more { + if !alive { + // receiving false closes the loop while allowing parallel routines + // to attempt to write to more (remove Peer when shutting down) + return + } + node, need, proxLimit := self.kad.Suggest() + + if node != nil && len(node.Url) > 0 { + glog.V(logger.Detail).Infof("call known bee %v", node.Url) + // enode or any lower 
level connection address is unnecessary in future + // discovery table is used to look it up. + connectPeer(node.Url) + } + if need { + // a random peer is taken from the table + peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1) + if len(peers) > 0 { + // a random address at prox bin 0 is sent for lookup + randAddr := kademlia.RandomAddressAt(self.addr, proxLimit) + req := &retrieveRequestMsgData{ + Key: storage.Key(randAddr[:]), + } + glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]) + peers[0].(*peer).retrieve(req) + } else { + glog.V(logger.Warn).Infof("no peer") + } + glog.V(logger.Detail).Infof("buzz kept alive") + } else { + glog.V(logger.Info).Infof("no need for more bees") + } + select { + case self.toggle <- need: + case <-self.quit: + return + } + glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()) + } + }() + return +} + +// keepAlive is a forever loop +// in its awake state it periodically triggers connection attempts +// by writing to self.more until Kademlia Table is saturated +// wake state is toggled by writing to self.toggle +// it restarts if the table becomes non-full again due to disconnections +func (self *Hive) keepAlive() { + alarm := time.NewTicker(time.Duration(self.callInterval)).C + for { + select { + case <-alarm: + if self.kad.DBCount() > 0 { + select { + case self.more <- true: + glog.V(logger.Debug).Infof("buzz wakeup") + default: + } + } + case need := <-self.toggle: + if alarm == nil && need { + alarm = time.NewTicker(time.Duration(self.callInterval)).C + } + if alarm != nil && !need { + alarm = nil + + } + case <-self.quit: + return + } + } +} + +func (self *Hive) Stop() error { + // closing toggle channel quits the updateloop + close(self.quit) + return self.kad.Save(self.path, saveSync) +} + +// called at the end of a successful protocol handshake +func (self *Hive) addPeer(p *peer) error { + defer func() { + select { + case self.more <- true: + default: + } + }() + glog.V(logger.Detail).Infof("hi new bee %v", p) + err := self.kad.On(p, loadSync) + if err != nil { + return err + } + // self lookup (can be encoded as nil/zero key since peers addr known) + no id () + // the most common way of saying hi in bzz is initiation of gossip + // let me know about anyone new from my hood , here is the storageradius + // to send the 6 byte self lookup + // we do not record as request or forward it, just reply with peers + p.retrieve(&retrieveRequestMsgData{}) + glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p) + + return nil +} + +// called after peer disconnected +func (self *Hive) removePeer(p *peer) { + glog.V(logger.Debug).Infof("bee %v removed", p) + self.kad.Off(p, saveSync) + select { + case self.more <- true: + default: + } + if self.kad.Count() == 0 { + glog.V(logger.Debug).Infof("empty, all bees gone") + } +} + +// Retrieve a list of live peers that are closer to target than us +func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) { + var addr kademlia.Address + copy(addr[:], target[:]) + for _, node := range self.kad.FindClosest(addr, max) { + peers = append(peers, node.(*peer)) + } + return +} + +// disconnects all the peers +func (self *Hive) DropAll() { + glog.V(logger.Info).Infof("dropping all bees") + for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) { + node.Drop() + } +} + +// contructor for kademlia.NodeRecord based on 
peer address alone +// TODO: should go away and only addr passed to kademlia +func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord { + now := time.Now() + return &kademlia.NodeRecord{ + Addr: addr.Addr, + Url: addr.String(), + Seen: now, + After: now, + } +} + +// called by the protocol when receiving peerset (for target address) +// peersMsgData is converted to a slice of NodeRecords for Kademlia +// this is to store all thats needed +func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) { + var nrs []*kademlia.NodeRecord + for _, p := range req.Peers { + nrs = append(nrs, newNodeRecord(p)) + } + self.kad.Add(nrs) +} + +// peer wraps the protocol instance to represent a connected peer +// it implements kademlia.Node interface +type peer struct { + *bzz // protocol instance running on peer connection +} + +// protocol instance implements kademlia.Node interface (embedded peer) +func (self *peer) Addr() kademlia.Address { + return self.remoteAddr.Addr +} + +func (self *peer) Url() string { + return self.remoteAddr.String() +} + +// TODO take into account traffic +func (self *peer) LastActive() time.Time { + return self.lastActive +} + +// reads the serialised form of sync state persisted as the 'Meta' attribute +// and sets the decoded syncState on the online node +func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error { + p, ok := node.(*peer) + if !ok { + return fmt.Errorf("invalid type") + } + if record.Meta == nil { + glog.V(logger.Debug).Infof("no sync state for node record %v setting default", record) + p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}} + return nil + } + state, err := decodeSync(record.Meta) + if err != nil { + return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err) + } + glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))) + p.syncState = state + return err +} + +// callback when saving a sync state +func saveSync(record *kademlia.NodeRecord, node kademlia.Node) { + if p, ok := node.(*peer); ok { + meta, err := encodeSync(p.syncState) + if err != nil { + glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err) + return + } + glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta)) + record.Meta = meta + } +} + +// the immediate response to a retrieve request, +// sends relevant peer data given by the kademlia hive to the requester +// TODO: remember peers sent for duration of the session, only new peers sent +func (self *Hive) peers(req *retrieveRequestMsgData) { + if req != nil && req.MaxPeers >= 0 { + var addrs []*peerAddr + if req.timeout == nil || time.Now().Before(*(req.timeout)) { + key := req.Key + // self lookup from remote peer + if storage.IsZeroKey(key) { + addr := req.from.Addr() + key = storage.Key(addr[:]) + req.Key = nil + } + // get peer addresses from hive + for _, peer := range self.getPeers(key, int(req.MaxPeers)) { + addrs = append(addrs, peer.remoteAddr) + } + glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. 
req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()) + + peersData := &peersMsgData{ + Peers: addrs, + Key: req.Key, + Id: req.Id, + } + peersData.setTimeout(req.timeout) + req.from.peers(peersData) + } + } +} + +func (self *Hive) String() string { + return self.kad.String() +} diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go new file mode 100644 index 000000000..16c5ce532 --- /dev/null +++ b/swarm/network/kademlia/address.go @@ -0,0 +1,173 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "fmt" + "math/rand" + "strings" + + "github.com/ethereum/go-ethereum/common" +) + +type Address common.Hash + +func (a Address) String() string { + return fmt.Sprintf("%x", a[:]) +} + +func (a *Address) MarshalJSON() (out []byte, err error) { + return []byte(`"` + a.String() + `"`), nil +} + +func (a *Address) UnmarshalJSON(value []byte) error { + *a = Address(common.HexToHash(string(value[1 : len(value)-1]))) + return nil +} + +// the string form of the binary representation of an address (only first 8 bits) +func (a Address) Bin() string { + var bs []string + for _, b := range a[:] { + bs = append(bs, fmt.Sprintf("%08b", b)) + } + return strings.Join(bs, "") +} + +/* +Proximity(x, y) returns the proximity order of the MSB distance between x and y + +The distance metric MSB(x, y) of two equal length byte sequences x an y is the +value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. +the binary cast is big endian: most significant bit first (=MSB). + +Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. +It is defined as the reverse rank of the integer part of the base 2 +logarithm of the distance. +It is calculated by counting the number of common leading zeros in the (MSB) +binary representation of the x^y. + +(0 farthest, 255 closest, 256 self) +*/ +func proximity(one, other Address) (ret int) { + for i := 0; i < len(one); i++ { + oxo := one[i] ^ other[i] + for j := 0; j < 8; j++ { + if (uint8(oxo)>>uint8(7-j))&0x01 != 0 { + return i*8 + j + } + } + } + return len(one) * 8 +} + +// Address.ProxCmp compares the distances a->target and b->target. +// Returns -1 if a is closer to target, 1 if b is closer to target +// and 0 if they are equal. 
+func (target Address) ProxCmp(a, b Address) int { + for i := range target { + da := a[i] ^ target[i] + db := b[i] ^ target[i] + if da > db { + return 1 + } else if da < db { + return -1 + } + } + return 0 +} + +// randomAddressAt(address, prox) generates a random address +// at proximity order prox relative to address +// if prox is negative a random address is generated +func RandomAddressAt(self Address, prox int) (addr Address) { + addr = self + var pos int + if prox >= 0 { + pos = prox / 8 + trans := prox % 8 + transbytea := byte(0) + for j := 0; j <= trans; j++ { + transbytea |= 1 << uint8(7-j) + } + flipbyte := byte(1 << uint8(7-trans)) + transbyteb := transbytea ^ byte(255) + randbyte := byte(rand.Intn(255)) + addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb + } + for i := pos + 1; i < len(addr); i++ { + addr[i] = byte(rand.Intn(255)) + } + + return +} + +// KeyRange(a0, a1, proxLimit) returns the address inclusive address +// range that contain addresses closer to one than other +func KeyRange(one, other Address, proxLimit int) (start, stop Address) { + prox := proximity(one, other) + if prox >= proxLimit { + prox = proxLimit + } + start = CommonBitsAddrByte(one, other, byte(0x00), prox) + stop = CommonBitsAddrByte(one, other, byte(0xff), prox) + return +} + +func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) { + prox := proximity(self, other) + var pos int + if p <= prox { + prox = p + } + pos = prox / 8 + addr = self + trans := byte(prox % 8) + var transbytea byte + if p > prox { + transbytea = byte(0x7f) + } else { + transbytea = byte(0xff) + } + transbytea >>= trans + transbyteb := transbytea ^ byte(0xff) + addrpos := addr[pos] + addrpos &= transbyteb + if p > prox { + addrpos ^= byte(0x80 >> trans) + } + addrpos |= transbytea & f() + addr[pos] = addrpos + for i := pos + 1; i < len(addr); i++ { + addr[i] = f() + } + + return +} + +func CommonBitsAddr(self, other Address, prox int) (addr Address) { + return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox) +} + +func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) { + return CommonBitsAddrF(self, other, func() byte { return b }, prox) +} + +// randomAddressAt() generates a random address +func RandomAddress() Address { + return RandomAddressAt(Address{}, -1) +} diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go new file mode 100644 index 000000000..c062c8eaf --- /dev/null +++ b/swarm/network/kademlia/address_test.go @@ -0,0 +1,96 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
+ +package kademlia + +import ( + "math/rand" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func (Address) Generate(rand *rand.Rand, size int) reflect.Value { + var id Address + for i := 0; i < len(id); i++ { + id[i] = byte(uint8(rand.Intn(255))) + } + return reflect.ValueOf(id) +} + +func TestCommonBitsAddrF(t *testing.T) { + a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")) + ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10) + expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000")) + + if ab != expab { + t.Fatalf("%v != %v", ab, expab) + } + ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10) + expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000")) + + if ac != expac { + t.Fatalf("%v != %v", ac, expac) + } + ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10) + expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) + + if ad != expad { + t.Fatalf("%v != %v", ad, expad) + } + ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10) + expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000")) + + if ae != expae { + t.Fatalf("%v != %v", ae, expae) + } + acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10) + expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + + if acf != expacf { + t.Fatalf("%v != %v", acf, expacf) + } + aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2) + expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")) + + if aeo != expaeo { + t.Fatalf("%v != %v", aeo, expaeo) + } + aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2) + expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + + if aep != expaep { + t.Fatalf("%v != %v", aep, expaep) + } + +} + +func TestRandomAddressAt(t *testing.T) { + var a Address + for i := 0; i < 100; i++ { + a = RandomAddress() + prox := rand.Intn(255) + b := RandomAddressAt(a, prox) + if proximity(a, b) != prox { + t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox) + } + } +} diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go new file mode 100644 index 000000000..53a7db451 --- /dev/null +++ b/swarm/network/kademlia/kaddb.go @@ -0,0 +1,351 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package kademlia + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +type NodeData interface { + json.Marshaler + json.Unmarshaler +} + +// allow inactive peers under +type NodeRecord struct { + Addr Address // address of node + Url string // Url, used to connect to node + After time.Time // next call after time + Seen time.Time // last connected at time + Meta *json.RawMessage // arbitrary metadata saved for a peer + + node Node +} + +func (self *NodeRecord) setSeen() { + t := time.Now() + self.Seen = t + self.After = t +} + +func (self *NodeRecord) String() string { + return fmt.Sprintf("<%v>", self.Addr) +} + +// persisted node record database () +type KadDb struct { + Address Address + Nodes [][]*NodeRecord + index map[Address]*NodeRecord + cursors []int + lock sync.RWMutex + purgeInterval time.Duration + initialRetryInterval time.Duration + connRetryExp int +} + +func newKadDb(addr Address, params *KadParams) *KadDb { + return &KadDb{ + Address: addr, + Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load + cursors: make([]int, params.MaxProx+1), + index: make(map[Address]*NodeRecord), + purgeInterval: params.PurgeInterval, + initialRetryInterval: params.InitialRetryInterval, + connRetryExp: params.ConnRetryExp, + } +} + +func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord { + defer self.lock.Unlock() + self.lock.Lock() + + record, found := self.index[a] + if !found { + record = &NodeRecord{ + Addr: a, + Url: url, + } + glog.V(logger.Info).Infof("add new record %v to kaddb", record) + // insert in kaddb + self.index[a] = record + self.Nodes[index] = append(self.Nodes[index], record) + } else { + glog.V(logger.Info).Infof("found record %v in kaddb", record) + } + // update last seen time + record.setSeen() + // update with url in case IP/port changes + record.Url = url + return record +} + +// add adds node records to kaddb (persisted node record db) +func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) { + defer self.lock.Unlock() + self.lock.Lock() + var n int + var nodes []*NodeRecord + for _, node := range nrs { + _, found := self.index[node.Addr] + if !found && node.Addr != self.Address { + node.setSeen() + self.index[node.Addr] = node + index := proximityBin(node.Addr) + dbcursor := self.cursors[index] + nodes = self.Nodes[index] + // this is inefficient for allocation, need to just append then shift + newnodes := make([]*NodeRecord, len(nodes)+1) + copy(newnodes[:], nodes[:dbcursor]) + newnodes[dbcursor] = node + copy(newnodes[dbcursor+1:], nodes[dbcursor:]) + glog.V(logger.Detail).Infof("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes) + self.Nodes[index] = newnodes + n++ + } + } + if n > 0 { + glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs)) + } +} + +/* +next return one node record with the highest priority for desired +connection. 
+This is used to pick candidates for live nodes that are most wanted for +a higly connected low centrality network structure for Swarm which best suits +for a Kademlia-style routing. + +* Starting as naive node with empty db, this implements Kademlia bootstrapping +* As a mature node, it fills short lines. All on demand. + +The candidate is chosen using the following strategy: +We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds. +On each round we proceed from the low to high proximity order buckets. +If the number of active nodes (=connected peers) is < rounds, then start looking +for a known candidate. To determine if there is a candidate to recommend the +kaddb node record database row corresponding to the bucket is checked. + +If the row cursor is on position i, the ith element in the row is chosen. +If the record is scheduled not to be retried before NOW, the next element is taken. +If the record is scheduled to be retried, it is set as checked, scheduled for +checking and is returned. The time of the next check is in X (duration) such that +X = ConnRetryExp * delta where delta is the time past since the last check and +ConnRetryExp is constant obsoletion factor. (Note that when node records are added +from peer messages, they are marked as checked and placed at the cursor, ie. +given priority over older entries). Entries which were checked more than +purgeInterval ago are deleted from the kaddb row. If no candidate is found after +a full round of checking the next bucket up is considered. If no candidate is +found when we reach the maximum-proximity bucket, the next round starts. + +node record a is more favoured to b a > b iff a is a passive node (record of +offline past peer) +|proxBin(a)| < |proxBin(b)| +|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|) +|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b)) + + +The second argument returned names the first missing slot found +*/ +func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) { + // return nil, proxLimit indicates that all buckets are filled + defer self.lock.Unlock() + self.lock.Lock() + + var interval time.Duration + var found bool + var purge []bool + var delta time.Duration + var cursor int + var count int + var after time.Time + + // iterate over columns maximum bucketsize times + for rounds := 1; rounds <= maxBinSize; rounds++ { + ROUND: + // iterate over rows from PO 0 upto MaxProx + for po, dbrow := range self.Nodes { + // if row has rounds connected peers, then take the next + if binSize(po) >= rounds { + continue ROUND + } + if !need { + // set proxlimit to the PO where the first missing slot is found + proxLimit = po + need = true + } + purge = make([]bool, len(dbrow)) + + // there is a missing slot - finding a node to connect to + // select a node record from the relavant kaddb row (of identical prox order) + ROW: + for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) { + count++ + node = dbrow[cursor] + + // skip already connected nodes + if node.node != nil { + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)) + continue ROW + } + + // if node is scheduled to connect + if time.Time(node.After).After(time.Now()) { + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. 
seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + continue ROW + } + + delta = time.Since(time.Time(node.Seen)) + if delta < self.initialRetryInterval { + delta = self.initialRetryInterval + } + if delta > self.purgeInterval { + // remove node + purge[cursor] = true + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen) + continue ROW + } + + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After) + + // scheduling next check + interval = time.Duration(delta * time.Duration(self.connRetryExp)) + after = time.Now().Add(interval) + + glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval) + node.After = after + found = true + } // ROW + self.cursors[po] = cursor + self.delete(po, purge) + if found { + return node, need, proxLimit + } + } // ROUND + } // ROUNDS + + return nil, need, proxLimit +} + +// deletes the noderecords of a kaddb row corresponding to the indexes +// caller must hold the dblock +// the call is unsafe, no index checks +func (self *KadDb) delete(row int, purge []bool) { + var nodes []*NodeRecord + dbrow := self.Nodes[row] + for i, del := range purge { + if i == self.cursors[row] { + //reset cursor + self.cursors[row] = len(nodes) + } + // delete the entry to be purged + if del { + delete(self.index, dbrow[i].Addr) + continue + } + // otherwise append to new list + nodes = append(nodes, dbrow[i]) + } + self.Nodes[row] = nodes +} + +// save persists kaddb on disk (written to file on path in json format. +func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error { + defer self.lock.Unlock() + self.lock.Lock() + + var n int + + for _, b := range self.Nodes { + for _, node := range b { + n++ + node.After = time.Now() + node.Seen = time.Now() + if cb != nil { + cb(node, node.node) + } + } + } + + data, err := json.MarshalIndent(self, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(path, data, os.ModePerm) + if err != nil { + glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: err", n, path, err) + } else { + glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path) + } + return err +} + +// Load(path) loads the node record database (kaddb) from file on path. 
+func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) { + defer self.lock.Unlock() + self.lock.Lock() + + var data []byte + data, err = ioutil.ReadFile(path) + if err != nil { + return + } + + err = json.Unmarshal(data, self) + if err != nil { + return + } + var n int + var purge []bool + for po, b := range self.Nodes { + purge = make([]bool, len(b)) + ROW: + for i, node := range b { + if cb != nil { + err = cb(node, node.node) + if err != nil { + purge[i] = true + continue ROW + } + } + n++ + if (node.After == time.Time{}) { + node.After = time.Now() + } + self.index[node.Addr] = node + } + self.delete(po, purge) + } + glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path) + + return +} + +// accessor for KAD offline db count +func (self *KadDb) count() int { + defer self.lock.Unlock() + self.lock.Lock() + return len(self.index) +} diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go new file mode 100644 index 000000000..87c57cefe --- /dev/null +++ b/swarm/network/kademlia/kademlia.go @@ -0,0 +1,429 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
+ +package kademlia + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +const ( + bucketSize = 4 + proxBinSize = 2 + maxProx = 8 + connRetryExp = 2 + maxPeers = 100 +) + +var ( + purgeInterval = 42 * time.Hour + initialRetryInterval = 42 * time.Millisecond + maxIdleInterval = 42 * 1000 * time.Millisecond + // maxIdleInterval = 42 * 10 0 * time.Millisecond +) + +type KadParams struct { + // adjustable parameters + MaxProx int + ProxBinSize int + BucketSize int + PurgeInterval time.Duration + InitialRetryInterval time.Duration + MaxIdleInterval time.Duration + ConnRetryExp int +} + +func NewKadParams() *KadParams { + return &KadParams{ + MaxProx: maxProx, + ProxBinSize: proxBinSize, + BucketSize: bucketSize, + PurgeInterval: purgeInterval, + InitialRetryInterval: initialRetryInterval, + MaxIdleInterval: maxIdleInterval, + ConnRetryExp: connRetryExp, + } +} + +// Kademlia is a table of active nodes +type Kademlia struct { + addr Address // immutable baseaddress of the table + *KadParams // Kademlia configuration parameters + proxLimit int // state, the PO of the first row of the most proximate bin + proxSize int // state, the number of peers in the most proximate bin + count int // number of active peers (w live connection) + buckets [][]Node // the actual bins + db *KadDb // kaddb, node record database + lock sync.RWMutex // mutex to access buckets +} + +type Node interface { + Addr() Address + Url() string + LastActive() time.Time + Drop() +} + +// public constructor +// add is the base address of the table +// params is KadParams configuration +func New(addr Address, params *KadParams) *Kademlia { + buckets := make([][]Node, params.MaxProx+1) + return &Kademlia{ + addr: addr, + KadParams: params, + buckets: buckets, + db: newKadDb(addr, params), + } +} + +// accessor for KAD base address +func (self *Kademlia) Addr() Address { + return self.addr +} + +// accessor for KAD active node count +func (self *Kademlia) Count() int { + defer self.lock.Unlock() + self.lock.Lock() + return self.count +} + +// accessor for KAD active node count +func (self *Kademlia) DBCount() int { + return self.db.count() +} + +// On is the entry point called when a new nodes is added +// unsafe in that node is not checked to be already active node (to be called once) +func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) { + glog.V(logger.Warn).Infof("%v", self) + defer self.lock.Unlock() + self.lock.Lock() + + index := self.proximityBin(node.Addr()) + record := self.db.findOrCreate(index, node.Addr(), node.Url()) + + if cb != nil { + err = cb(record, node) + glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err) + if err != nil { + return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err) + } + glog.V(logger.Debug).Infof("add node record %v with node %v", record, node) + } + + // insert in kademlia table of active nodes + bucket := self.buckets[index] + // if bucket is full insertion replaces the worst node + // TODO: give priority to peers with active traffic + if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation + self.buckets[index] = append(bucket, node) + glog.V(logger.Debug).Infof("add node %v to table", node) + self.setProxLimit(index, true) + record.node = node + self.count++ + return nil + } + + // always rotate peers + idle := self.MaxIdleInterval + var pos int + var replaced Node + for i, p := 
range bucket { + idleInt := time.Since(p.LastActive()) + if idleInt > idle { + idle = idleInt + pos = i + replaced = p + } + } + if replaced == nil { + glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index) + return fmt.Errorf("bucket full") + } + glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval) + replaced.Drop() + // actually replace in the row. When off(node) is called, the peer is no longer in the row + bucket[pos] = node + // there is no change in bucket cardinalities so no prox limit adjustment is needed + record.node = node + self.count++ + return nil + +} + +// Off is the called when a node is taken offline (from the protocol main loop exit) +func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) { + self.lock.Lock() + defer self.lock.Unlock() + + index := self.proximityBin(node.Addr()) + bucket := self.buckets[index] + for i := 0; i < len(bucket); i++ { + if node.Addr() == bucket[i].Addr() { + self.buckets[index] = append(bucket[:i], bucket[(i+1):]...) + self.setProxLimit(index, false) + break + } + } + + record := self.db.index[node.Addr()] + // callback on remove + if cb != nil { + cb(record, record.node) + } + record.node = nil + self.count-- + glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count) + + return +} + +// proxLimit is dynamically adjusted so that +// 1) there is no empty buckets in bin < proxLimit and +// 2) the sum of all items are the minimum possible but higher than ProxBinSize +// adjust Prox (proxLimit and proxSize after an insertion/removal of nodes) +// caller holds the lock +func (self *Kademlia) setProxLimit(r int, on bool) { + // if the change is outside the core (PO lower) + // and the change does not leave a bucket empty then + // no adjustment needed + if r < self.proxLimit && len(self.buckets[r]) > 0 { + return + } + // if on=a node was added, then r must be within prox limit so increment cardinality + if on { + self.proxSize++ + curr := len(self.buckets[self.proxLimit]) + // if now core is big enough without the furthest bucket, then contract + // this can result in more than one bucket change + for self.proxSize >= self.ProxBinSize+curr && curr > 0 { + self.proxSize -= curr + self.proxLimit++ + curr = len(self.buckets[self.proxLimit]) + + glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + } + return + } + // otherwise + if r >= self.proxLimit { + self.proxSize-- + } + // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality + for (self.proxSize < self.ProxBinSize || r < self.proxLimit) && + self.proxLimit > 0 { + // + self.proxLimit-- + self.proxSize += len(self.buckets[self.proxLimit]) + glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r) + } +} + +/* +returns the list of nodes belonging to the same proximity bin +as the target. The most proximate bin will be the union of the bins between +proxLimit and MaxProx. 
+*/ +func (self *Kademlia) FindClosest(target Address, max int) []Node { + self.lock.Lock() + defer self.lock.Unlock() + + r := nodesByDistance{ + target: target, + } + + po := self.proximityBin(target) + index := po + step := 1 + glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po) + + // if max is set to 0, just want a full bucket, dynamic number + min := max + // set limit to max + limit := max + if max == 0 { + min = 1 + limit = maxPeers + } + + var n int + for index >= 0 { + // add entire bucket + for _, p := range self.buckets[index] { + r.push(p, limit) + n++ + } + // terminate if index reached the bottom or enough peers > min + glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po) + if n >= min && (step < 0 || max == 0) { + break + } + // reach top most non-empty PO bucket, turn around + if index == self.MaxProx { + index = po + step = -1 + } + index += step + } + glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po) + return r.nodes +} + +func (self *Kademlia) Suggest() (*NodeRecord, bool, int) { + defer self.lock.RUnlock() + self.lock.RLock() + return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) }) +} + +// adds node records to kaddb (persisted node record db) +func (self *Kademlia) Add(nrs []*NodeRecord) { + self.db.add(nrs, self.proximityBin) +} + +// nodesByDistance is a list of nodes, ordered by distance to target. +type nodesByDistance struct { + nodes []Node + target Address +} + +func sortedByDistanceTo(target Address, slice []Node) bool { + var last Address + for i, node := range slice { + if i > 0 { + if target.ProxCmp(node.Addr(), last) < 0 { + return false + } + } + last = node.Addr() + } + return true +} + +// push(node, max) adds the given node to the list, keeping the total size +// below max elements. +func (h *nodesByDistance) push(node Node, max int) { + // returns the firt index ix such that func(i) returns true + ix := sort.Search(len(h.nodes), func(i int) bool { + return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0 + }) + + if len(h.nodes) < max { + h.nodes = append(h.nodes, node) + } + if ix < len(h.nodes) { + copy(h.nodes[ix+1:], h.nodes[ix:]) + h.nodes[ix] = node + } +} + +/* +Taking the proximity order relative to a fix point x classifies the points in +the space (n byte long byte sequences) into bins. Items in each are at +most half as distant from x as items in the previous bin. Given a sample of +uniformly distributed items (a hash function over arbitrary sequence) the +proximity scale maps onto series of subsets with cardinalities on a negative +exponential scale. + +It also has the property that any two item belonging to the same bin are at +most half as distant from each other as they are from x. + +If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local +decisions for graph traversal where the task is to find a route between two +points. Since in every hop, the finite distance halves, there is +a guaranteed constant maximum limit on the number of hops needed to reach one +node from the other. 
+*/ + +func (self *Kademlia) proximityBin(other Address) (ret int) { + ret = proximity(self.addr, other) + if ret > self.MaxProx { + ret = self.MaxProx + } + return +} + +// provides keyrange for chunk db iteration +func (self *Kademlia) KeyRange(other Address) (start, stop Address) { + defer self.lock.RUnlock() + self.lock.RLock() + return KeyRange(self.addr, other, self.proxLimit) +} + +// save persists kaddb on disk (written to file on path in json format. +func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error { + return self.db.save(path, cb) +} + +// Load(path) loads the node record database (kaddb) from file on path. +func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) { + return self.db.load(path, cb) +} + +// kademlia table + kaddb table displayed with ascii +func (self *Kademlia) String() string { + defer self.lock.RUnlock() + self.lock.RLock() + defer self.db.lock.RUnlock() + self.db.lock.RLock() + + var rows []string + rows = append(rows, "=========================================================================") + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6])) + rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize)) + rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize)) + + for i, bucket := range self.buckets { + + if i == self.proxLimit { + rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i)) + } + row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))} + var k int + c := self.db.cursors[i] + for ; k < len(bucket); k++ { + p := bucket[(c+k)%len(bucket)] + row = append(row, p.Addr().String()[:6]) + if k == 4 { + break + } + } + for ; k < 4; k++ { + row = append(row, " ") + } + row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i])) + + for j, p := range self.db.Nodes[i] { + row = append(row, p.Addr.String()[:6]) + if j == 3 { + break + } + } + rows = append(rows, strings.Join(row, " ")) + if i == self.MaxProx { + } + } + rows = append(rows, "=========================================================================") + return strings.Join(rows, "\n") +} diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go new file mode 100644 index 000000000..66edfe654 --- /dev/null +++ b/swarm/network/kademlia/kademlia_test.go @@ -0,0 +1,392 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
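The tests below rely on testing/quick for randomised property checks; to
exercise just this package one would run (illustrative command):

	go test ./swarm/network/kademlia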
+ +package kademlia + +import ( + "fmt" + "math" + "math/rand" + "os" + "path/filepath" + "reflect" + "testing" + "testing/quick" + "time" +) + +var ( + quickrand = rand.New(rand.NewSource(time.Now().Unix())) + quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand} + quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand} +) + +type testNode struct { + addr Address +} + +func (n *testNode) String() string { + return fmt.Sprintf("%x", n.addr[:]) +} + +func (n *testNode) Addr() Address { + return n.addr +} + +func (n *testNode) Drop() { +} + +func (n *testNode) Url() string { + return "" +} + +func (n *testNode) LastActive() time.Time { + return time.Now() +} + +func TestOn(t *testing.T) { + addr, ok := gen(Address{}, quickrand).(Address) + other, ok := gen(Address{}, quickrand).(Address) + if !ok { + t.Errorf("oops") + } + kad := New(addr, NewKadParams()) + err := kad.On(&testNode{addr: other}, nil) + _ = err +} + +func TestBootstrap(t *testing.T) { + + test := func(test *bootstrapTest) bool { + // for any node kad.le, Target and N + params := NewKadParams() + params.MaxProx = test.MaxProx + params.BucketSize = test.BucketSize + params.ProxBinSize = test.BucketSize + kad := New(test.Self, params) + var err error + + for p := 0; p < 9; p++ { + var nrs []*NodeRecord + n := math.Pow(float64(2), float64(7-p)) + for i := 0; i < int(n); i++ { + addr := RandomAddressAt(test.Self, p) + nrs = append(nrs, &NodeRecord{ + Addr: addr, + }) + } + kad.Add(nrs) + } + + node := &testNode{test.Self} + + n := 0 + for n < 100 { + err = kad.On(node, nil) + if err != nil { + t.Fatalf("backend not accepting node: %v", err) + } + + record, need, _ := kad.Suggest() + if !need { + break + } + n++ + if record == nil { + continue + } + node = &testNode{record.Addr} + } + exp := test.BucketSize * (test.MaxProx + 1) + if kad.Count() != exp { + t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad) + return false + } + return true + } + if err := quick.Check(test, quickcfgBootStrap); err != nil { + t.Error(err) + } + +} + +func TestFindClosest(t *testing.T) { + + test := func(test *FindClosestTest) bool { + // for any node kad.le, Target and N + params := NewKadParams() + params.MaxProx = 7 + kad := New(test.Self, params) + var err error + for _, node := range test.All { + err = kad.On(node, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + + if len(test.All) == 0 || test.N == 0 { + return true + } + nodes := kad.FindClosest(test.Target, test.N) + + // check that the number of results is min(N, kad.len) + wantN := test.N + if tlen := kad.Count(); tlen < test.N { + wantN = tlen + } + + if len(nodes) != wantN { + t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN) + return false + } + + if hasDuplicates(nodes) { + t.Errorf("result contains duplicates") + return false + } + + if !sortedByDistanceTo(test.Target, nodes) { + t.Errorf("result is not sorted by distance to target") + return false + } + + // check that the result nodes have minimum distance to target. 
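	// (editor's note, not part of the original diff) the loop below verifies
	// the invariant by brute force: any table node left out of the result must
	// be no closer to the target than the farthest node included; ProxCmp
	// yields a negative value when its first argument is the closer one.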
+ farthestResult := nodes[len(nodes)-1].Addr() + for i, b := range kad.buckets { + for j, n := range b { + if contains(nodes, n.Addr()) { + continue // don't run the check below for nodes in result + } + if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 { + _ = i * j + t.Errorf("kad.le contains node that is closer to target but it's not in result") + return false + } + } + } + return true + } + if err := quick.Check(test, quickcfgFindClosest); err != nil { + t.Error(err) + } +} + +type proxTest struct { + add bool + index int + addr Address +} + +var ( + addresses []Address +) + +func TestProxAdjust(t *testing.T) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + self := gen(Address{}, r).(Address) + params := NewKadParams() + params.MaxProx = 7 + kad := New(self, params) + + var err error + for i := 0; i < 100; i++ { + a := gen(Address{}, r).(Address) + addresses = append(addresses, a) + err = kad.On(&testNode{addr: a}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + if !kad.proxCheck(t) { + return + } + } + test := func(test *proxTest) bool { + node := &testNode{test.addr} + if test.add { + kad.On(node, nil) + } else { + kad.Off(node, nil) + } + return kad.proxCheck(t) + } + if err := quick.Check(test, quickcfgFindClosest); err != nil { + t.Error(err) + } +} + +func TestSaveLoad(t *testing.T) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + addresses := gen([]Address{}, r).([]Address) + self := RandomAddress() + params := NewKadParams() + params.MaxProx = 7 + kad := New(self, params) + + var err error + + for _, a := range addresses { + err = kad.On(&testNode{addr: a}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + nodes := kad.FindClosest(self, 100) + + path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers") + err = kad.Save(path, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("unepected error saving kaddb: %v", err) + } + kad = New(self, params) + err = kad.Load(path, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("unepected error loading kaddb: %v", err) + } + for _, b := range kad.db.Nodes { + for _, node := range b { + err = kad.On(&testNode{node.Addr}, nil) + if err != nil && err.Error() != "bucket full" { + t.Fatalf("backend not accepting node: %v", err) + } + } + } + loadednodes := kad.FindClosest(self, 100) + for i, node := range loadednodes { + if nodes[i].Addr() != node.Addr() { + t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr()) + } + } +} + +func (self *Kademlia) proxCheck(t *testing.T) bool { + var sum int + for i, b := range self.buckets { + l := len(b) + // if we are in the high prox multibucket + if i >= self.proxLimit { + sum += l + } else if l == 0 { + t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self) + return false + } + } + // check if merged high prox bucket does not exceed size + if sum > 0 { + if sum != self.proxSize { + t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize) + return false + } + last := len(self.buckets[self.proxLimit]) + if last > 0 && sum >= self.ProxBinSize+last { + t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self) + return false + } + if self.proxLimit > 0 && sum < self.ProxBinSize { + t.Errorf("proxLimit %v incorrect. 
proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize) + return false + } + } + return true +} + +type bootstrapTest struct { + MaxProx int + BucketSize int + Self Address +} + +func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value { + t := &bootstrapTest{ + Self: gen(Address{}, rand).(Address), + MaxProx: 5 + rand.Intn(2), + BucketSize: rand.Intn(3) + 1, + } + return reflect.ValueOf(t) +} + +type FindClosestTest struct { + Self Address + Target Address + All []Node + N int +} + +func (c FindClosestTest) String() string { + return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N) +} + +func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value { + t := &FindClosestTest{ + Self: gen(Address{}, rand).(Address), + Target: gen(Address{}, rand).(Address), + N: rand.Intn(bucketSize), + } + for _, a := range gen([]Address{}, rand).([]Address) { + t.All = append(t.All, &testNode{addr: a}) + } + return reflect.ValueOf(t) +} + +func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value { + var add bool + if rand.Intn(1) == 0 { + add = true + } + var t *proxTest + if add { + t = &proxTest{ + addr: gen(Address{}, rand).(Address), + add: add, + } + } else { + t = &proxTest{ + index: rand.Intn(len(addresses)), + add: add, + } + } + return reflect.ValueOf(t) +} + +func hasDuplicates(slice []Node) bool { + seen := make(map[Address]bool) + for _, node := range slice { + if seen[node.Addr()] { + return true + } + seen[node.Addr()] = true + } + return false +} + +func contains(nodes []Node, addr Address) bool { + for _, n := range nodes { + if n.Addr() == addr { + return true + } + } + return false +} + +// gen wraps quick.Value so it's easier to use. +// it generates a random value of the given value's type. +func gen(typ interface{}, rand *rand.Rand) interface{} { + v, ok := quick.Value(reflect.TypeOf(typ), rand) + if !ok { + panic(fmt.Sprintf("couldn't generate random value of type %T", typ)) + } + return v.Interface() +} diff --git a/swarm/network/messages.go b/swarm/network/messages.go new file mode 100644 index 000000000..d3858c424 --- /dev/null +++ b/swarm/network/messages.go @@ -0,0 +1,317 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
+
+package network
+
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/ethereum/go-ethereum/contracts/chequebook"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/swarm/network/kademlia"
+	"github.com/ethereum/go-ethereum/swarm/services/swap"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+/*
+BZZ protocol Message Types and Message Data Types
+*/
+
+// bzz protocol message codes
+const (
+	statusMsg          = iota // 0x00
+	storeRequestMsg           // 0x01
+	retrieveRequestMsg        // 0x02
+	peersMsg                  // 0x03
+	syncRequestMsg            // 0x04
+	deliveryRequestMsg        // 0x05
+	unsyncedKeysMsg           // 0x06
+	paymentMsg                // 0x07
+)
+
+/*
+ Handshake
+
+* Version: 8 byte integer version of the protocol
+* ID: arbitrary byte sequence, a human-readable client identifier
+* Addr: the address advertised by the node, format similar to the devp2p wire protocol
+* Swap: info for the swarm accounting protocol
+* NetworkID: 8 byte integer network identifier
+* Caps: swarm-specific capabilities, format identical to devp2p
+* SyncState: synchronisation state (db iterator key, address space, etc.) persisted about the peer
+
+*/
+type statusMsgData struct {
+	Version   uint64
+	ID        string
+	Addr      *peerAddr
+	Swap      *swap.SwapProfile
+	NetworkId uint64
+}
+
+func (self *statusMsgData) String() string {
+	return fmt.Sprintf("Status: Version: %v, ID: %v, Addr: %v, Swap: %v, NetworkId: %v", self.Version, self.ID, self.Addr, self.Swap, self.NetworkId)
+}
+
+/*
+ store requests are forwarded to the peers in their kademlia proximity bin
+ if they are distant;
+ if they are within our storage radius or we have any incentive to store them,
+ then we attach our nodeID to the metadata;
+ a storage request counts as sufficiently close if it falls within our
+ proxLimit, i.e. the last row of the routing table
+*/
+type storeRequestMsgData struct {
+	Key   storage.Key // hash of datasize | data
+	SData []byte      // the actual chunk Data
+	// optional
+	Id             uint64     // request ID. if delivery, the ID is the retrieve request ID
+	requestTimeout *time.Time // expiry for forwarding - [not serialised][not currently used]
+	storageTimeout *time.Time // expiry of content - [not serialised][not currently used]
+	from           *peer      // [not serialised] protocol registers the requester
+}
+
+func (self storeRequestMsgData) String() string {
+	var from string
+	if self.from == nil {
+		from = "self"
+	} else {
+		from = self.from.Addr().String()
+	}
+	end := len(self.SData)
+	if len(self.SData) > 10 {
+		end = 10
+	}
+	return fmt.Sprintf("from: %v, Key: %v; ID: %v, requestTimeout: %v, storageTimeout: %v, SData %x", from, self.Key, self.Id, self.requestTimeout, self.storageTimeout, self.SData[:end])
+}
+
+/*
+Retrieve request
+
+Timeout is given in milliseconds. Note that zero-timeout retrieval requests do
+not request forwarding, but prompt for a peers message response; therefore they
+also serve as messages to retrieve peers.
+
+MaxSize specifies the maximum size that the peer will accept. This is useful in
+particular if we allow storage and delivery of multichunk payload representing
+the entire or partial subtree unfolding from the requested root key.
+So when we are only interested in a limited part of a stream (infinite trees) or
+are merely testing chunk availability, we can indicate it by limiting the size here.
+
+Request ID can be newly generated or kept from the request originator.
+If the request ID is missing or zero, the request is handled as a lookup only,
+prompting a peers response but not launching a search. Lookup requests are meant
+to be used to bootstrap kademlia tables.
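
For illustration (editor's sketch using the types defined in this file; the
key value is assumed given):

	// a pure lookup: Id stays zero, so isLookup() is true and the recipient
	// responds with a peersMsg for the key's proximity bin without starting
	// a search
	lookup := retrieveRequestMsgData{
		Key:      key, // target address
		MaxPeers: 5,   // cap on the number of peers returned
	}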
+ +In the special case that the key is the zero value as well, the remote peer's +address is assumed (the message is to be handled as a self lookup request). +The response is a PeersMsg with the peers in the kademlia proximity bin +corresponding to the address. +*/ + +type retrieveRequestMsgData struct { + Key storage.Key // target Key address of chunk to be retrieved + Id uint64 // request id, request is a lookup if missing or zero + MaxSize uint64 // maximum size of delivery accepted + MaxPeers uint64 // maximum number of peers returned + Timeout uint64 // the longest time we are expecting a response + timeout *time.Time // [not serialied] + from *peer // +} + +func (self retrieveRequestMsgData) String() string { + var from string + if self.from == nil { + from = "ourselves" + } else { + from = self.from.Addr().String() + } + var target []byte + if len(self.Key) > 3 { + target = self.Key[:4] + } + return fmt.Sprintf("from: %v, Key: %x; ID: %v, MaxSize: %v, MaxPeers: %d", from, target, self.Id, self.MaxSize, self.MaxPeers) +} + +// lookups are encoded by missing request ID +func (self retrieveRequestMsgData) isLookup() bool { + return self.Id == 0 +} + +// sets timeout fields +func (self retrieveRequestMsgData) setTimeout(t *time.Time) { + self.timeout = t + if t != nil { + self.Timeout = uint64(t.UnixNano()) + } else { + self.Timeout = 0 + } +} + +func (self retrieveRequestMsgData) getTimeout() (t *time.Time) { + if self.Timeout > 0 && self.timeout == nil { + timeout := time.Unix(int64(self.Timeout), 0) + t = &timeout + self.timeout = t + } + return +} + +// peerAddr is sent in StatusMsg as part of the handshake +type peerAddr struct { + IP net.IP + Port uint16 + ID []byte // the 64 byte NodeID (ECDSA Public Key) + Addr kademlia.Address +} + +// peerAddr pretty prints as enode +func (self peerAddr) String() string { + var nodeid discover.NodeID + copy(nodeid[:], self.ID) + return discover.NewNode(nodeid, self.IP, 0, self.Port).String() +} + +/* +peers Msg is one response to retrieval; it is always encouraged after a retrieval +request to respond with a list of peers in the same kademlia proximity bin. +The encoding of a peer is identical to that in the devp2p base protocol peers +messages: [IP, Port, NodeID] +note that a node's DPA address is not the NodeID but the hash of the NodeID. + +Timeout serves to indicate whether the responder is forwarding the query within +the timeout or not. + +NodeID serves as the owner of payment contracts and signer of proofs of transfer. + +The Key is the target (if response to a retrieval request) or missing (zero value) +peers address (hash of NodeID) if retrieval request was a self lookup. 
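
An illustrative entry (editor's sketch; the address values are made up, and
nodeID/addrHash are assumed to exist with the right types):

	// one peer as carried in peersMsgData.Peers: ID is the 64-byte public key,
	// Addr the hash of it used for kademlia distance
	p := &peerAddr{
		IP:   net.ParseIP("203.0.113.7"),
		Port: 30399,
		ID:   nodeID[:],
		Addr: addrHash,
	}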
+ +Peers message is requested by retrieval requests with a missing or zero value request ID +*/ +type peersMsgData struct { + Peers []*peerAddr // + Timeout uint64 // + timeout *time.Time // indicate whether responder is expected to deliver content + Key storage.Key // present if a response to a retrieval request + Id uint64 // present if a response to a retrieval request + from *peer +} + +// peers msg pretty printer +func (self peersMsgData) String() string { + var from string + if self.from == nil { + from = "ourselves" + } else { + from = self.from.Addr().String() + } + var target []byte + if len(self.Key) > 3 { + target = self.Key[:4] + } + return fmt.Sprintf("from: %v, Key: %x; ID: %v, Peers: %v", from, target, self.Id, self.Peers) +} + +func (self peersMsgData) setTimeout(t *time.Time) { + self.timeout = t + if t != nil { + self.Timeout = uint64(t.UnixNano()) + } else { + self.Timeout = 0 + } +} + +func (self peersMsgData) getTimeout() (t *time.Time) { + if self.Timeout > 0 && self.timeout == nil { + timeout := time.Unix(int64(self.Timeout), 0) + t = &timeout + self.timeout = t + } + return +} + +/* +syncRequest + +is sent after the handshake to initiate syncing +the syncState of the remote node is persisted in kaddb and set on the +peer/protocol instance when the node is registered by hive as online{ +*/ + +type syncRequestMsgData struct { + SyncState *syncState `rlp:"nil"` +} + +func (self *syncRequestMsgData) String() string { + return fmt.Sprintf("%v", self.SyncState) +} + +/* +deliveryRequest + +is sent once a batch of sync keys is filtered. The ones not found are +sent as a list of syncReuest (hash, priority) in the Deliver field. +When the source receives the sync request it continues to iterate +and fetch at most N items as yet unsynced. +At the same time responds with deliveries of the items. +*/ +type deliveryRequestMsgData struct { + Deliver []*syncRequest +} + +func (self *deliveryRequestMsgData) String() string { + return fmt.Sprintf("sync request for new chunks\ndelivery request for %v chunks", len(self.Deliver)) +} + +/* +unsyncedKeys + +is sent first after the handshake if SyncState iterator brings up hundreds, thousands? +and subsequently sent as a response to deliveryRequestMsgData. + +Syncing is the iterative process of exchanging unsyncedKeys and deliveryRequestMsgs +both ways. + +State contains the sync state sent by the source. When the source receives the +sync state it continues to iterate and fetch at most N items as yet unsynced. +At the same time responds with deliveries of the items. 
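
An illustrative round trip (editor's summary): the source sends
unsyncedKeysMsgData{Unsynced, State}; the recipient answers with a
deliveryRequestMsgData listing only the keys missing from its local store; the
source delivers those as storeRequestMsg chunks while iterating its state for
the next unsyncedKeysMsgData, until Unsynced comes back empty and Synced is
set. The recipient's side of one round might look like this (filterMissing is
a hypothetical helper):

	missing := filterMissing(req.Unsynced) // hypothetical: keys not found locally
	p.deliveryRequest(missing)             // request delivery of just those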
+*/ +type unsyncedKeysMsgData struct { + Unsynced []*syncRequest + State *syncState +} + +func (self *unsyncedKeysMsgData) String() string { + return fmt.Sprintf("sync: keys of %d new chunks (state %v) => synced: %v", len(self.Unsynced), self.State, self.State.Synced) +} + +/* +payment + +is sent when the swap balance is tilted in favour of the remote peer +and in absolute units exceeds the PayAt parameter in the remote peer's profile +*/ + +type paymentMsgData struct { + Units uint // units actually paid for (checked against amount by swap) + Promise *chequebook.Cheque // payment with cheque +} + +func (self *paymentMsgData) String() string { + return fmt.Sprintf("payment for %d units: %v", self.Units, self.Promise) +} diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go new file mode 100644 index 000000000..5e65108d6 --- /dev/null +++ b/swarm/network/protocol.go @@ -0,0 +1,554 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +/* +bzz implements the swarm wire protocol [bzz] (sister of eth and shh) +the protocol instance is launched on each peer by the network layer if the +bzz protocol handler is registered on the p2p server. 
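
An illustrative registration (editor's sketch; srv and the constructor
arguments are assumed to be set up by the node):

	proto, err := Bzz(depo, backend, hive, dbaccess, swapParams, syncParams)
	if err != nil {
		return err
	}
	srv.Protocols = append(srv.Protocols, proto) // srv is a *p2p.Server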
+ +The bzz protocol component speaks the bzz protocol +* handle the protocol handshake +* register peers in the KΛÐΞMLIΛ table via the hive logistic manager +* dispatch to hive for handling the DHT logic +* encode and decode requests for storage and retrieval +* handle sync protocol messages via the syncer +* talks the SWAP payment protocol (swap accounting is done within NetStore) +*/ + +import ( + "fmt" + "net" + "strconv" + "time" + + "github.com/ethereum/go-ethereum/contracts/chequebook" + "github.com/ethereum/go-ethereum/errs" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap" + "github.com/ethereum/go-ethereum/swarm/services/swap/swap" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +const ( + Version = 0 + ProtocolLength = uint64(8) + ProtocolMaxMsgSize = 10 * 1024 * 1024 + NetworkId = 322 +) + +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrVersionMismatch + ErrNetworkIdMismatch + ErrNoStatusMsg + ErrExtraStatusMsg + ErrSwap + ErrSync + ErrUnwanted +) + +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", + ErrSwap: "SWAP error", + ErrSync: "Sync error", + ErrUnwanted: "Unwanted peer", +} + +// bzz represents the swarm wire protocol +// an instance is running on each peer +type bzz struct { + selfID discover.NodeID // peer's node id used in peer advertising in handshake + key storage.Key // baseaddress as storage.Key + storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol + hive *Hive // the logistic manager, peerPool, routing service and peer handler + dbAccess *DbAccess // access to db storage counter and iterator for syncing + requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing + remoteAddr *peerAddr // remote peers address + peer *p2p.Peer // the p2p peer object + rw p2p.MsgReadWriter // messageReadWriter to send messages to + errors *errs.Errors // errors table + backend chequebook.Backend + lastActive time.Time + + swap *swap.Swap // swap instance for the peer connection + swapParams *bzzswap.SwapParams // swap settings both local and remote + swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake) + syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake) + syncer *syncer // syncer instance for the peer connection + syncParams *SyncParams // syncer params + syncState *syncState // outgoing syncronisation state (contains reference to remote peers db counter) +} + +// interface type for handler of storage/retrieval related requests coming +// via the bzz wire protocol +// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest +type StorageHandler interface { + HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error + HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error + HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) + HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) +} + +/* +main entrypoint, wrappers starting a server that will run the bzz protocol +use this constructor to attach the protocol ("class") 
to server caps +This is done by node.Node#Register(func(node.ServiceContext) (Service, error)) +Service implements Protocols() which is an array of protocol constructors +at node startup the protocols are initialised +the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error +on each peer connection +The Run function of the Bzz protocol class creates a bzz instance +which will represent the peer for the swarm hive and all peer-aware components +*/ +func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams) (p2p.Protocol, error) { + + // a single global request db is created for all peer connections + // this is to persist delivery backlog and aid syncronisation + requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath) + if err != nil { + return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err) + } + + return p2p.Protocol{ + Name: "bzz", + Version: Version, + Length: ProtocolLength, + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, p, rw) + }, + }, nil +} + +/* +the main protocol loop that + * does the handshake by exchanging statusMsg + * if peer is valid and accepted, registers with the hive + * then enters into a forever loop handling incoming messages + * storage and retrieval related queries coming via bzz are dispatched to StorageHandler + * peer-related messages are dispatched to the hive + * payment related messages are relayed to SWAP service + * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook) + * whenever the loop terminates, the peer will disconnect with Subprotocol error + * whenever handlers return an error the loop terminates +*/ +func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) { + + self := &bzz{ + storage: depo, + backend: backend, + hive: hive, + dbAccess: dbaccess, + requestDb: requestDb, + peer: p, + rw: rw, + errors: &errs.Errors{ + Package: "BZZ", + Errors: errorToString, + }, + swapParams: sp, + syncParams: sy, + swapEnabled: hive.swapEnabled, + syncEnabled: true, + } + + // handle handshake + err = self.handleStatus() + if err != nil { + return err + } + defer func() { + // if the handler loop exits, the peer is disconnecting + // deregister the peer in the hive + self.hive.removePeer(&peer{bzz: self}) + if self.syncer != nil { + self.syncer.stop() // quits request db and delivery loops, save requests + } + if self.swap != nil { + self.swap.Stop() // quits chequebox autocash etc + } + }() + + // the main forever loop that handles incoming requests + for { + if self.hive.blockRead { + glog.V(logger.Warn).Infof("Cannot read network") + time.Sleep(100 * time.Millisecond) + continue + } + err = self.handle() + if err != nil { + return + } + } +} + +// TODO: may need to implement protocol drop only? 
don't want to kick off the peer +// if they are useful for other protocols +func (self *bzz) Drop() { + self.peer.Disconnect(p2p.DiscSubprotocolError) +} + +// one cycle of the main forever loop that handles and dispatches incoming messages +func (self *bzz) handle() error { + msg, err := self.rw.ReadMsg() + glog.V(logger.Debug).Infof("<- %v", msg) + if err != nil { + return err + } + if msg.Size > ProtocolMaxMsgSize { + return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + // make sure that the payload has been fully consumed + defer msg.Discard() + + switch msg.Code { + + case statusMsg: + // no extra status message allowed. The one needed already handled by + // handleStatus + glog.V(logger.Debug).Infof("Status message: %v", msg) + return self.protoError(ErrExtraStatusMsg, "") + + case storeRequestMsg: + // store requests are dispatched to netStore + var req storeRequestMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + if len(req.SData) < 9 { + return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg) + } + // last Active time is set only when receiving chunks + self.lastActive = time.Now() + glog.V(logger.Detail).Infof("incoming store request: %s", req.String()) + // swap accounting is done within forwarding + self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self}) + + case retrieveRequestMsg: + // retrieve Requests are dispatched to netStore + var req retrieveRequestMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + req.from = &peer{bzz: self} + // if request is lookup and not to be delivered + if req.isLookup() { + glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from) + } else if req.Key == nil { + return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil") + } else { + // swap accounting is done within netStore + self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self}) + } + // direct response with peers, TODO: sort this out + self.hive.peers(&req) + + case peersMsg: + // response to lookups and immediate response to retrieve requests + // dispatches new peer data to the hive that adds them to KADDB + var req peersMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + req.from = &peer{bzz: self} + glog.V(logger.Detail).Infof("<- peer addresses: %v", req) + self.hive.HandlePeersMsg(&req, &peer{bzz: self}) + + case syncRequestMsg: + var req syncRequestMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + glog.V(logger.Debug).Infof("<- sync request: %v", req) + self.lastActive = time.Now() + self.sync(req.SyncState) + + case unsyncedKeysMsg: + // coming from parent node offering + var req unsyncedKeysMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String()) + err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self}) + self.lastActive = time.Now() + if err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + + case deliveryRequestMsg: + // response to syncKeysMsg hashes filtered not existing in db + // also relays the last synced state to the source + var req deliveryRequestMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<-msg %v: 
%v", msg, err) + } + glog.V(logger.Debug).Infof("<- delivery request: %s", req.String()) + err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self}) + self.lastActive = time.Now() + if err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + + case paymentMsg: + // swap protocol message for payment, Units paid for, Cheque paid with + if self.swapEnabled { + var req paymentMsgData + if err := msg.Decode(&req); err != nil { + return self.protoError(ErrDecode, "<- %v: %v", msg, err) + } + glog.V(logger.Debug).Infof("<- payment: %s", req.String()) + self.swap.Receive(int(req.Units), req.Promise) + } + + default: + // no other message is allowed + return self.protoError(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +func (self *bzz) handleStatus() (err error) { + + handshake := &statusMsgData{ + Version: uint64(Version), + ID: "honey", + Addr: self.selfAddr(), + NetworkId: uint64(NetworkId), + Swap: &bzzswap.SwapProfile{ + Profile: self.swapParams.Profile, + PayProfile: self.swapParams.PayProfile, + }, + } + + err = p2p.Send(self.rw, statusMsg, handshake) + if err != nil { + self.protoError(ErrNoStatusMsg, err.Error()) + } + + // read and handle remote status + var msg p2p.Msg + msg, err = self.rw.ReadMsg() + if err != nil { + return err + } + + if msg.Code != statusMsg { + self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg) + } + + if msg.Size > ProtocolMaxMsgSize { + return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + + var status statusMsgData + if err := msg.Decode(&status); err != nil { + return self.protoError(ErrDecode, " %v: %v", msg, err) + } + + if status.NetworkId != NetworkId { + return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId) + } + + if Version != status.Version { + return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version) + } + + self.remoteAddr = self.peerAddr(status.Addr) + glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()) + + if self.swapEnabled { + // set remote profile for accounting + self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self) + if err != nil { + return self.protoError(ErrSwap, "%v", err) + } + } + + glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId) + err = self.hive.addPeer(&peer{bzz: self}) + if err != nil { + return self.protoError(ErrUnwanted, "%v", err) + } + + // hive sets syncstate so sync should start after node added + glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState) + self.syncRequest() + + return nil +} + +func (self *bzz) sync(state *syncState) error { + // syncer setup + if self.syncer != nil { + return self.protoError(ErrSync, "sync request can only be sent once") + } + + cnt := self.dbAccess.counter() + remoteaddr := self.remoteAddr.Addr + start, stop := self.hive.kad.KeyRange(remoteaddr) + + // an explicitly received nil syncstate disables syncronisation + if state == nil { + self.syncEnabled = false + glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self) + state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true} + } else { + state.synced = make(chan bool) + state.SessionAt = cnt + if storage.IsZeroKey(state.Stop) && state.Synced { + 
state.Start = storage.Key(start[:]) + state.Stop = storage.Key(stop[:]) + } + glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state) + } + var err error + self.syncer, err = newSyncer( + self.requestDb, + storage.Key(remoteaddr[:]), + self.dbAccess, + self.unsyncedKeys, self.store, + self.syncParams, state, func() bool { return self.syncEnabled }, + ) + if err != nil { + return self.protoError(ErrSync, "%v", err) + } + glog.V(logger.Detail).Infof("syncer set for peer %v", self) + return nil +} + +func (self *bzz) String() string { + return self.remoteAddr.String() +} + +// repair reported address if IP missing +func (self *bzz) peerAddr(base *peerAddr) *peerAddr { + if base.IP.IsUnspecified() { + host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String()) + base.IP = net.ParseIP(host) + } + return base +} + +// returns self advertised node connection info (listening address w enodes) +// IP will get repaired on the other end if missing +// or resolved via ID by discovery at dialout +func (self *bzz) selfAddr() *peerAddr { + id := self.hive.id + host, port, _ := net.SplitHostPort(self.hive.listenAddr()) + intport, _ := strconv.Atoi(port) + addr := &peerAddr{ + Addr: self.hive.addr, + ID: id[:], + IP: net.ParseIP(host), + Port: uint16(intport), + } + return addr +} + +// outgoing messages +// send retrieveRequestMsg +func (self *bzz) retrieve(req *retrieveRequestMsgData) error { + return self.send(retrieveRequestMsg, req) +} + +// send storeRequestMsg +func (self *bzz) store(req *storeRequestMsgData) error { + return self.send(storeRequestMsg, req) +} + +func (self *bzz) syncRequest() error { + req := &syncRequestMsgData{} + if self.hive.syncEnabled { + glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState) + req.SyncState = self.syncState + } + if self.syncState == nil { + glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState) + } + return self.send(syncRequestMsg, req) +} + +// queue storeRequestMsg in request db +func (self *bzz) deliveryRequest(reqs []*syncRequest) error { + req := &deliveryRequestMsgData{ + Deliver: reqs, + } + return self.send(deliveryRequestMsg, req) +} + +// batch of syncRequests to send off +func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error { + req := &unsyncedKeysMsgData{ + Unsynced: reqs, + State: state, + } + return self.send(unsyncedKeysMsg, req) +} + +// send paymentMsg +func (self *bzz) Pay(units int, promise swap.Promise) { + req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)} + self.payment(req) +} + +// send paymentMsg +func (self *bzz) payment(req *paymentMsgData) error { + return self.send(paymentMsg, req) +} + +// sends peersMsg +func (self *bzz) peers(req *peersMsgData) error { + return self.send(peersMsg, req) +} + +func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) { + err = self.errors.New(code, format, params...) 
+ err.Log(glog.V(logger.Info)) + return +} + +func (self *bzz) protoErrorDisconnect(err *errs.Error) { + err.Log(glog.V(logger.Info)) + if err.Fatal() { + self.peer.Disconnect(p2p.DiscSubprotocolError) + } +} + +func (self *bzz) send(msg uint64, data interface{}) error { + if self.hive.blockWrite { + return fmt.Errorf("network write blocked") + } + glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self) + err := p2p.Send(self.rw, msg, data) + if err != nil { + self.Drop() + } + return err +} diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go new file mode 100644 index 000000000..91dea8cac --- /dev/null +++ b/swarm/network/protocol_test.go @@ -0,0 +1,17 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go new file mode 100644 index 000000000..cef32610f --- /dev/null +++ b/swarm/network/syncdb.go @@ -0,0 +1,390 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "encoding/binary" + "fmt" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +const counterKeyPrefix = 0x01 + +/* +syncDb is a queueing service for outgoing deliveries. +One instance per priority queue for each peer + +a syncDb instance maintains an in-memory buffer (of capacity bufferSize) +once its in-memory buffer is full it switches to persisting in db +and dbRead iterator iterates through the items keeping their order +once the db read catches up (there is no more items in the db) then +it switches back to in-memory buffer. 
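
The db keys are laid out so that leveldb iteration order equals delivery order
(editor's sketch of the 42-byte key documented at dbRead below; peerKey and
counter are assumed given):

	key := make([]byte, 42)
	key[0] = 0x00                                 // request entry (0x01 marks the counter key)
	key[1] = byte(priorities - priority)          // higher priority sorts first
	copy(key[2:34], peerKey)                      // 32-byte remote peer address
	binary.BigEndian.PutUint64(key[34:], counter) // per-peer insertion counter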
+ +when syncdb is stopped all items in the buffer are saved to the db +*/ +type syncDb struct { + start []byte // this syncdb starting index in requestdb + key storage.Key // remote peers address key + counterKey []byte // db key to persist counter + priority uint // priotity High|Medium|Low + buffer chan interface{} // incoming request channel + db *storage.LDBDatabase // underlying db (TODO should be interface) + done chan bool // chan to signal goroutines finished quitting + quit chan bool // chan to signal quitting to goroutines + total, dbTotal int // counts for one session + batch chan chan int // channel for batch requests + dbBatchSize uint // number of items before batch is saved +} + +// constructor needs a shared request db (leveldb) +// priority is used in the index key +// uses a buffer and a leveldb for persistent storage +// bufferSize, dbBatchSize are config parameters +func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb { + start := make([]byte, 42) + start[1] = byte(priorities - priority) + copy(start[2:34], key) + + counterKey := make([]byte, 34) + counterKey[0] = counterKeyPrefix + copy(counterKey[1:], start[1:34]) + + syncdb := &syncDb{ + start: start, + key: key, + counterKey: counterKey, + priority: priority, + buffer: make(chan interface{}, bufferSize), + db: db, + done: make(chan bool), + quit: make(chan bool), + batch: make(chan chan int), + dbBatchSize: dbBatchSize, + } + glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority) + + // starts the main forever loop reading from buffer + go syncdb.bufferRead(deliver) + return syncdb +} + +/* +bufferRead is a forever iterator loop that takes care of delivering +outgoing store requests reads from incoming buffer + +its argument is the deliver function taking the item as first argument +and a quit channel as second. +Closing of this channel is supposed to abort all waiting for delivery +(typically network write) + +The iteration switches between 2 modes, +* buffer mode reads the in-memory buffer and delivers the items directly +* db mode reads from the buffer and writes to the db, parallelly another +routine is started that reads from the db and delivers items + +If there is buffer contention in buffer mode (slow network, high upload volume) +syncdb switches to db mode and starts dbRead +Once db backlog is delivered, it reverts back to in-memory buffer + +It is automatically started when syncdb is initialised. + +It saves the buffer to db upon receiving quit signal. 
syncDb#stop() +*/ +func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) { + var buffer, db chan interface{} // channels representing the two read modes + var more bool + var req interface{} + var entry *syncDbEntry + var inBatch, inDb int + batch := new(leveldb.Batch) + var dbSize chan int + quit := self.quit + counterValue := make([]byte, 8) + + // counter is used for keeping the items in order, persisted to db + // start counter where db was at, 0 if not found + data, err := self.db.Get(self.counterKey) + var counter uint64 + if err == nil { + counter = binary.BigEndian.Uint64(data) + glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter) + } else { + glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter) + } + +LOOP: + for { + // waiting for item next in the buffer, or quit signal or batch request + select { + // buffer only closes when writing to db + case req = <-buffer: + // deliver request : this is blocking on network write so + // it is passed the quit channel as argument, so that it returns + // if syncdb is stopped. In this case we need to save the item to the db + more = deliver(req, self.quit) + if !more { + glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + // received quit signal, save request currently waiting delivery + // by switching to db mode and closing the buffer + buffer = nil + db = self.buffer + close(db) + quit = nil // needs to block the quit case in select + break // break from select, this item will be written to the db + } + self.total++ + glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + // by the time deliver returns, there were new writes to the buffer + // if buffer contention is detected, switch to db mode which drains + // the buffer so no process will block on pushing store requests + if len(buffer) == cap(buffer) { + glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. 
session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total) + buffer = nil + db = self.buffer + } + continue LOOP + + // incoming entry to put into db + case req, more = <-db: + if !more { + // only if quit is called, saved all the buffer + binary.BigEndian.PutUint64(counterValue, counter) + batch.Put(self.counterKey, counterValue) // persist counter in batch + self.writeSyncBatch(batch) // save batch + glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority) + break LOOP + } + self.dbTotal++ + self.total++ + // otherwise break after select + case dbSize = <-self.batch: + // explicit request for batch + if inBatch == 0 && quit != nil { + // there was no writes since the last batch so db depleted + // switch to buffer mode + glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority) + db = nil + buffer = self.buffer + dbSize <- 0 // indicates to 'caller' that batch has been written + inDb = 0 + continue LOOP + } + binary.BigEndian.PutUint64(counterValue, counter) + batch.Put(self.counterKey, counterValue) + glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue) + batch = self.writeSyncBatch(batch) + dbSize <- inBatch // indicates to 'caller' that batch has been written + inBatch = 0 + continue LOOP + + // closing syncDb#quit channel is used to signal to all goroutines to quit + case <-quit: + // need to save backlog, so switch to db mode + db = self.buffer + buffer = nil + quit = nil + glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority) + close(db) + continue LOOP + } + + // only get here if we put req into db + entry, err = self.newSyncDbEntry(req, counter) + if err != nil { + glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err) + continue LOOP + } + batch.Put(entry.key, entry.val) + glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter) + // if just switched to db mode and not quitting, then launch dbRead + // in a parallel go routine to send deliveries from db + if inDb == 0 && quit != nil { + glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead") + go self.dbRead(true, counter, deliver) + } + inDb++ + inBatch++ + counter++ + // need to save the batch if it gets too large (== dbBatchSize) + if inBatch%int(self.dbBatchSize) == 0 { + batch = self.writeSyncBatch(batch) + } + } + glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter) + close(self.done) +} + +// writes the batch to the db and returns a new batch object +func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch { + err := self.db.Write(batch) + if err != nil { + glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err) + return batch + } + return new(leveldb.Batch) +} + +// abstract type for db entries (TODO could be a feature of Receipts) +type syncDbEntry struct { + key, val []byte +} + +func (self syncDbEntry) String() string { + return fmt.Sprintf("key: %x, value: %x", self.key, self.val) +} + +/* + dbRead is iterating over store requests to be sent over to the peer + this is mainly to prevent crashes due to network output buffer 
contention (???) + as well as to make syncronisation resilient to disconnects + the messages are supposed to be sent in the p2p priority queue. + + the request DB is shared between peers, but domains for each syncdb + are disjoint. dbkeys (42 bytes) are structured: + * 0: 0x00 (0x01 reserved for counter key) + * 1: priorities - priority (so that high priority can be replayed first) + * 2-33: peers address + * 34-41: syncdb counter to preserve order (this field is missing for the counter key) + + values (40 bytes) are: + * 0-31: key + * 32-39: request id + +dbRead needs a boolean to indicate if on first round all the historical +record is synced. Second argument to indicate current db counter +The third is the function to apply +*/ +func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) { + key := make([]byte, 42) + copy(key, self.start) + binary.BigEndian.PutUint64(key[34:], counter) + var batches, n, cnt, total int + var more bool + var entry *syncDbEntry + var it iterator.Iterator + var del *leveldb.Batch + batchSizes := make(chan int) + + for { + // if useBatches is false, cnt is not set + if useBatches { + // this could be called before all cnt items sent out + // so that loop is not blocking while delivering + // only relevant if cnt is large + select { + case self.batch <- batchSizes: + case <-self.quit: + return + } + // wait for the write to finish and get the item count in the next batch + cnt = <-batchSizes + batches++ + if cnt == 0 { + // empty + return + } + } + it = self.db.NewIterator() + it.Seek(key) + if !it.Valid() { + copy(key, self.start) + useBatches = true + continue + } + del = new(leveldb.Batch) + glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt) + + for n = 0; !useBatches || n < cnt; it.Next() { + copy(key, it.Key()) + if len(key) == 0 || key[0] != 0 { + copy(key, self.start) + useBatches = true + break + } + val := make([]byte, 40) + copy(val, it.Value()) + entry = &syncDbEntry{key, val} + // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total) + more = fun(entry, self.quit) + if !more { + // quit received when waiting to deliver entry, the entry will not be deleted + glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt) + break + } + // since subsequent batches of the same db session are indexed incrementally + // deleting earlier batches can be delayed and parallelised + // this could be batch delete when db is idle (but added complexity esp when quitting) + del.Delete(key) + n++ + total++ + } + glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total) + self.db.Write(del) // this could be async called only when db is idle + it.Release() + } +} + +// +func (self *syncDb) stop() { + close(self.quit) + <-self.done +} + +// calculate a dbkey for the request, for the db to work +// see syncdb for db key structure +// polimorphic: accepted types, see syncer#addRequest +func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) { + var key storage.Key + var chunk *storage.Chunk + var id uint64 + var ok bool + var sreq *storeRequestMsgData + + if key, ok = 
req.(storage.Key); ok { + id = generateId() + } else if chunk, ok = req.(*storage.Chunk); ok { + key = chunk.Key + id = generateId() + } else if sreq, ok = req.(*storeRequestMsgData); ok { + key = sreq.Key + id = sreq.Id + } else if entry, ok = req.(*syncDbEntry); !ok { + return nil, fmt.Errorf("type not allowed: %v (%T)", req, req) + } + + // order by peer > priority > seqid + // value is request id if exists + if entry == nil { + dbkey := make([]byte, 42) + dbval := make([]byte, 40) + + // encode key + copy(dbkey[:], self.start[:34]) // db peer + binary.BigEndian.PutUint64(dbkey[34:], counter) + // encode value + copy(dbval, key[:]) + binary.BigEndian.PutUint64(dbval[32:], id) + + entry = &syncDbEntry{dbkey, dbval} + } + return +} diff --git a/swarm/network/syncdb_test.go b/swarm/network/syncdb_test.go new file mode 100644 index 000000000..e46d32a2e --- /dev/null +++ b/swarm/network/syncdb_test.go @@ -0,0 +1,221 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +func init() { + glog.SetV(0) + glog.SetToStderr(true) +} + +type testSyncDb struct { + *syncDb + c int + t *testing.T + fromDb chan bool + delivered [][]byte + sent []int + dbdir string + at int +} + +func newTestSyncDb(priority, bufferSize, batchSize int, dbdir string, t *testing.T) *testSyncDb { + if len(dbdir) == 0 { + tmp, err := ioutil.TempDir(os.TempDir(), "syncdb-test") + if err != nil { + t.Fatalf("unable to create temporary direcory %v: %v", tmp, err) + } + dbdir = tmp + } + db, err := storage.NewLDBDatabase(filepath.Join(dbdir, "requestdb")) + if err != nil { + t.Fatalf("unable to create db: %v", err) + } + self := &testSyncDb{ + fromDb: make(chan bool), + dbdir: dbdir, + t: t, + } + h := crypto.Sha3Hash([]byte{0}) + key := storage.Key(h[:]) + self.syncDb = newSyncDb(db, key, uint(priority), uint(bufferSize), uint(batchSize), self.deliver) + // kick off db iterator right away, if no items on db this will allow + // reading from the buffer + return self + +} + +func (self *testSyncDb) close() { + self.db.Close() + os.RemoveAll(self.dbdir) +} + +func (self *testSyncDb) push(n int) { + for i := 0; i < n; i++ { + self.buffer <- storage.Key(crypto.Sha3([]byte{byte(self.c)})) + self.sent = append(self.sent, self.c) + self.c++ + } + glog.V(logger.Debug).Infof("pushed %v requests", n) +} + +func (self *testSyncDb) draindb() { + it := self.db.NewIterator() + defer it.Release() + for { + it.Seek(self.start) + if !it.Valid() { + return + } + k := it.Key() + if len(k) == 0 || k[0] == 1 { + return + } 
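	// (editor's note, not part of the original diff) the loop re-creates the
	// iterator on every pass so that deletions made concurrently by dbRead
	// become visible; draindb returns once no request entries (at most the
	// counter entry, key prefix 0x01) remain for this peer.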
diff --git a/swarm/network/syncdb_test.go b/swarm/network/syncdb_test.go
new file mode 100644
index 000000000..e46d32a2e
--- /dev/null
+++ b/swarm/network/syncdb_test.go
@@ -0,0 +1,221 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+	"bytes"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+func init() {
+	glog.SetV(0)
+	glog.SetToStderr(true)
+}
+
+type testSyncDb struct {
+	*syncDb
+	c         int
+	t         *testing.T
+	fromDb    chan bool
+	delivered [][]byte
+	sent      []int
+	dbdir     string
+	at        int
+}
+
+func newTestSyncDb(priority, bufferSize, batchSize int, dbdir string, t *testing.T) *testSyncDb {
+	if len(dbdir) == 0 {
+		tmp, err := ioutil.TempDir(os.TempDir(), "syncdb-test")
+		if err != nil {
+			t.Fatalf("unable to create temporary directory %v: %v", tmp, err)
+		}
+		dbdir = tmp
+	}
+	db, err := storage.NewLDBDatabase(filepath.Join(dbdir, "requestdb"))
+	if err != nil {
+		t.Fatalf("unable to create db: %v", err)
+	}
+	self := &testSyncDb{
+		fromDb: make(chan bool),
+		dbdir:  dbdir,
+		t:      t,
+	}
+	h := crypto.Sha3Hash([]byte{0})
+	key := storage.Key(h[:])
+	self.syncDb = newSyncDb(db, key, uint(priority), uint(bufferSize), uint(batchSize), self.deliver)
+	// kick off the db iterator right away; if there are no items in the db
+	// this will allow reading from the buffer
+	return self
+}
+
+func (self *testSyncDb) close() {
+	self.db.Close()
+	os.RemoveAll(self.dbdir)
+}
+
+func (self *testSyncDb) push(n int) {
+	for i := 0; i < n; i++ {
+		self.buffer <- storage.Key(crypto.Sha3([]byte{byte(self.c)}))
+		self.sent = append(self.sent, self.c)
+		self.c++
+	}
+	glog.V(logger.Debug).Infof("pushed %v requests", n)
+}
+
+func (self *testSyncDb) draindb() {
+	it := self.db.NewIterator()
+	defer it.Release()
+	for {
+		it.Seek(self.start)
+		if !it.Valid() {
+			return
+		}
+		k := it.Key()
+		if len(k) == 0 || k[0] == 1 {
+			return
+		}
+		it.Release()
+		it = self.db.NewIterator()
+	}
+}
+
+func (self *testSyncDb) deliver(req interface{}, quit chan bool) bool {
+	_, db := req.(*syncDbEntry)
+	key, _, _, _, err := parseRequest(req)
+	if err != nil {
+		self.t.Fatalf("unexpected error of key %v: %v", key, err)
+	}
+	self.delivered = append(self.delivered, key)
+	select {
+	case self.fromDb <- db:
+		return true
+	case <-quit:
+		return false
+	}
+}
+
+func (self *testSyncDb) expect(n int, db bool) {
+	var ok bool
+	// for n items
+	for i := 0; i < n; i++ {
+		ok = <-self.fromDb
+		if self.at+1 > len(self.delivered) {
+			self.t.Fatalf("expected %v, got %v", self.at+1, len(self.delivered))
+		}
+		if len(self.sent) > self.at && !bytes.Equal(crypto.Sha3([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) {
+			glog.V(logger.Debug).Infof("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
+			self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
+		}
+		if !ok && db {
+			self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at)
+		}
+		if ok && !db {
+			self.t.Fatalf("expected delivery %v/%v/%v from cache", i, n, self.at)
+		}
+		self.at++
+	}
+}
+
+func TestSyncDb(t *testing.T) {
+	priority := High
+	bufferSize := 5
+	batchSize := 2 * bufferSize
+	s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
+	defer s.close()
+	defer s.stop()
+	s.dbRead(false, 0, s.deliver)
+	s.draindb()
+
+	s.push(4)
+	s.expect(1, false)
+	// 3 in buffer
+	time.Sleep(100 * time.Millisecond)
+	s.push(3)
+	// push over limit
+	s.expect(1, false)
+	// one popped from the buffer, then contention detected
+	s.expect(4, true)
+	s.push(4)
+	s.expect(5, true)
+	// depleted db, switch back to buffer
+	s.draindb()
+	s.push(5)
+	s.expect(4, false)
+	s.push(3)
+	s.expect(4, false)
+	// buffer depleted
+	time.Sleep(100 * time.Millisecond)
+	s.push(6)
+	s.expect(1, false)
+	// push into full buffer, switch to db
+	s.expect(5, true)
+	s.draindb()
+	s.push(1)
+	s.expect(1, false)
+}
+
+func TestSaveSyncDb(t *testing.T) {
+	amount := 30
+	priority := High
+	bufferSize := amount
+	batchSize := 10
+	s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
+	go s.dbRead(false, 0, s.deliver)
+	s.push(amount)
+	s.stop()
+	s.db.Close()
+
+	s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
+	go s.dbRead(false, 0, s.deliver)
+	s.expect(amount, true)
+	for i, key := range s.delivered {
+		expKey := crypto.Sha3([]byte{byte(i)})
+		if !bytes.Equal(key, expKey) {
+			t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
+		}
+	}
+	s.push(amount)
+	s.expect(amount, false)
+	for i := amount; i < 2*amount; i++ {
+		key := s.delivered[i]
+		expKey := crypto.Sha3([]byte{byte(i - amount)})
+		if !bytes.Equal(key, expKey) {
+			t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
+		}
+	}
+	s.stop()
+	s.db.Close()
+
+	s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
+	defer s.close()
+	defer s.stop()
+
+	go s.dbRead(false, 0, s.deliver)
+	s.push(1)
+	s.expect(1, false)
+}
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go
new file mode 100644
index 000000000..e871666bd
--- /dev/null
+++ b/swarm/network/syncer.go
@@ -0,0 +1,778 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"path/filepath"
+
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// syncer parameters (global, not peer specific) default values
+const (
+	requestDbBatchSize = 512  // size of batch before written to request db
+	keyBufferSize      = 1024 // size of buffer for unsynced keys
+	syncBatchSize      = 128  // maximum batchsize for outgoing requests
+	syncBufferSize     = 128  // size of buffer for delivery requests
+	syncCacheSize      = 1024 // cache capacity to store request queue in memory
+)
+
+// priorities
+const (
+	Low        = iota // 0
+	Medium            // 1
+	High              // 2
+	priorities        // 3 number of priority levels
+)
+
+// request types
+const (
+	DeliverReq   = iota // 0
+	PushReq             // 1
+	PropagateReq        // 2
+	HistoryReq          // 3
+	BacklogReq          // 4
+)
+
+// json serialisable struct to record the synchronisation state between 2 peers
+type syncState struct {
+	*storage.DbSyncState // embeds the following 4 fields:
+	// Start Key    // lower limit of address space
+	// Stop  Key    // upper limit of address space
+	// First uint64 // counter taken from last sync state
+	// Last  uint64 // counter of remote peer dbStore at the time of last connection
+	SessionAt  uint64      // set at the time of connection
+	LastSeenAt uint64      // set at the time of connection
+	Latest     storage.Key // cursor of dbstore when last synced (continuously set by syncer)
+	Synced     bool        // true iff Sync is done up to the last disconnect
+	synced     chan bool   // signal that sync stage finished
+}
+
+// wrapper around the chunk dbs to provide the syncer with mockable access to the local chunk store
+type DbAccess struct {
+	db  *storage.DbStore
+	loc *storage.LocalStore
+}
+
+func NewDbAccess(loc *storage.LocalStore) *DbAccess {
+	return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
+}
+
+// to obtain the chunks from key or request db entry only
+func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
+	return self.loc.Get(key)
+}
+
+// current storage counter of chunk db
+func (self *DbAccess) counter() uint64 {
+	return self.db.Counter()
+}
+
+// implemented by dbStoreSyncIterator
+type keyIterator interface {
+	Next() storage.Key
+}
+
+// generator function for iteration by address range and storage counter
+func (self *DbAccess) iterator(s *syncState) keyIterator {
+	it, err := self.db.NewSyncIterator(*(s.DbSyncState))
+	if err != nil {
+		return nil
+	}
+	return keyIterator(it)
+}
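A hypothetical usage sketch of the accessor above: walking every key stored in a given address range up to the current storage counter. The helper name iterateRange and the process callback are illustrative; the DbSyncState fields follow the embedded-field comments in syncState.

// iterateRange is an illustrative helper in the same package: it visits
// every key in the address range [start, stop] stored with a counter
// between 0 and the current one
func iterateRange(dbAccess *DbAccess, start, stop storage.Key, process func(storage.Key)) {
	state := &syncState{DbSyncState: &storage.DbSyncState{
		Start: start,              // lower limit of address space
		Stop:  stop,               // upper limit of address space
		First: 0,                  // replay from the first storage counter
		Last:  dbAccess.counter(), // up to the current counter
	}}
	it := dbAccess.iterator(state)
	if it == nil { // NewSyncIterator failed
		return
	}
	for key := it.Next(); key != nil; key = it.Next() {
		process(key)
	}
}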
+func (self syncState) String() string {
+	if self.Synced {
+		return fmt.Sprintf(
+			"session started at: %v, last seen at: %v, latest key: %v",
+			self.SessionAt, self.LastSeenAt,
+			self.Latest.Log(),
+		)
+	} else {
+		return fmt.Sprintf(
+			"address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
+			self.Start.Log(), self.Stop.Log(),
+			self.First, self.Last,
+			self.SessionAt, self.LastSeenAt,
+			self.Latest.Log(),
+		)
+	}
+}
+
+// syncer parameters (global, not peer specific)
+type SyncParams struct {
+	RequestDbPath      string // path for request db (leveldb)
+	RequestDbBatchSize uint   // number of items before a batch is saved to requestdb
+	KeyBufferSize      uint   // size of key buffer
+	SyncBatchSize      uint   // maximum batchsize for outgoing requests
+	SyncBufferSize     uint   // size of buffer for delivery requests
+	SyncCacheSize      uint   // cache capacity to store request queue in memory
+	SyncPriorities     []uint // list of priority levels for req types 0-4
+	SyncModes          []bool // list of sync modes for req types 0-4
+}
+
+// constructor with default values
+func NewSyncParams(bzzdir string) *SyncParams {
+	return &SyncParams{
+		RequestDbPath:      filepath.Join(bzzdir, "requests"),
+		RequestDbBatchSize: requestDbBatchSize,
+		KeyBufferSize:      keyBufferSize,
+		SyncBufferSize:     syncBufferSize,
+		SyncBatchSize:      syncBatchSize,
+		SyncCacheSize:      syncCacheSize,
+		SyncPriorities:     []uint{High, Medium, Medium, Low, Low},
+		SyncModes:          []bool{true, true, true, true, false},
+	}
+}
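A hedged usage sketch: the two slices above map the five request types (DeliverReq through BacklogReq) to a priority and a sync mode by index, so a caller can tune them after construction. The "/tmp/bzz" path is a placeholder.

// sketch: customise the defaults; the bzzdir argument is a placeholder
params := NewSyncParams("/tmp/bzz")
// indices follow the request type constants above:
// DeliverReq, PushReq, PropagateReq, HistoryReq, BacklogReq
params.SyncPriorities[HistoryReq] = High // favour history over the Low default
params.SyncModes[PropagateReq] = false   // deliver propagated chunks directly,
                                         // skipping the confirmation roundtrip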
started: %v", state) + // launch chunk delivery service + go self.syncDeliveries() + // launch sync task manager + if self.syncF() { + go self.sync() + } + // process unsynced keys to broadcast + go self.syncUnsyncedKeys() + + return self, nil +} + +// metadata serialisation +func encodeSync(state *syncState) (*json.RawMessage, error) { + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return nil, err + } + meta := json.RawMessage(data) + return &meta, nil +} + +func decodeSync(meta *json.RawMessage) (*syncState, error) { + if meta == nil { + return nil, fmt.Errorf("unable to deserialise sync state from <nil>") + } + data := []byte(*(meta)) + if len(data) == 0 { + return nil, fmt.Errorf("unable to deserialise sync state from <nil>") + } + state := &syncState{DbSyncState: &storage.DbSyncState{}} + err := json.Unmarshal(data, state) + return state, err +} + +/* + sync implements the syncing script + * first all items left in the request Db are replayed + * type = StaleSync + * Mode: by default once again via confirmation roundtrip + * Priority: the items are replayed as the proirity specified for StaleSync + * but within the order respects earlier priority level of request + * after all items are consumed for a priority level, the the respective + queue for delivery requests is open (this way new reqs not written to db) + (TODO: this should be checked) + * the sync state provided by the remote peer is used to sync history + * all the backlog from earlier (aborted) syncing is completed starting from latest + * if Last < LastSeenAt then all items in between then process all + backlog from upto last disconnect + * if Last > 0 && + + sync is called from the syncer constructor and is not supposed to be used externally +*/ +func (self *syncer) sync() { + state := self.state + // sync finished + defer close(self.syncStates) + + // 0. first replay stale requests from request db + if state.SessionAt == 0 { + glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log()) + return + } + glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log()) + for p := priorities - 1; p >= 0; p-- { + self.queues[p].dbRead(false, 0, self.replay()) + } + glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log()) + + // unless peer is synced sync unfinished history beginning on + if !state.Synced { + start := state.Start + + if !storage.IsZeroKey(state.Latest) { + // 1. there is unfinished earlier sync + state.Start = state.Latest + glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state) + // blocks while the entire history upto state is synced + self.syncState(state) + if state.Last < state.SessionAt { + state.First = state.Last + 1 + } + } + state.Latest = storage.ZeroKey + state.Start = start + // 2. sync up to last disconnect1 + if state.First < state.LastSeenAt { + state.Last = state.LastSeenAt + glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state) + self.syncState(state) + state.First = state.LastSeenAt + } + state.Latest = storage.ZeroKey + + } else { + // synchronisation starts at end of last session + state.First = state.LastSeenAt + } + + // 3. 
+
+/*
+ sync implements the syncing script
+ * first all items left in the request db are replayed
+   * type = StaleSync
+   * Mode: by default once again via confirmation roundtrip
+   * Priority: the items are replayed with the priority specified for StaleSync,
+     but within that, the order respects the earlier priority level of the request
+ * after all items are consumed for a priority level, the respective
+   queue for delivery requests is opened (this way new reqs are not written to db)
+   (TODO: this should be checked)
+ * the sync state provided by the remote peer is used to sync history
+   * all the backlog from earlier (aborted) syncing is completed starting from latest
+   * if Last < LastSeenAt then process all items in between: the whole
+     backlog up to the last disconnect
+   * if Last > 0 &&
+
+ sync is called from the syncer constructor and is not supposed to be used externally
+*/
+func (self *syncer) sync() {
+	state := self.state
+	// sync finished
+	defer close(self.syncStates)
+
+	// 0. first replay stale requests from request db
+	if state.SessionAt == 0 {
+		glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log())
+		return
+	}
+	glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log())
+	for p := priorities - 1; p >= 0; p-- {
+		self.queues[p].dbRead(false, 0, self.replay())
+	}
+	glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log())
+
+	// unless the peer is synced, sync unfinished history
+	if !state.Synced {
+		start := state.Start
+
+		if !storage.IsZeroKey(state.Latest) {
+			// 1. there is an unfinished earlier sync
+			state.Start = state.Latest
+			glog.V(logger.Debug).Infof("syncer[%v]: start synchronising backlog (unfinished sync: %v)", self.key.Log(), state)
+			// blocks while the entire history up to state is synced
+			self.syncState(state)
+			if state.Last < state.SessionAt {
+				state.First = state.Last + 1
+			}
+		}
+		state.Latest = storage.ZeroKey
+		state.Start = start
+		// 2. sync up to the last disconnect
+		if state.First < state.LastSeenAt {
+			state.Last = state.LastSeenAt
+			glog.V(logger.Debug).Infof("syncer[%v]: start synchronising history up to last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)
+			self.syncState(state)
+			state.First = state.LastSeenAt
+		}
+		state.Latest = storage.ZeroKey
+
+	} else {
+		// synchronisation starts at the end of the last session
+		state.First = state.LastSeenAt
+	}
+
+	// 3. sync up to the current session start
+	// if there have been new chunks since the last session
+	if state.LastSeenAt < state.SessionAt {
+		state.Last = state.SessionAt
+		glog.V(logger.Debug).Infof("syncer[%v]: start synchronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)
+		// blocks until state syncing is finished
+		self.syncState(state)
+	}
+	glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log())
+}
+
+// syncState blocks until the history up to state is synced
+func (self *syncer) syncState(state *syncState) {
+	self.syncStates <- state
+	select {
+	case <-state.synced:
+	case <-self.quit:
+	}
+}
+
+// stop quits the request processor loops and saves the request cache to disk
+func (self *syncer) stop() {
+	close(self.quit)
+	glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log())
+	for _, db := range self.queues {
+		db.stop()
+	}
+}
+
+// rlp serialisable sync request
+type syncRequest struct {
+	Key      storage.Key
+	Priority uint
+}
+
+func (self *syncRequest) String() string {
+	return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
+}
+
+func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
+	key, _, _, _, err := parseRequest(req)
+	// TODO: if req has a chunk, it should be put in a cache
+	if err != nil {
+		return nil, err
+	}
+	return &syncRequest{key, uint(p)}, nil
+}
+
+// serves historical items from the db
+// * reads are on demand, blocking unless the history channel is read
+// * accepts sync requests (syncStates) to create a new db iterator
+// * closes the channel once iteration finishes
+func (self *syncer) syncHistory(state *syncState) chan interface{} {
+	var n uint
+	history := make(chan interface{})
+	glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)
+	it := self.dbAccess.iterator(state)
+	if it != nil {
+		go func() {
+			// signal that the iteration has ended
+			defer close(history)
+		IT:
+			for {
+				key := it.Next()
+				if key == nil {
+					break IT
+				}
+				select {
+				// blocking until the history channel is read from
+				case history <- storage.Key(key):
+					n++
+					glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)
+					state.Latest = key
+				case <-self.quit:
+					return
+				}
+			}
+			glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)
+		}()
+	}
+	return history
+}
+
+// triggers key synchronisation
+func (self *syncer) sendUnsyncedKeys() {
+	select {
+	case self.deliveryRequest <- true:
+	default:
+	}
+}
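The trigger above uses a 1-capacity channel with a non-blocking send, so repeated triggers coalesce while one is pending. A self-contained sketch of the same pattern (names are illustrative):

package main

import "fmt"

func main() {
	// 1-capacity channel: at most one pending trigger
	trigger := make(chan bool, 1)

	// fire the trigger twice without blocking; the second send hits
	// the default branch because a trigger is already pending
	for i := 0; i < 2; i++ {
		select {
		case trigger <- true:
			fmt.Println("trigger queued")
		default:
			fmt.Println("trigger already pending, coalesced")
		}
	}

	<-trigger // the consumer observes a single wake-up
}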
+
+// assembles a new batch of unsynced keys
+// * keys are drawn from the key buffers in order of priority queue
+// * if the queues of priority for History (HistoryReq) or higher are depleted,
+//   historical data is used, so historical items are lower priority within
+//   their priority group
+// * the order of historical data is unspecified
+func (self *syncer) syncUnsyncedKeys() {
+	// send out new
+	var unsynced []*syncRequest
+	var more, justSynced bool
+	var keyCount, historyCnt int
+	var history chan interface{}
+
+	priority := High
+	keys := self.keys[priority]
+	var newUnsyncedKeys, deliveryRequest chan bool
+	keyCounts := make([]int, priorities)
+	histPrior := self.SyncPriorities[HistoryReq]
+	syncStates := self.syncStates
+	state := self.state
+
+LOOP:
+	for {
+
+		var req interface{}
+		// select the highest priority channel to read from
+		// keys channels are buffered so the highest priority ones
+		// are checked first - integrity can only be guaranteed if writing
+		// is locked while selecting
+		if priority != High || len(keys) == 0 {
+			// selection is not needed if the High priority queue has items
+			keys = nil
+		PRIORITIES:
+			for priority = High; priority >= 0; priority-- {
+				// the first priority channel that is non-empty will be assigned to keys
+				if len(self.keys[priority]) > 0 {
+					glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
+					keys = self.keys[priority]
+					break PRIORITIES
+				}
+				glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))
+				// if the input queue is empty on this level, resort to history if there is any
+				if uint(priority) == histPrior && history != nil {
+					glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key)
+					keys = history
+					break PRIORITIES
+				}
+			}
+		}
+
+		// if the peer is ready to receive but there is nothing to send
+		if keys == nil && deliveryRequest == nil {
+			// no items left: switch to waiting mode
Waiting", self.key.Log()) + newUnsyncedKeys = self.newUnsyncedKeys + } + + // send msg iff + // * peer is ready to receive keys AND ( + // * all queues and history are depleted OR + // * batch full OR + // * all history have been consumed, synced) + if deliveryRequest == nil && + (justSynced || + len(unsynced) > 0 && keys == nil || + len(unsynced) == int(self.SyncBatchSize)) { + justSynced = false + // listen to requests + deliveryRequest = self.deliveryRequest + newUnsyncedKeys = nil // not care about data until next req comes in + // set sync to current counter + // (all nonhistorical outgoing traffic sheduled and persisted + state.LastSeenAt = self.dbAccess.counter() + state.Latest = storage.ZeroKey + glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced) + // send the unsynced keys + stateCopy := *state + err := self.unsyncedKeys(unsynced, &stateCopy) + if err != nil { + glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err) + } + self.state = state + glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy) + unsynced = nil + keys = nil + } + + // process item and add it to the batch + select { + case <-self.quit: + break LOOP + case req, more = <-keys: + if keys == history && !more { + glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log()) + // history channel is closed, waiting for new state (called from sync()) + syncStates = self.syncStates + state.Synced = true // this signals that the current segment is complete + select { + case state.synced <- false: + case <-self.quit: + break LOOP + } + justSynced = true + history = nil + } + case <-deliveryRequest: + glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log()) + + // this 1 cap channel can wake up the loop + // signaling that peer is ready to receive unsynced Keys + // the channel is set to nil any further writes will be ignored + deliveryRequest = nil + + case <-newUnsyncedKeys: + glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log()) + // this 1 cap channel can wake up the loop + // signals that data is available to send if peer is ready to receive + newUnsyncedKeys = nil + keys = self.keys[High] + + case state, more = <-syncStates: + // this resets the state + if !more { + state = self.state + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state) + state.Synced = true + syncStates = nil + } else { + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior) + state.Synced = false + history = self.syncHistory(state) + // only one history at a time, only allow another one once the + // history channel is closed + syncStates = nil + } + } + if req == nil { + continue LOOP + } + + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req) + keyCounts[priority]++ + keyCount++ + if keys == history { + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + historyCnt++ + } + if sreq, err := self.newSyncRequest(req, priority); err == nil { + // extract key from req + glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + unsynced 
+
+// delivery loop
+// takes into account priority, sends storeRequests with the chunk (delivery)
+// blocks idle if there are no new deliveries in any of the queues
+func (self *syncer) syncDeliveries() {
+	var req *storeRequestMsgData
+	p := High
+	var deliveries chan *storeRequestMsgData
+	var msg *storeRequestMsgData
+	var err error
+	var c = [priorities]int{}
+	var n = [priorities]int{}
+	var total, success uint
+
+	for {
+		deliveries = self.deliveries[p]
+		select {
+		case req = <-deliveries:
+			n[p]++
+			c[p]++
+		default:
+			if p == Low {
+				// blocking, depletion on all channels, no preference for priority
+				select {
+				case req = <-self.deliveries[High]:
+					n[High]++
+				case req = <-self.deliveries[Medium]:
+					n[Medium]++
+				case req = <-self.deliveries[Low]:
+					n[Low]++
+				case <-self.quit:
+					return
+				}
+				p = High
+			} else {
+				p--
+				continue
+			}
+		}
+		total++
+		msg, err = self.newStoreRequestMsgData(req)
+		if err != nil {
+			glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)
+		} else {
+			err = self.store(msg)
+			if err != nil {
+				glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)
+			} else {
+				success++
+				glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req)
+			}
+		}
+		if total%self.SyncBatchSize == 0 {
+			glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])
+		}
+	}
+}
+
+/*
+ addRequest handles requests for delivery
+ it accepts 4 types:
+
+ * storeRequestMsgData: coming from netstore propagate response
+ * chunk: coming from forwarding (questionable: id?)
+ * key: from an incoming syncRequest
+ * syncDbEntry: key, id encoded in db
+
+ If sync mode is on for the type of request, then
+ it sends the request to the keys queue of the correct priority
+ channel, buffered with capacity (SyncBufferSize)
+
+ If sync mode is off, requests are sent directly to deliveries
+*/
+func (self *syncer) addRequest(req interface{}, ty int) {
+	// retrieve the priority for the request type
+	priority := self.SyncPriorities[ty]
+	// sync mode for this type is ON
+	if self.syncF() || ty == DeliverReq {
+		if self.SyncModes[ty] {
+			self.addKey(req, priority, self.quit)
+		} else {
+			self.addDelivery(req, priority, self.quit)
+		}
+	}
+}
+
+// addKey queues a sync request for sync confirmation with the given priority
+// ie the key will go out in an unsyncedKeys message
+func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
+	select {
+	case self.keys[priority] <- req:
+		// this wakes up the unsynced keys loop if it is idle
+		select {
+		case self.newUnsyncedKeys <- true:
+		default:
+		}
+		return true
+	case <-quit:
+		return false
+	}
+}
+
+// addDelivery queues a delivery request with the given priority
+// ie the chunk will be delivered ASAP, modulo the priority queueing handled by syncdb
+// requests are persisted across sessions for correct sync
+func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
+	select {
+	case self.queues[priority].buffer <- req:
+		return true
+	case <-quit:
+		return false
+	}
+}
+
+// doDelivery delivers the chunk for the request with the given priority
+// without queuing
+func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
+	msgdata, err := self.newStoreRequestMsgData(req)
+	if err != nil {
+		glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err)
+		return false
+	}
+	select {
+	case self.deliveries[priority] <- msgdata:
+		return true
+	case <-quit:
+		return false
+	}
+}
+
+// deliver returns the delivery function for the given priority,
+// passed on to syncDb
+func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
+	return func(req interface{}, quit chan bool) bool {
+		return self.doDelivery(req, priority, quit)
+	}
+}
+
+// replay returns the replay function passed on to syncDb
+// depending on the sync mode settings for BacklogReq,
+// replay of the request db backlog sends items via confirmation
+// or delivers them directly
+func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
+	sync := self.SyncModes[BacklogReq]
+	priority := self.SyncPriorities[BacklogReq]
+	// sync mode for this type is ON
+	if sync {
+		return func(req interface{}, quit chan bool) bool {
+			return self.addKey(req, priority, quit)
+		}
+	} else {
+		return func(req interface{}, quit chan bool) bool {
+			return self.doDelivery(req, priority, quit)
+		}
+	}
+}
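A hypothetical call-site sketch for the routing above (peerSyncer is a placeholder *syncer attached to a peer connection, and the chunk variable is a placeholder; the routing behaviour follows the addRequest comment):

// sketch: with the default SyncModes, a PropagateReq is queued for
// confirmation first (addKey); a mode of false would hand it straight
// to delivery (addDelivery)
var chunk *storage.Chunk // placeholder: a chunk just stored locally
peerSyncer.addRequest(chunk, PropagateReq)

// a bare key is always accepted for DeliverReq, even when syncing is
// off (note the syncF() || ty == DeliverReq guard in addRequest)
peerSyncer.addRequest(chunk.Key, DeliverReq)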
+
+// newStoreRequestMsgData extends a request to a full storeRequestMsgData
+// polymorphic: see addRequest for the types accepted
+func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
+
+	key, id, chunk, sreq, err := parseRequest(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if sreq == nil {
+		if chunk == nil {
+			var err error
+			chunk, err = self.dbAccess.get(key)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		sreq = &storeRequestMsgData{
+			Id:    id,
+			Key:   chunk.Key,
+			SData: chunk.SData,
+		}
+	}
+
+	return sreq, nil
+}
+
+// parseRequest parses the request types and extracts key, id, chunk and request if available
+// it does not do a chunk lookup!
+func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
+	var key storage.Key
+	var entry *syncDbEntry
+	var chunk *storage.Chunk
+	var id uint64
+	var ok bool
+	var sreq *storeRequestMsgData
+	var err error
+
+	if key, ok = req.(storage.Key); ok {
+		id = generateId()
+
+	} else if entry, ok = req.(*syncDbEntry); ok {
+		id = binary.BigEndian.Uint64(entry.val[32:])
+		key = storage.Key(entry.val[:32])
+
+	} else if chunk, ok = req.(*storage.Chunk); ok {
+		key = chunk.Key
+		id = generateId()
+
+	} else if sreq, ok = req.(*storeRequestMsgData); ok {
+		key = sreq.Key
+	} else {
+		err = fmt.Errorf("type not allowed: %v (%T)", req, req)
+	}
+
+	return key, id, chunk, sreq, err
+}
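A minimal sketch exercising parseRequest with the accepted request types (values are illustrative; bare keys and chunks get a fresh id from generateId, a syncDbEntry carries its id in the encoded value, and a ready-made storeRequestMsgData is passed through):

// sketch: the four request types accepted by parseRequest
k := storage.Key(make([]byte, 32))

key, id, _, _, _ := parseRequest(k)                            // bare key: fresh id
_, _, chunk, _, _ := parseRequest(&storage.Chunk{Key: k})      // chunk: key read off the chunk
_, _, _, sreq, _ := parseRequest(&storeRequestMsgData{Key: k}) // request passed through
_, _, _, _, err := parseRequest(42)                            // unsupported type: error
_, _, _, _, _ = key, id, chunk, sreq, err                      // (sketch only)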