diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/network/stream/delivery.go | |
parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
download | dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst dexon-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/network/stream/delivery.go')
-rw-r--r-- | swarm/network/stream/delivery.go | 231 |
1 files changed, 94 insertions, 137 deletions
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 627352535..d0f27eebc 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,12 +19,11 @@ package stream import ( "context" "errors" - "time" - "github.com/ethereum/go-ethereum/common" + "fmt" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" - cp "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -46,39 +45,34 @@ var ( ) type Delivery struct { - db *storage.DBAPI - kad *network.Kademlia - receiveC chan *ChunkDeliveryMsg - getPeer func(discover.NodeID) *Peer + chunkStore storage.SyncChunkStore + kad *network.Kademlia + getPeer func(discover.NodeID) *Peer } -func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery { - d := &Delivery{ - db: db, - kad: kad, - receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), +func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { + return &Delivery{ + chunkStore: chunkStore, + kad: kad, } - - go d.processReceivedChunks() - return d } // SwarmChunkServer implements Server type SwarmChunkServer struct { deliveryC chan []byte batchC chan []byte - db *storage.DBAPI + chunkStore storage.ChunkStore currentLen uint64 quit chan struct{} } // NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer { +func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - db: db, - quit: make(chan struct{}), + deliveryC: make(chan []byte, deliveryCap), + batchC: make(chan []byte), + chunkStore: chunkStore, + quit: make(chan struct{}), } go s.processDeliveries() return s @@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() { // GetData retrives chunk data from db store func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) - if chunk.ReqC != nil { - if created { - if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { - log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) - chunk.SetErrored(storage.ErrChunkForward) - return nil - } + + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + + go func() { + select { + case <-ctx.Done(): + case <-streamer.quit: } - go func() { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "waiting.delivery") - defer osp.Finish() - - t := time.NewTimer(10 * time.Minute) - defer t.Stop() - - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created) - start := time.Now() - select { - case <-chunk.ReqC: - log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start)) - case <-t.C: - log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr) - chunk.SetErrored(storage.ErrChunkTimeout) - return - } - chunk.SetErrored(nil) - - if req.SkipCheck { - err := sp.Deliver(ctx, chunk, s.priority) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) - sp.Drop(err) - } + cancel() + }() + + go func() { + chunk, err := d.chunkStore.Get(ctx, req.Addr) + if err != nil { + log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + return + } + if req.SkipCheck { + err = sp.Deliver(ctx, chunk, s.priority) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - streamer.deliveryC <- chunk.Addr[:] - }() - return nil - } - // TODO: call the retrieve function of the outgoing syncer - if req.SkipCheck { - log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr) - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) + return } - return sp.Deliver(ctx, chunk, s.priority) - } - streamer.deliveryC <- chunk.Addr[:] + select { + case streamer.deliveryC <- chunk.Address()[:]: + case <-streamer.quit: + } + + }() + return nil } @@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } +// TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( @@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch "chunk.delivery") defer osp.Finish() - req.peer = sp - d.receiveC <- req - return nil -} + processReceivedChunksCount.Inc(1) -func (d *Delivery) processReceivedChunks() { -R: - for req := range d.receiveC { - processReceivedChunksCount.Inc(1) - - if len(req.SData) > cp.DefaultSize+8 { - log.Warn("received chunk is bigger than expected", "len", len(req.SData)) - continue R - } - - // this should be has locally - chunk, err := d.db.Get(context.TODO(), req.Addr) - if err == nil { - continue R - } - if err != storage.ErrFetching { - log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk) - continue R - } - select { - case <-chunk.ReqC: - log.Error("someone else delivered?", "hash", chunk.Addr.Hex()) - continue R - default: - } - - chunk.SData = req.SData - d.db.Put(context.TODO(), chunk) - - go func(req *ChunkDeliveryMsg) { - err := chunk.WaitToStore() + go func() { + req.peer = sp + err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + if err != nil { if err == storage.ErrChunkInvalid { + // we removed this log because it spams the logs + // TODO: Enable this log line + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) req.peer.Drop(err) } - }(req) - } + } + }() + return nil } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { - var success bool - var err error +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { requestFromPeersCount.Inc(1) + var sp *Peer + spID := req.Source - d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool { - spId := p.ID() - for _, p := range peersToSkip { - if p == spId { - log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) + } + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + id := p.ID() + // TODO: skip light nodes that do not accept retrieve requests + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true } - } - sp := d.getPeer(spId) + sp = d.getPeer(id) + if sp == nil { + log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + return true + } + spID = &id + return false + }) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) - return true + return nil, nil, errors.New("no peer found") } - err = sp.SendPriority(ctx, &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: skipCheck, - }, Top) - if err != nil { - return true - } - requestFromPeersEachCount.Inc(1) - success = true - return false - }) - if success { - return nil } - return errors.New("no peer found") + + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + }, Top) + if err != nil { + return nil, nil, err + } + requestFromPeersEachCount.Inc(1) + + return spID, sp.quit, nil } |