aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/bmt/bmt.go
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-07-09 20:11:49 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-09 20:11:49 +0800
commitb3711af05176f446fad5ee90e2be4bd09c4086a2 (patch)
tree036eb23e423c385c0be00e3f8d3d97dea7040f8c /swarm/bmt/bmt.go
parent30bdf817a0d0afb33f3635f1de877f9caf09be05 (diff)
downloaddexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.gz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.zst
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.zip
swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)
* cmd/swarm: minor cli flag text adjustments * swarm/api/http: sticky footer for swarm landing page using flex * swarm/api/http: sticky footer for error pages and fix for multiple choices * cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues * cmd/swarm: update description of swarm cmd * swarm: added network ID test * cmd/swarm: support for smoke tests on the production swarm cluster * cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion * swarm: propagate ctx to internal apis (#754) * swarm/metrics: collect disk measurements * swarm/bmt: fix io.Writer interface * Write now tolerates arbitrary variable buffers * added variable buffer tests * Write loop and finalise optimisation * refactor / rename * add tests for empty input * swarm/pss: (UPDATE) Generic notifications package (#744) swarm/pss: Generic package for creating pss notification svcs * swarm: Adding context to more functions * swarm/api: change colour of landing page in templates * swarm/api: change landing page to react to enter keypress
Diffstat (limited to 'swarm/bmt/bmt.go')
-rw-r--r--swarm/bmt/bmt.go220
1 files changed, 121 insertions, 99 deletions
diff --git a/swarm/bmt/bmt.go b/swarm/bmt/bmt.go
index 71aee2495..835587020 100644
--- a/swarm/bmt/bmt.go
+++ b/swarm/bmt/bmt.go
@@ -117,10 +117,7 @@ func NewTreePool(hasher BaseHasherFunc, segmentCount, capacity int) *TreePool {
zerohashes[0] = zeros
h := hasher()
for i := 1; i < depth; i++ {
- h.Reset()
- h.Write(zeros)
- h.Write(zeros)
- zeros = h.Sum(nil)
+ zeros = doHash(h, nil, zeros, zeros)
zerohashes[i] = zeros
}
return &TreePool{
@@ -318,41 +315,19 @@ func (h *Hasher) Sum(b []byte) (r []byte) {
// * if sequential write is used (can read sections)
func (h *Hasher) sum(b []byte, release, section bool) (r []byte) {
t := h.bmt
- h.finalise(section)
- if t.offset > 0 { // get the last node (double segment)
-
- // padding the segment with zero
- copy(t.segment[t.offset:], h.pool.zerohashes[0])
- }
- if section {
- if t.cur%2 == 1 {
- // if just finished current segment, copy it to the right half of the chunk
- copy(t.section[h.pool.SegmentSize:], t.segment)
- } else {
- // copy segment to front of section, zero pad the right half
- copy(t.section, t.segment)
- copy(t.section[h.pool.SegmentSize:], h.pool.zerohashes[0])
- }
- h.writeSection(t.cur, t.section)
- } else {
- // TODO: h.writeSegment(t.cur, t.segment)
- panic("SegmentWriter not implemented")
- }
+ bh := h.pool.hasher()
+ go h.writeSection(t.cur, t.section, true)
bmtHash := <-t.result
span := t.span
-
+ // fmt.Println(t.draw(bmtHash))
if release {
h.releaseTree()
}
- // sha3(span + BMT(pure_chunk))
+ // b + sha3(span + BMT(pure_chunk))
if span == nil {
- return bmtHash
+ return append(b, bmtHash...)
}
- bh := h.pool.hasher()
- bh.Reset()
- bh.Write(span)
- bh.Write(bmtHash)
- return bh.Sum(b)
+ return doHash(bh, b, span, bmtHash)
}
// Hasher implements the SwarmHash interface
@@ -367,37 +342,41 @@ func (h *Hasher) Write(b []byte) (int, error) {
return 0, nil
}
t := h.bmt
- need := (h.pool.SegmentCount - t.cur) * h.pool.SegmentSize
- if l < need {
- need = l
- }
- // calculate missing bit to complete current open segment
- rest := h.pool.SegmentSize - t.offset
- if need < rest {
- rest = need
- }
- copy(t.segment[t.offset:], b[:rest])
- need -= rest
- size := (t.offset + rest) % h.pool.SegmentSize
- // read full segments and the last possibly partial segment
- for need > 0 {
- // push all finished chunks we read
- if t.cur%2 == 0 {
- copy(t.section, t.segment)
- } else {
- copy(t.section[h.pool.SegmentSize:], t.segment)
- h.writeSection(t.cur, t.section)
+ secsize := 2 * h.pool.SegmentSize
+ // calculate length of missing bit to complete current open section
+ smax := secsize - t.offset
+ // if at the beginning of chunk or middle of the section
+ if t.offset < secsize {
+ // fill up current segment from buffer
+ copy(t.section[t.offset:], b)
+ // if input buffer consumed and open section not complete, then
+ // advance offset and return
+ if smax == 0 {
+ smax = secsize
+ }
+ if l <= smax {
+ t.offset += l
+ return l, nil
}
- size = h.pool.SegmentSize
- if need < size {
- size = need
+ } else {
+ if t.cur == h.pool.SegmentCount*2 {
+ return 0, nil
}
- copy(t.segment, b[rest:rest+size])
- need -= size
- rest += size
+ }
+ // read full segments and the last possibly partial segment from the input buffer
+ for smax < l {
+ // section complete; push to tree asynchronously
+ go h.writeSection(t.cur, t.section, false)
+ // reset section
+ t.section = make([]byte, secsize)
+	// copy from input buffer at smax to right half of section
+ copy(t.section, b[smax:])
+ // advance cursor
t.cur++
+ // smax here represents successive offsets in the input buffer
+ smax += secsize
}
- t.offset = size % h.pool.SegmentSize
+ t.offset = l - smax + secsize
return l, nil
}
@@ -426,6 +405,8 @@ func (h *Hasher) releaseTree() {
t.span = nil
t.hash = nil
h.bmt = nil
+ t.section = make([]byte, h.pool.SegmentSize*2)
+ t.segment = make([]byte, h.pool.SegmentSize)
h.pool.release(t)
}
}
@@ -435,29 +416,37 @@ func (h *Hasher) releaseTree() {
// go h.run(h.bmt.leaves[i/2], h.pool.hasher(), i%2 == 0, s)
// }
-// writeSection writes the hash of i/2-th segction into right level 1 node of the BMT tree
-func (h *Hasher) writeSection(i int, section []byte) {
- n := h.bmt.leaves[i/2]
+// writeSection writes the hash of i-th section into level 1 node of the BMT tree
+func (h *Hasher) writeSection(i int, section []byte, final bool) {
+ // select the leaf node for the section
+ n := h.bmt.leaves[i]
isLeft := n.isLeft
n = n.parent
bh := h.pool.hasher()
- bh.Write(section)
- go func() {
- sum := bh.Sum(nil)
- if n == nil {
- h.bmt.result <- sum
- return
- }
- h.run(n, bh, isLeft, sum)
- }()
+ // hash the section
+ s := doHash(bh, nil, section)
+ // write hash into parent node
+ if final {
+ // for the last segment use writeFinalNode
+ h.writeFinalNode(1, n, bh, isLeft, s)
+ } else {
+ h.writeNode(n, bh, isLeft, s)
+ }
}
-// run pushes the data to the node
+// writeNode pushes the data to the node
// if it is the first of 2 sisters written the routine returns
// if it is the second, it calculates the hash and writes it
// to the parent node recursively
-func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
+func (h *Hasher) writeNode(n *node, bh hash.Hash, isLeft bool, s []byte) {
+ level := 1
for {
+ // at the root of the bmt just write the result to the result channel
+ if n == nil {
+ h.bmt.result <- s
+ return
+ }
+		// otherwise assign child hash to branch
if isLeft {
n.left = s
} else {
@@ -467,44 +456,68 @@ func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
if n.toggle() {
return
}
- // the second thread now can be sure both left and right children are written
- // it calculates the hash of left|right and take it to the next level
- bh.Reset()
- bh.Write(n.left)
- bh.Write(n.right)
- s = bh.Sum(nil)
-
- // at the root of the bmt just write the result to the result channel
- if n.parent == nil {
- h.bmt.result <- s
- return
- }
-
- // otherwise iterate on parent
+ // the thread coming later now can be sure both left and right children are written
+ // it calculates the hash of left|right and pushes it to the parent
+ s = doHash(bh, nil, n.left, n.right)
isLeft = n.isLeft
n = n.parent
+ level++
}
}
-// finalise is following the path starting from the final datasegment to the
+// writeFinalNode is following the path starting from the final datasegment to the
// BMT root via parents
// for unbalanced trees it fills in the missing right sister nodes using
// the pool's lookup table for BMT subtree root hashes for all-zero sections
-func (h *Hasher) finalise(skip bool) {
- t := h.bmt
- isLeft := t.cur%2 == 0
- n := t.leaves[t.cur/2]
- for level := 0; n != nil; level++ {
- // when the final segment's path is going via left child node
- // we include an all-zero subtree hash for the right level and toggle the node.
- // when the path is going through right child node, nothing to do
- if isLeft && !skip {
+// otherwise behaves like `writeNode`
+func (h *Hasher) writeFinalNode(level int, n *node, bh hash.Hash, isLeft bool, s []byte) {
+
+ for {
+ // at the root of the bmt just write the result to the result channel
+ if n == nil {
+ if s != nil {
+ h.bmt.result <- s
+ }
+ return
+ }
+ var noHash bool
+ if isLeft {
+ // coming from left sister branch
+ // when the final section's path is going via left child node
+ // we include an all-zero subtree hash for the right level and toggle the node.
+ // when the path is going through right child node, nothing to do
n.right = h.pool.zerohashes[level]
- n.toggle()
+ if s != nil {
+ n.left = s
+ // if a left final node carries a hash, it must be the first (and only thread)
+				// so the toggle is already in passive state, no need to call it
+ // yet thread needs to carry on pushing hash to parent
+ } else {
+ // if again first thread then propagate nil and calculate no hash
+ noHash = n.toggle()
+ }
+ } else {
+ // right sister branch
+ // if s is nil, then thread arrived first at previous node and here there will be two,
+ // so no need to do anything
+ if s != nil {
+ n.right = s
+ noHash = n.toggle()
+ } else {
+ noHash = true
+ }
+ }
+ // the child-thread first arriving will just continue resetting s to nil
+ // the second thread now can be sure both left and right children are written
+ // it calculates the hash of left|right and pushes it to the parent
+ if noHash {
+ s = nil
+ } else {
+ s = doHash(bh, nil, n.left, n.right)
}
- skip = false
isLeft = n.isLeft
n = n.parent
+ level++
}
}
@@ -525,6 +538,15 @@ func (n *node) toggle() bool {
return atomic.AddInt32(&n.state, 1)%2 == 1
}
+// calculates the hash of the data using hash.Hash
+func doHash(h hash.Hash, b []byte, data ...[]byte) []byte {
+ h.Reset()
+ for _, v := range data {
+ h.Write(v)
+ }
+ return h.Sum(b)
+}
+
func hashstr(b []byte) string {
end := len(b)
if end > 4 {