aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/depo.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/depo.go')
-rw-r--r--swarm/network/depo.go211
1 files changed, 211 insertions, 0 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
new file mode 100644
index 000000000..79987cc6b
--- /dev/null
+++ b/swarm/network/depo.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "bytes"
+ "encoding/binary"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Handler for storage/retrieval related protocol requests
+// implements the StorageHandler interface used by the bzz protocol
+type Depo struct {
+ hashfunc storage.Hasher
+ localStore storage.ChunkStore
+ netStore storage.ChunkStore
+}
+
+func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo {
+ return &Depo{
+ hashfunc: hash,
+ localStore: localStore,
+ netStore: remoteStore, // entrypoint internal
+ }
+}
+
+// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state
+// * the remote sync state is just stored and handled in protocol
+// * filters through the new syncRequests and send the ones missing
+// * back immediately as a deliveryRequest message
+// * empty message just pings back for more (is this needed?)
+// * strict signed sync states may be needed.
+func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
+ unsynced := req.Unsynced
+ var missing []*syncRequest
+ var chunk *storage.Chunk
+ var err error
+ for _, req := range unsynced {
+ // skip keys that are found,
+ chunk, err = self.localStore.Get(storage.Key(req.Key[:]))
+ if err != nil || chunk.SData == nil {
+ missing = append(missing, req)
+ }
+ }
+ glog.V(logger.Debug).Infof("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State)
+ glog.V(logger.Detail).Infof("Depo.HandleUnsyncedKeysMsg: received %v", unsynced)
+ // send delivery request with missing keys
+ err = p.deliveryRequest(missing)
+ if err != nil {
+ return err
+ }
+ // set peers state to persist
+ p.syncState = req.State
+ return nil
+}
+
+// Handles deliveryRequestMsg
+// * serves actual chunks asked by the remote peer
+// by pushing to the delivery queue (sync db) of the correct priority
+// (remote peer is free to reprioritize)
+// * the message implies remote peer wants more, so trigger for
+// * new outgoing unsynced keys message is fired
+func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
+ deliver := req.Deliver
+ // queue the actual delivery of a chunk ()
+ glog.V(logger.Detail).Infof("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver)
+ for _, sreq := range deliver {
+ // TODO: look up in cache here or in deliveries
+ // priorities are taken from the message so the remote party can
+ // reprioritise to at their leisure
+ // r = self.pullCached(sreq.Key) // pulls and deletes from cache
+ Push(p, sreq.Key, sreq.Priority)
+ }
+
+ // sends it out as unsyncedKeysMsg
+ p.syncer.sendUnsyncedKeys()
+ return nil
+}
+
+// the entrypoint for store requests coming from the bzz wire protocol
+// if key found locally, return. otherwise
+// remote is untrusted, so hash is verified and chunk passed on to NetStore
+func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
+ req.from = p
+ chunk, err := self.localStore.Get(req.Key)
+ switch {
+ case err != nil:
+ glog.V(logger.Detail).Infof("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)
+ // not found in memory cache, ie., a genuine store request
+ // create chunk
+ chunk = storage.NewChunk(req.Key, nil)
+
+ case chunk.SData == nil:
+ // found chunk in memory store, needs the data, validate now
+ hasher := self.hashfunc()
+ hasher.Write(req.SData)
+ if !bytes.Equal(hasher.Sum(nil), req.Key) {
+ // data does not validate, ignore
+ // TODO: peer should be penalised/dropped?
+ glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
+ return
+ }
+ glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req)
+
+ default:
+ // data is found, store request ignored
+ // this should update access count?
+ glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req)
+ return
+ }
+
+ // update chunk with size and data
+ chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
+ chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
+ glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p)
+ chunk.Source = p
+ self.netStore.Put(chunk)
+}
+
+// entrypoint for retrieve requests coming from the bzz wire protocol
+// checks swap balance - return if peer has no credit
+func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
+ req.from = p
+ // swap - record credit for 1 request
+ // note that only charge actual reqsearches
+ var err error
+ if p.swap != nil {
+ err = p.swap.Add(1)
+ }
+ if err != nil {
+ glog.V(logger.Warn).Infof("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err)
+ return
+ }
+
+ // call storage.NetStore#Get which
+ // blocks until local retrieval finished
+ // launches cloud retrieval
+ chunk, _ := self.netStore.Get(req.Key)
+ req = self.strategyUpdateRequest(chunk.Req, req)
+ // check if we can immediately deliver
+ if chunk.SData != nil {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log())
+
+ if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
+ sreq := &storeRequestMsgData{
+ Id: req.Id,
+ Key: chunk.Key,
+ SData: chunk.SData,
+ requestTimeout: req.timeout, //
+ }
+ p.syncer.addRequest(sreq, DeliverReq)
+ } else {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())
+ }
+ } else {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())
+ }
+}
+
+// add peer request the chunk and decides the timeout for the response if still searching
+func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
+ glog.V(logger.Detail).Infof("Depo.strategyUpdateRequest: key %v", origReq.Key.Log())
+ // we do not create an alternative one
+ req = origReq
+ if rs != nil {
+ self.addRequester(rs, req)
+ req.setTimeout(self.searchTimeout(rs, req))
+ }
+ return
+}
+
+// decides the timeout promise sent with the immediate peers response to a retrieve request
+// if timeout is explicitly set and expired
+func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
+ reqt := req.getTimeout()
+ t := time.Now().Add(searchTimeout)
+ if reqt != nil && reqt.Before(t) {
+ return reqt
+ } else {
+ return &t
+ }
+}
+
+/*
+adds a new peer to an existing open request
+only add if less than requesterCount peers forwarded the same request id so far
+note this is done irrespective of status (searching or found)
+*/
+func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
+ glog.V(logger.Detail).Infof("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id)
+ list := rs.Requesters[req.Id]
+ rs.Requesters[req.Id] = append(list, req)
+}