diff options
author | Anton Evangelatov <anton.evangelatov@gmail.com> | 2018-07-09 20:11:49 +0800 |
---|---|---|
committer | Balint Gabor <balint.g@gmail.com> | 2018-07-09 20:11:49 +0800 |
commit | b3711af05176f446fad5ee90e2be4bd09c4086a2 (patch) | |
tree | 036eb23e423c385c0be00e3f8d3d97dea7040f8c /swarm/bmt | |
parent | 30bdf817a0d0afb33f3635f1de877f9caf09be05 (diff) | |
download | dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.gz dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.zst dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.zip |
swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)
* cmd/swarm: minor cli flag text adjustments
* swarm/api/http: sticky footer for swarm landing page using flex
* swarm/api/http: sticky footer for error pages and fix for multiple choices
* cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues
* cmd/swarm: update description of swarm cmd
* swarm: added network ID test
* cmd/swarm: support for smoke tests on the production swarm cluster
* cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion
* swarm: propagate ctx to internal apis (#754)
* swarm/metrics: collect disk measurements
* swarm/bmt: fix io.Writer interface
* Write now tolerates arbitrary variable buffers
* added variable buffer tests
* Write loop and finalise optimisation
* refactor / rename
* add tests for empty input
* swarm/pss: (UPDATE) Generic notifications package (#744)
swarm/pss: Generic package for creating pss notification svcs
* swarm: Adding context to more functions
* swarm/api: change colour of landing page in templates
* swarm/api: change landing page to react to enter keypress
Diffstat (limited to 'swarm/bmt')
-rw-r--r-- | swarm/bmt/bmt.go | 220 | ||||
-rw-r--r-- | swarm/bmt/bmt_r.go | 3 | ||||
-rw-r--r-- | swarm/bmt/bmt_test.go | 122 |
3 files changed, 220 insertions, 125 deletions
diff --git a/swarm/bmt/bmt.go b/swarm/bmt/bmt.go index 71aee2495..835587020 100644 --- a/swarm/bmt/bmt.go +++ b/swarm/bmt/bmt.go @@ -117,10 +117,7 @@ func NewTreePool(hasher BaseHasherFunc, segmentCount, capacity int) *TreePool { zerohashes[0] = zeros h := hasher() for i := 1; i < depth; i++ { - h.Reset() - h.Write(zeros) - h.Write(zeros) - zeros = h.Sum(nil) + zeros = doHash(h, nil, zeros, zeros) zerohashes[i] = zeros } return &TreePool{ @@ -318,41 +315,19 @@ func (h *Hasher) Sum(b []byte) (r []byte) { // * if sequential write is used (can read sections) func (h *Hasher) sum(b []byte, release, section bool) (r []byte) { t := h.bmt - h.finalise(section) - if t.offset > 0 { // get the last node (double segment) - - // padding the segment with zero - copy(t.segment[t.offset:], h.pool.zerohashes[0]) - } - if section { - if t.cur%2 == 1 { - // if just finished current segment, copy it to the right half of the chunk - copy(t.section[h.pool.SegmentSize:], t.segment) - } else { - // copy segment to front of section, zero pad the right half - copy(t.section, t.segment) - copy(t.section[h.pool.SegmentSize:], h.pool.zerohashes[0]) - } - h.writeSection(t.cur, t.section) - } else { - // TODO: h.writeSegment(t.cur, t.segment) - panic("SegmentWriter not implemented") - } + bh := h.pool.hasher() + go h.writeSection(t.cur, t.section, true) bmtHash := <-t.result span := t.span - + // fmt.Println(t.draw(bmtHash)) if release { h.releaseTree() } - // sha3(span + BMT(pure_chunk)) + // b + sha3(span + BMT(pure_chunk)) if span == nil { - return bmtHash + return append(b, bmtHash...) } - bh := h.pool.hasher() - bh.Reset() - bh.Write(span) - bh.Write(bmtHash) - return bh.Sum(b) + return doHash(bh, b, span, bmtHash) } // Hasher implements the SwarmHash interface @@ -367,37 +342,41 @@ func (h *Hasher) Write(b []byte) (int, error) { return 0, nil } t := h.bmt - need := (h.pool.SegmentCount - t.cur) * h.pool.SegmentSize - if l < need { - need = l - } - // calculate missing bit to complete current open segment - rest := h.pool.SegmentSize - t.offset - if need < rest { - rest = need - } - copy(t.segment[t.offset:], b[:rest]) - need -= rest - size := (t.offset + rest) % h.pool.SegmentSize - // read full segments and the last possibly partial segment - for need > 0 { - // push all finished chunks we read - if t.cur%2 == 0 { - copy(t.section, t.segment) - } else { - copy(t.section[h.pool.SegmentSize:], t.segment) - h.writeSection(t.cur, t.section) + secsize := 2 * h.pool.SegmentSize + // calculate length of missing bit to complete current open section + smax := secsize - t.offset + // if at the beginning of chunk or middle of the section + if t.offset < secsize { + // fill up current segment from buffer + copy(t.section[t.offset:], b) + // if input buffer consumed and open section not complete, then + // advance offset and return + if smax == 0 { + smax = secsize + } + if l <= smax { + t.offset += l + return l, nil } - size = h.pool.SegmentSize - if need < size { - size = need + } else { + if t.cur == h.pool.SegmentCount*2 { + return 0, nil } - copy(t.segment, b[rest:rest+size]) - need -= size - rest += size + } + // read full segments and the last possibly partial segment from the input buffer + for smax < l { + // section complete; push to tree asynchronously + go h.writeSection(t.cur, t.section, false) + // reset section + t.section = make([]byte, secsize) + // copy from imput buffer at smax to right half of section + copy(t.section, b[smax:]) + // advance cursor t.cur++ + // smax here represents successive offsets in the input buffer + smax += secsize } - t.offset = size % h.pool.SegmentSize + t.offset = l - smax + secsize return l, nil } @@ -426,6 +405,8 @@ func (h *Hasher) releaseTree() { t.span = nil t.hash = nil h.bmt = nil + t.section = make([]byte, h.pool.SegmentSize*2) + t.segment = make([]byte, h.pool.SegmentSize) h.pool.release(t) } } @@ -435,29 +416,37 @@ func (h *Hasher) releaseTree() { // go h.run(h.bmt.leaves[i/2], h.pool.hasher(), i%2 == 0, s) // } -// writeSection writes the hash of i/2-th segction into right level 1 node of the BMT tree -func (h *Hasher) writeSection(i int, section []byte) { - n := h.bmt.leaves[i/2] +// writeSection writes the hash of i-th section into level 1 node of the BMT tree +func (h *Hasher) writeSection(i int, section []byte, final bool) { + // select the leaf node for the section + n := h.bmt.leaves[i] isLeft := n.isLeft n = n.parent bh := h.pool.hasher() - bh.Write(section) - go func() { - sum := bh.Sum(nil) - if n == nil { - h.bmt.result <- sum - return - } - h.run(n, bh, isLeft, sum) - }() + // hash the section + s := doHash(bh, nil, section) + // write hash into parent node + if final { + // for the last segment use writeFinalNode + h.writeFinalNode(1, n, bh, isLeft, s) + } else { + h.writeNode(n, bh, isLeft, s) + } } -// run pushes the data to the node +// writeNode pushes the data to the node // if it is the first of 2 sisters written the routine returns // if it is the second, it calculates the hash and writes it // to the parent node recursively -func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) { +func (h *Hasher) writeNode(n *node, bh hash.Hash, isLeft bool, s []byte) { + level := 1 for { + // at the root of the bmt just write the result to the result channel + if n == nil { + h.bmt.result <- s + return + } + // otherwise assign child hash to branc if isLeft { n.left = s } else { @@ -467,44 +456,68 @@ func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) { if n.toggle() { return } - // the second thread now can be sure both left and right children are written - // it calculates the hash of left|right and take it to the next level - bh.Reset() - bh.Write(n.left) - bh.Write(n.right) - s = bh.Sum(nil) - - // at the root of the bmt just write the result to the result channel - if n.parent == nil { - h.bmt.result <- s - return - } - - // otherwise iterate on parent + // the thread coming later now can be sure both left and right children are written + // it calculates the hash of left|right and pushes it to the parent + s = doHash(bh, nil, n.left, n.right) isLeft = n.isLeft n = n.parent + level++ } } -// finalise is following the path starting from the final datasegment to the +// writeFinalNode is following the path starting from the final datasegment to the // BMT root via parents // for unbalanced trees it fills in the missing right sister nodes using // the pool's lookup table for BMT subtree root hashes for all-zero sections -func (h *Hasher) finalise(skip bool) { - t := h.bmt - isLeft := t.cur%2 == 0 - n := t.leaves[t.cur/2] - for level := 0; n != nil; level++ { - // when the final segment's path is going via left child node - // we include an all-zero subtree hash for the right level and toggle the node. - // when the path is going through right child node, nothing to do - if isLeft && !skip { +// otherwise behaves like `writeNode` +func (h *Hasher) writeFinalNode(level int, n *node, bh hash.Hash, isLeft bool, s []byte) { + + for { + // at the root of the bmt just write the result to the result channel + if n == nil { + if s != nil { + h.bmt.result <- s + } + return + } + var noHash bool + if isLeft { + // coming from left sister branch + // when the final section's path is going via left child node + // we include an all-zero subtree hash for the right level and toggle the node. + // when the path is going through right child node, nothing to do n.right = h.pool.zerohashes[level] - n.toggle() + if s != nil { + n.left = s + // if a left final node carries a hash, it must be the first (and only thread) + // so the toggle is already in passive state no need no call + // yet thread needs to carry on pushing hash to parent + } else { + // if again first thread then propagate nil and calculate no hash + noHash = n.toggle() + } + } else { + // right sister branch + // if s is nil, then thread arrived first at previous node and here there will be two, + // so no need to do anything + if s != nil { + n.right = s + noHash = n.toggle() + } else { + noHash = true + } + } + // the child-thread first arriving will just continue resetting s to nil + // the second thread now can be sure both left and right children are written + // it calculates the hash of left|right and pushes it to the parent + if noHash { + s = nil + } else { + s = doHash(bh, nil, n.left, n.right) } - skip = false isLeft = n.isLeft n = n.parent + level++ } } @@ -525,6 +538,15 @@ func (n *node) toggle() bool { return atomic.AddInt32(&n.state, 1)%2 == 1 } +// calculates the hash of the data using hash.Hash +func doHash(h hash.Hash, b []byte, data ...[]byte) []byte { + h.Reset() + for _, v := range data { + h.Write(v) + } + return h.Sum(b) +} + func hashstr(b []byte) string { end := len(b) if end > 4 { diff --git a/swarm/bmt/bmt_r.go b/swarm/bmt/bmt_r.go index c61d2dc73..0cb6c146f 100644 --- a/swarm/bmt/bmt_r.go +++ b/swarm/bmt/bmt_r.go @@ -80,6 +80,5 @@ func (rh *RefHasher) hash(data []byte, length int) []byte { } rh.hasher.Reset() rh.hasher.Write(section) - s := rh.hasher.Sum(nil) - return s + return rh.hasher.Sum(nil) } diff --git a/swarm/bmt/bmt_test.go b/swarm/bmt/bmt_test.go index e074d90e7..ae40eadab 100644 --- a/swarm/bmt/bmt_test.go +++ b/swarm/bmt/bmt_test.go @@ -34,12 +34,12 @@ import ( // the actual data length generated (could be longer than max datalength of the BMT) const BufferSize = 4128 +var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128} + +// calculates the Keccak256 SHA3 hash of the data func sha3hash(data ...[]byte) []byte { h := sha3.NewKeccak256() - for _, v := range data { - h.Write(v) - } - return h.Sum(nil) + return doHash(h, nil, data...) } // TestRefHasher tests that the RefHasher computes the expected BMT hash for @@ -129,31 +129,48 @@ func TestRefHasher(t *testing.T) { } } -func TestHasherCorrectness(t *testing.T) { - err := testHasher(testBaseHasher) - if err != nil { - t.Fatal(err) +// tests if hasher responds with correct hash +func TestHasherEmptyData(t *testing.T) { + hasher := sha3.NewKeccak256 + var data []byte + for _, count := range counts { + t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) { + pool := NewTreePool(hasher, count, PoolSize) + defer pool.Drain(0) + bmt := New(pool) + rbmt := NewRefHasher(hasher, count) + refHash := rbmt.Hash(data) + expHash := Hash(bmt, nil, data) + if !bytes.Equal(expHash, refHash) { + t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash) + } + }) } } -func testHasher(f func(BaseHasherFunc, []byte, int, int) error) error { +func TestHasherCorrectness(t *testing.T) { data := newData(BufferSize) hasher := sha3.NewKeccak256 size := hasher().Size() - counts := []int{1, 2, 3, 4, 5, 8, 16, 32, 64, 128} var err error for _, count := range counts { - max := count * size - incr := 1 - for n := 1; n <= max; n += incr { - err = f(hasher, data, n, count) - if err != nil { - return err + t.Run(fmt.Sprintf("segments_%v", count), func(t *testing.T) { + max := count * size + incr := 1 + capacity := 1 + pool := NewTreePool(hasher, count, capacity) + defer pool.Drain(0) + for n := 0; n <= max; n += incr { + incr = 1 + rand.Intn(5) + bmt := New(pool) + err = testHasherCorrectness(bmt, hasher, data, n, count) + if err != nil { + t.Fatal(err) + } } - } + }) } - return nil } // Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize @@ -215,12 +232,69 @@ LOOP: } } -// helper function that creates a tree pool -func testBaseHasher(hasher BaseHasherFunc, d []byte, n, count int) error { - pool := NewTreePool(hasher, count, 1) - defer pool.Drain(0) - bmt := New(pool) - return testHasherCorrectness(bmt, hasher, d, n, count) +// Tests BMT Hasher io.Writer interface is working correctly +// even multiple short random write buffers +func TestBMTHasherWriterBuffers(t *testing.T) { + hasher := sha3.NewKeccak256 + + for _, count := range counts { + t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) { + errc := make(chan error) + pool := NewTreePool(hasher, count, PoolSize) + defer pool.Drain(0) + n := count * 32 + bmt := New(pool) + data := newData(n) + rbmt := NewRefHasher(hasher, count) + refHash := rbmt.Hash(data) + expHash := Hash(bmt, nil, data) + if !bytes.Equal(expHash, refHash) { + t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash) + } + attempts := 10 + f := func() error { + bmt := New(pool) + bmt.Reset() + var buflen int + for offset := 0; offset < n; offset += buflen { + buflen = rand.Intn(n-offset) + 1 + read, err := bmt.Write(data[offset : offset+buflen]) + if err != nil { + return err + } + if read != buflen { + return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read) + } + } + hash := bmt.Sum(nil) + if !bytes.Equal(hash, expHash) { + return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash) + } + return nil + } + + for j := 0; j < attempts; j++ { + go func() { + errc <- f() + }() + } + timeout := time.NewTimer(2 * time.Second) + for { + select { + case err := <-errc: + if err != nil { + t.Fatal(err) + } + attempts-- + if attempts == 0 { + return + } + case <-timeout.C: + t.Fatalf("timeout") + } + } + }) + } } // helper function that compares reference and optimised implementations on |