| author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
| commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
| tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/network/stream | |
| parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/network/stream')
| -rw-r--r-- | swarm/network/stream/common_test.go | 17 |
| -rw-r--r-- | swarm/network/stream/delivery.go | 231 |
| -rw-r--r-- | swarm/network/stream/delivery_test.go | 93 |
| -rw-r--r-- | swarm/network/stream/intervals_test.go | 78 |
| -rw-r--r-- | swarm/network/stream/messages.go | 87 |
| -rw-r--r-- | swarm/network/stream/peer.go | 51 |
| -rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 46 |
| -rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 170 |
| -rw-r--r-- | swarm/network/stream/stream.go | 28 |
| -rw-r--r-- | swarm/network/stream/streamer_test.go | 8 |
| -rw-r--r-- | swarm/network/stream/syncer.go | 94 |
| -rw-r--r-- | swarm/network/stream/syncer_test.go | 32 |
12 files changed, 504 insertions, 431 deletions
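Before the per-file diff, here is a minimal sketch of the wiring pattern this refactor introduces and then repeats in every test setup below (common_test.go, delivery_test.go, intervals_test.go and the snapshot/syncer tests): the `*storage.DBAPI` indirection is replaced by a `storage.NetStore` shared by `Delivery`, the fetcher factory and the `Registry`. The helper name and the `SkipCheck` value are assumptions for illustration; the constructor calls mirror the diff.

```go
package stream

import (
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// newStreamServices is an illustrative helper (not part of this commit)
// condensing the post-refactor setup used by the tests in this diff.
func newStreamServices(addr *network.BzzAddr, kad *network.Kademlia, localStore *storage.LocalStore) (*Registry, *storage.NetStore, error) {
	// A NetStore wraps the LocalStore; it satisfies storage.SyncChunkStore,
	// which is what Delivery and the Registry now take instead of *DBAPI.
	netStore, err := storage.NewNetStore(localStore, nil)
	if err != nil {
		return nil, nil, err
	}

	delivery := NewDelivery(kad, netStore)
	// The fetcher factory is attached after construction because Delivery
	// needs the NetStore and the factory needs delivery.RequestFromPeers.
	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New

	r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
		SkipCheck: false, // the tests toggle this per scenario
	})
	return r, netStore, nil
}
```

Teardown order also changes in the tests that follow: `netStore.Close()` and `r.Close()` replace closing the raw store.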
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 491dc9fd5..e0d776e34 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora return nil, nil, nil, removeDataDir, err } - db := storage.NewDBAPI(localStore) - delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, removeDataDir, err + } + + delivery := NewDelivery(to, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(ctx, chunk) + return rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 627352535..d0f27eebc 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,12 +19,11 @@ package stream import ( "context" "errors" - "time" - "github.com/ethereum/go-ethereum/common" + "fmt" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" - cp "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -46,39 +45,34 @@ var ( ) type Delivery struct { - db *storage.DBAPI - kad *network.Kademlia - receiveC chan *ChunkDeliveryMsg - getPeer func(discover.NodeID) *Peer + chunkStore storage.SyncChunkStore + kad *network.Kademlia + getPeer func(discover.NodeID) *Peer } -func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery { - d := &Delivery{ - db: db, - kad: kad, - receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), +func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { + return &Delivery{ + chunkStore: chunkStore, + kad: kad, } - - go d.processReceivedChunks() - return d } // SwarmChunkServer implements Server type SwarmChunkServer struct { deliveryC chan []byte batchC chan []byte - db *storage.DBAPI + chunkStore storage.ChunkStore currentLen uint64 quit chan struct{} } // NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer { +func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - db: db, - quit: make(chan struct{}), + deliveryC: make(chan []byte, deliveryCap), + batchC: make(chan []byte), + chunkStore: chunkStore, + quit: make(chan struct{}), } go s.processDeliveries() return s @@ -123,13 
+117,11 @@ func (s *SwarmChunkServer) Close() { // GetData retrives chunk data from db store func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) - if chunk.ReqC != nil { - if created { - if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { - log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) - chunk.SetErrored(storage.ErrChunkForward) - return nil - } + + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + + go func() { + select { + case <-ctx.Done(): + case <-streamer.quit: } - go func() { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "waiting.delivery") - defer osp.Finish() - - t := time.NewTimer(10 * time.Minute) - defer t.Stop() - - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created) - start := time.Now() - select { - case <-chunk.ReqC: - log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start)) - case <-t.C: - log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr) - chunk.SetErrored(storage.ErrChunkTimeout) - return - } - chunk.SetErrored(nil) - - if req.SkipCheck { - err := sp.Deliver(ctx, chunk, s.priority) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) - sp.Drop(err) - } + cancel() + }() + + go func() { + chunk, err := d.chunkStore.Get(ctx, req.Addr) + if err != nil { + log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + return + } + if req.SkipCheck { + err = sp.Deliver(ctx, chunk, s.priority) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - streamer.deliveryC <- chunk.Addr[:] - }() - return nil - } - // TODO: call the retrieve function of the outgoing syncer - if req.SkipCheck { - log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr) - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) + return } - return sp.Deliver(ctx, chunk, s.priority) - } - streamer.deliveryC <- chunk.Addr[:] + select { + case streamer.deliveryC <- chunk.Address()[:]: + case <-streamer.quit: + } + + }() + return nil } @@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } +// TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( @@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch "chunk.delivery") defer osp.Finish() - req.peer = sp - d.receiveC <- req - return nil -} + processReceivedChunksCount.Inc(1) -func (d *Delivery) 
processReceivedChunks() { -R: - for req := range d.receiveC { - processReceivedChunksCount.Inc(1) - - if len(req.SData) > cp.DefaultSize+8 { - log.Warn("received chunk is bigger than expected", "len", len(req.SData)) - continue R - } - - // this should be has locally - chunk, err := d.db.Get(context.TODO(), req.Addr) - if err == nil { - continue R - } - if err != storage.ErrFetching { - log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk) - continue R - } - select { - case <-chunk.ReqC: - log.Error("someone else delivered?", "hash", chunk.Addr.Hex()) - continue R - default: - } - - chunk.SData = req.SData - d.db.Put(context.TODO(), chunk) - - go func(req *ChunkDeliveryMsg) { - err := chunk.WaitToStore() + go func() { + req.peer = sp + err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + if err != nil { if err == storage.ErrChunkInvalid { + // we removed this log because it spams the logs + // TODO: Enable this log line + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) req.peer.Drop(err) } - }(req) - } + } + }() + return nil } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { - var success bool - var err error +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { requestFromPeersCount.Inc(1) + var sp *Peer + spID := req.Source - d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool { - spId := p.ID() - for _, p := range peersToSkip { - if p == spId { - log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) + } + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + id := p.ID() + // TODO: skip light nodes that do not accept retrieve requests + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true } - } - sp := d.getPeer(spId) + sp = d.getPeer(id) + if sp == nil { + log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + return true + } + spID = &id + return false + }) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) - return true + return nil, nil, errors.New("no peer found") } - err = sp.SendPriority(ctx, &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: skipCheck, - }, Top) - if err != nil { - return true - } - requestFromPeersEachCount.Inc(1) - success = true - return false - }) - if success { - return nil } - return errors.New("no peer found") + + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + }, Top) + if err != nil { + return nil, nil, err + } + requestFromPeersEachCount.Inc(1) + + return spID, sp.quit, nil } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 972cc859a..ece54d4ee 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) + ctx := context.Background() + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + streamer.delivery.RequestFromPeers(ctx, req) err = 
tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { Code: 5, Msg: &RetrieveRequestMsg{ - Addr: chunk.Addr[:], + Addr: chunk.Address()[:], }, Peer: peerID, }, @@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }) hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, nil) - chunk.SData = hash - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk := storage.NewChunk(hash, hash) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, nil) - chunk.SData = hash1[:] - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk = storage.NewChunk(hash, hash1[:]) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) - - if !created { - t.Fatal("chunk already exists") - } - select { - case <-chunk.ReqC: - t.Fatal("chunk is already received") - default: - } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ - Label: "ChunkDeliveryRequest message", + Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ { Code: 6, @@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - - timeout := time.NewTimer(1 * time.Second) - - select { - case <-timeout.C: - t.Fatal("timeout receiving chunk") - case <-chunk.ReqC: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // wait for the chunk to get stored + storedChunk, err := localStore.Get(ctx, chunkKey) + for err != nil { + select { + case <-ctx.Done(): + t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) + default: + } + storedChunk, err = localStore.Get(ctx, chunkKey) + time.Sleep(50 * time.Millisecond) } - storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } - if !bytes.Equal(storedChunk.SData, chunkData) { + if !bytes.Equal(storedChunk.Data(), chunkData) { t.Fatal("Retrieved chunk has different data than original") } @@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), 
&RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) @@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - id := ctx.Config.ID addr := network.NewAddrFromNodeID(id) store, datadir, err := createTestLocalStorageForID(id, addr) @@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, DoSync: true, SyncUpdateDelay: 0, }) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index f4294134b..452aaca76 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -38,13 +38,18 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) -func TestIntervals(t *testing.T) { +func TestIntervalsLive(t *testing.T) { testIntervals(t, true, nil, false) - testIntervals(t, false, NewRange(9, 26), false) - testIntervals(t, true, NewRange(9, 26), false) - testIntervals(t, true, nil, true) +} + +func TestIntervalsHistory(t *testing.T) { + testIntervals(t, false, NewRange(9, 26), false) testIntervals(t, false, NewRange(9, 26), true) +} + +func TestIntervalsLiveAndHistory(t *testing.T) { + testIntervals(t, true, NewRange(9, 26), false) testIntervals(t, true, NewRange(9, 26), true) } @@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { os.RemoveAll(datadir) } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) 
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil + return newTestExternalClient(netStore), nil }) r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil @@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() storer := nodeIDs[0] @@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { liveErrC := make(chan error) historyErrC := make(chan error) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - log.Error("WaitKademlia error: %v", "err", err) - return err - } - log.Debug("Watching for disconnections") disconnections := sim.PeerEvents( context.Background(), @@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), ) + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + go func() { for d := range disconnections { if d.Error != nil { @@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var liveHashesChan chan []byte liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - log.Error("Subscription error: %v", "err", err) + log.Error("get hashes", "err", err) return } i := externalStreamSessionAt @@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var historyHashesChan chan []byte historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) if err != nil { + log.Error("get hashes", "err", err) return } @@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } }() - err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) - if err != nil { - return err - } if err := <-liveErrC; err != nil { return err } @@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { type testExternalClient struct { hashes chan []byte - db *storage.DBAPI + store storage.SyncChunkStore enableNotificationsC chan struct{} } -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { +func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - db: db, + store: store, enableNotificationsC: make(chan struct{}), } } -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { + wait := c.store.FetchFunc(ctx, storage.Address(hash)) + if wait == nil { return nil } - c.hashes <- hash - //NOTE: This was failing on go1.9.x with a deadlock. 
- //Sometimes this function would just block - //It is commented now, but it may be well worth after the chunk refactor - //to re-enable this and see if the problem has been addressed - /* - return func() { - return chunk.WaitToStore() + select { + case c.hashes <- hash: + case <-ctx.Done(): + log.Warn("testExternalClient NeedData context", "err", ctx.Err()) + return func(_ context.Context) error { + return ctx.Err() } - */ - return nil + } + return wait } func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index a19f63589..2e1a81e82 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -18,9 +18,7 @@ package stream import ( "context" - "errors" "fmt" - "sync" "time" "github.com/ethereum/go-ethereum/metrics" @@ -31,6 +29,8 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +var syncBatchTimeout = 30 * time.Second + // Stream defines a unique stream identifier. type Stream struct { // Name is used for Client and Server functions identification. @@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e go func() { if err := p.SendOfferedHashes(os, from, to); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() @@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e } go func() { if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() } @@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if err != nil { return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err) } - wg := sync.WaitGroup{} + + ctr := 0 + errC := make(chan error) + ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) + + ctx = context.WithValue(ctx, "source", p.ID().String()) for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] if wait := c.NeedData(ctx, hash); wait != nil { + ctr++ want.Set(i/HashSize, true) - wg.Add(1) // create request and wait until the chunk data arrives and is stored - go func(w func()) { - w() - wg.Done() + go func(w func(context.Context) error) { + select { + case errC <- w(ctx): + case <-ctx.Done(): + } }(wait) } } - // done := make(chan bool) - // go func() { - // wg.Wait() - // close(done) - // }() - // go func() { - // select { - // case <-done: - // s.next <- s.batchDone(p, req, hashes) - // case <-time.After(1 * time.Second): - // p.Drop(errors.New("timeout waiting for batch to be delivered")) - // } - // }() + go func() { - wg.Wait() + defer cancel() + for i := 0; i < ctr; i++ { + select { + case err := <-errC: + if err != nil { + log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + return + } + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + } + } select { case c.next <- c.batchDone(p, req, hashes): case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + case <-ctx.Done(): + 
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) } }() // only send wantedKeysMsg if all missing chunks of the previous batch arrived @@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg c.sessionAt = req.From } from, to := c.nextBatch(req.To + 1) - log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) + log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID()) if from == to { return nil } @@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg To: to, } go func() { + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) select { - case <-time.After(120 * time.Second): - log.Warn("handleOfferedHashesMsg timeout, so dropping peer") - p.Drop(errors.New("handle offered hashes timeout")) - return case err := <-c.next: if err != nil { - log.Warn("c.next dropping peer", "err", err) + log.Warn("c.next error dropping peer", "err", err) p.Drop(err) return } case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) err := p.SendPriority(ctx, msg, c.priority) if err != nil { - log.Warn("SendPriority err, so dropping peer", "err", err) - p.Drop(err) + log.Warn("SendPriority error", "err", err) } }() return nil @@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) @@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } - chunk := storage.NewChunk(hash, nil) - chunk.SData = data - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) - } + chunk := storage.NewChunk(hash, data) if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 80b9ab711..1466a7a9c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -33,8 +33,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -var sendTimeout = 30 * time.Second - type notFoundError struct { t string s Stream @@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - p.Send(wmsg.Context, wmsg.Msg) + err := p.Send(wmsg.Context, wmsg.Msg) + if err != nil { + log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + } }) + + // basic monitoring for pq contention + go func(pq *pq.PriorityQueue) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + var len_maxi int + var cap_maxi int + for k := range 
pq.Queues { + if len_maxi < len(pq.Queues[k]) { + len_maxi = len(pq.Queues[k]) + } + + if cap_maxi < cap(pq.Queues[k]) { + cap_maxi = cap(pq.Queues[k]) + } + } + + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi)) + case <-p.quit: + return + } + } + }(p.pq) + go func() { <-p.quit cancel() @@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { var sp opentracing.Span ctx, sp = spancontext.StartSpan( ctx, @@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 defer sp.Finish() msg := &ChunkDeliveryMsg{ - Addr: chunk.Addr, - SData: chunk.SData, + Addr: chunk.Address(), + SData: chunk.Data(), } return p.SendPriority(ctx, msg, priority) } @@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) - defer cancel() wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, } - return p.pq.Push(cctx, wmsg, int(priority)) + err := p.pq.Push(wmsg, int(priority)) + if err == pq.ErrContention { + log.Warn("dropping peer on priority queue contention", "peer", p.ID()) + p.Drop(err) + } + return err } // SendOfferedHashes sends OfferedHashesMsg protocol msg diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 4ff947b21..19eaad34e 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - 
os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 0, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucketKeyFileStore = simulation.BucketKey("filestore") bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 313019d6a..7cd09099c 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" "testing" "time" @@ -39,15 +40,20 @@ import ( mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) -const testMinProxBinSize = 2 const MaxTimeout = 600 type synctestConfig struct { - addrs [][]byte - hashes []storage.Address - idToChunksMap map[discover.NodeID][]int - chunksToNodesMap map[string][]int - addrToIDMap map[string]discover.NodeID + addrs [][]byte + hashes []storage.Address + idToChunksMap map[discover.NodeID][]int + //chunksToNodesMap map[string][]int + addrToIDMap map[string]discover.NodeID +} + +// Tests in this file should not request chunks from peers. +// This function will panic indicating that there is a problem if request has been made. +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. @@ -58,6 +64,9 @@ type synctestConfig struct { //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. 
func TestSyncingViaGlobalSync(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { @@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - err := testSyncingViaDirectSubscribe(*chunks, *nodes) + err := testSyncingViaDirectSubscribe(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - err := testSyncingViaDirectSubscribe(chnk, n) + err := testSyncingViaDirectSubscribe(t, chnk, n) if err != nil { t.Fatal(err) } @@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. 
allSuccess := false @@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { }() } for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if result.Error != nil { t.Fatal(result.Error) } + log.Info("Simulation ended") } /* @@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. */ -func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { +func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, }) defer sim.Close() - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() conf := &synctestConfig{} @@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return err } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { // or until the timeout is reached. 
allSuccess := false for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return result.Error } - log.Info("Simulation terminated") + log.Info("Simulation ended") return nil } @@ -462,10 +527,9 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //iterate over each bin and solicit needed subscription to bins kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err)) return false @@ -478,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //map chunk keys to addresses which are responsible func mapKeysToNodes(conf *synctestConfig) { - kmap := make(map[string][]int) nodemap := make(map[string][]int) //build a pot for chunk hashes np := pot.NewPot(nil, 0) @@ -487,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) { indexmap[string(a)] = i np, _, _ = pot.Add(np, a, pof) } + + var kadMinProxSize = 2 + + ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs) + //for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes)) for i := 0; i < len(conf.hashes); i++ { - pl := 256 //highest possible proximity - var nns []int + var a []byte np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool { - a := val.([]byte) - if pl < 256 && pl != po { - return false - } - if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) - nns = append(nns, indexmap[string(a)]) - nodemap[string(a)] = append(nodemap[string(a)], i) - } - if pl == 256 && len(nns) >= testMinProxBinSize { - //maxProxBinSize has been reached at this po, so save it - //we will add all other nodes at the same po - pl = po - } - return true + // take the first address + a = val.([]byte) + return false }) - kmap[string(conf.hashes[i])] = nns + + nns := ppmap[common.Bytes2Hex(a)].NNSet + nns = append(nns, a) + + for _, p := range nns { + nodemap[string(p)] = append(nodemap[string(p)], i) + } } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) - conf.chunksToNodesMap = kmap } //upload a file(chunks) to a single local node store diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index deffdfc3f..1f1f34b7b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -32,10 +32,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" 
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/pot" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -43,8 +41,8 @@ const ( Mid High Top - PriorityQueue // number of queues - PriorityQueueCap = 32 // queue capacity + PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top + PriorityQueueCap = 128 // queue capacity HashSize = 32 ) @@ -73,7 +71,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { if options == nil { options = &RegistryOptions{} } @@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) { - return NewSwarmChunkServer(delivery.db), nil + return NewSwarmChunkServer(delivery.chunkStore), nil }) streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live)) + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) - RegisterSwarmSyncerServer(streamer, db) - RegisterSwarmSyncerClient(streamer, db) + RegisterSwarmSyncerServer(streamer, syncChunkStore) + RegisterSwarmSyncerClient(streamer, syncChunkStore) if options.DoSync { // latestIntC function ensures that @@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error { return peer.Send(context.TODO(), msg) } -func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error { - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "registry.retrieve") - defer sp.Finish() - - return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck) -} - func (r *Registry) NodeInfo() interface{} { return nil } @@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData(context.Context, []byte) func() + NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() } diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 7523860c9..06e96b9a9 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -80,15 +80,17 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(ctx context.Context, hash []byte) func() { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { - return func() { + return func(context.Context) error { <-self.wait0 + return nil } } else if bytes.Equal(hash, hash2[:]) { - return func() { + return func(context.Context) error { <-self.wait2 + return nil } } return nil diff --git a/swarm/network/stream/syncer.go 
b/swarm/network/stream/syncer.go index d7febe4a3..e9811a678 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -28,7 +28,6 @@ import ( ) const ( - // BatchSize = 2 BatchSize = 128 ) @@ -38,35 +37,37 @@ const ( // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { po uint8 - db *storage.DBAPI + store storage.SyncChunkStore sessionAt uint64 start uint64 + live bool quit chan struct{} } // NewSwarmSyncerServer is contructor for SwarmSyncerServer -func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) { - sessionAt := db.CurrentBucketStorageIndex(po) +func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { + sessionAt := syncChunkStore.BinIndex(po) var start uint64 if live { start = sessionAt } return &SwarmSyncerServer{ po: po, - db: db, + store: syncChunkStore, sessionAt: sessionAt, start: start, + live: live, quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(live, po, db) + return NewSwarmSyncerServer(live, po, syncChunkStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() { close(s.quit) } -// GetSection retrieves the actual chunk from localstore +// GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.store.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // GetBatch retrieves the next batch of hashes from the dbstore func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { var batch []byte i := 0 - if from == 0 { - from = s.start - } - if to <= from || from >= s.sessionAt { - to = math.MaxUint64 + if s.live { + if from == 0 { + from = s.start + } + if to <= from || from >= s.sessionAt { + to = math.MaxUint64 + } + } else { + if (to < from && to != 0) || from > s.sessionAt { + return nil, 0, 0, nil, nil + } + if to == 0 || to > s.sessionAt { + to = s.sessionAt + } } + var ticker *time.Ticker defer func() { if ticker != nil { @@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 } metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool { - batch = append(batch, addr[:]...) + err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { + batch = append(batch, key[:]...) 
i++ to = idx return i < BatchSize @@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 wait = true } - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po)) + log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) return batch, from, to, nil, nil } @@ -146,28 +155,26 @@ type SwarmSyncerClient struct { sessionReader storage.LazySectionReader retrieveC chan *storage.Chunk storeC chan *storage.Chunk - db *storage.DBAPI + store storage.SyncChunkStore // chunker storage.Chunker - currentRoot storage.Address - requestFunc func(chunk *storage.Chunk) - end, start uint64 - peer *Peer - ignoreExistingRequest bool - stream Stream + currentRoot storage.Address + requestFunc func(chunk *storage.Chunk) + end, start uint64 + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - db: db, - peer: p, - ignoreExistingRequest: ignoreExistingRequest, - stream: stream, + store: store, + peer: p, + stream: stream, }, nil } // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { +// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { // retrieveC := make(storage.Chunk, chunksCap) // RunChunkRequestor(p, retrieveC) // storeC := make(storage.Chunk, chunksCap) @@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { +func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) }) } // NeedData -func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) { - chunk, _ := s.db.GetOrCreateRequest(ctx, key) - // TODO: we may want to request from this peer anyway even if the request exists - - // ignoreExistingRequest is temporary commented out until its functionality is verified. - // For now, this optimization can be disabled. 
- if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) { - return nil - } - // create request and wait until the chunk data arrives and is stored - return func() { - chunk.WaitToStore() - } +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { + return s.store.FetchFunc(ctx, key) } // BatchDone diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f72aa3444..469d520f8 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck } } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) - bucket.Store(bucketKeyDB, db) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyDB, netStore) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + bucket.Store(bucketKeyDelivery, delivery) - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) return r, cleanup, nil @@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { + netStore := item.(*storage.NetStore) + netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { hashes[i] = append(hashes[i], addr) totalHashes++ hashCounts[i]++ @@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.DBAPI) - chunk, err := db.Get(ctx, key) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { - continue + db := item.(*storage.NetStore) + _, err := db.Get(ctx, key) + if err == nil { + found++ } - // needed for leveldb not to be closed? - // chunk.WaitToStore() - found++ } } log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total) |
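One API note distilled from the diff above concerns the chunk type: the old `*storage.Chunk` struct with exported `Addr`/`SData` fields, a `ReqC` channel and `WaitToStore` is replaced by a `storage.Chunk` value with `Address()`/`Data()` accessors, and stores report the outcome through a returned error. A hedged sketch (the function and variable names are illustrative, assuming the swarm/storage package at this commit):

```go
package stream

import (
	"context"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// storeDelivered shows the post-refactor chunk handling, roughly what
// handleChunkDeliveryMsg now does; the function itself is illustrative.
func storeDelivered(ctx context.Context, store storage.SyncChunkStore, addr storage.Address, data []byte) error {
	chunk := storage.NewChunk(addr, data) // returns a storage.Chunk value, no exported fields

	_ = chunk.Address() // replaces the old chunk.Addr field
	_ = chunk.Data()    // replaces the old chunk.SData field

	// Put reports success or failure via its error; there is no ReqC channel
	// to wait on and no separate WaitToStore step anymore.
	return store.Put(ctx, chunk)
}
```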
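Another change visible throughout the diff is the `Client.NeedData` contract: it now returns `func(context.Context) error` (typically `SyncChunkStore.FetchFunc`), where `nil` means the chunk is already stored locally and a non-nil function blocks until the chunk arrives or the context ends. The helper below is not part of the commit; it condenses how `handleOfferedHashesMsg` consumes that contract, with the 30-second bound matching `syncBatchTimeout` in the diff.

```go
package stream

import (
	"context"
	"time"
)

// waitForWantedChunks is an illustrative condensation of the batch-waiting
// loop in handleOfferedHashesMsg (peer dropping and batch accounting omitted).
func waitForWantedChunks(ctx context.Context, c Client, hashes [][]byte) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // syncBatchTimeout in the diff
	defer cancel()

	errC := make(chan error)
	pending := 0
	for _, h := range hashes {
		if wait := c.NeedData(ctx, h); wait != nil { // nil: chunk already present
			pending++
			go func(w func(context.Context) error) {
				select {
				case errC <- w(ctx): // blocks until the chunk is stored or ctx expires
				case <-ctx.Done():
				}
			}(wait)
		}
	}
	for i := 0; i < pending; i++ {
		select {
		case err := <-errC:
			if err != nil {
				return err // the real handler drops the peer here
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```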
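The diff also reworks `Delivery.RequestFromPeers`: instead of a raw hash plus a variadic skip list, it takes a `*network.Request` and returns the peer it asked along with that peer's quit channel. The sketch below mirrors `TestStreamerRetrieveRequest`; the reading of the second and third `NewRequest` arguments (the `SkipCheck` flag and the peers-to-skip map) and of the quit channel is inferred from how the diff uses them, so treat it as an assumption.

```go
package stream

import (
	"context"
	"sync"

	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// requestChunk is an illustrative wrapper, not part of the commit.
func requestChunk(ctx context.Context, d *Delivery, addr storage.Address) error {
	req := network.NewRequest(
		addr,
		true,        // SkipCheck: deliver the chunk without the offered/wanted hashes round trip
		&sync.Map{}, // peers already tried, consulted through req.SkipPeer
	)
	// The chosen peer and its quit channel are returned so the fetcher can
	// retry elsewhere if that peer goes away (the channel is presumably
	// closed on disconnect).
	peerID, quit, err := d.RequestFromPeers(ctx, req)
	if err != nil {
		return err
	}
	_, _ = peerID, quit
	return nil
}
```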