aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/pyramid.go
blob: 74e00a497133a093a100ae96a7bc90a9e2b3ea5a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
    "encoding/binary"
    "fmt"
    "io"
    "math"
    "strings"
    "sync"

    "github.com/ethereum/go-ethereum/common"
)

const (
    processors = 8
)

type Tree struct {
    Chunks int64
    Levels []map[int64]*Node
    Lock   sync.RWMutex
}

type Node struct {
    Pending  int64
    Size     uint64
    Children []common.Hash
    Last     bool
}

func (self *Node) String() string {
    var children []string
    for _, node := range self.Children {
        children = append(children, node.Hex())
    }
    return fmt.Sprintf("pending: %v, size: %v, last :%v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
}

type Task struct {
    Index int64 // Index of the chunk being processed
    Size  uint64
    Data  []byte // Binary blob of the chunk
    Last  bool
}

type PyramidChunker struct {
    hashFunc    Hasher
    chunkSize   int64
    hashSize    int64
    branches    int64
    workerCount int
}

func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
    self = &PyramidChunker{}
    self.hashFunc = MakeHashFunc(params.Hash)
    self.branches = params.Branches
    self.hashSize = int64(self.hashFunc().Size())
    self.chunkSize = self.hashSize * self.branches
    self.workerCount = 1
    return
}

func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {

    chunks := (size + self.chunkSize - 1) / self.chunkSize
    depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1

    results := Tree{
        Chunks: chunks,
        Levels: make([]map[int64]*Node, depth),
    }
    for i := 0; i < depth; i++ {
        results.Levels[i] = make(map[int64]*Node)
    }
    // Create a pool of workers to crunch through the file
    tasks := make(chan *Task, 2*processors)
    pend := new(sync.WaitGroup)
    abortC := make(chan bool)
    for i := 0; i < processors; i++ {
        pend.Add(1)
        go self.processor(pend, swg, tasks, chunkC, &results)
    }
    // Feed the chunks into the task pool
    read := 0
    for index := 0; ; index++ {
        buffer := make([]byte, self.chunkSize+8)
        n, err := data.Read(buffer[8:])
        read += n
        last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
        if err != nil && !last {
            close(abortC)
            break
        }
        binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
        pend.Add(1)
        select {
        case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
        case <-abortC:
            return nil, err
        }
        if last {
            break
        }
    }
    // Wait for the workers and return
    close(tasks)
    pend.Wait()

    key := results.Levels[0][0].Children[0][:]
    return key, nil
}

func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
    defer pend.Done()

    // Start processing leaf chunks ad infinitum
    hasher := self.hashFunc()
    for task := range tasks {
        depth, pow := len(results.Levels)-1, self.branches
        size := task.Size
        data := task.Data
        var node *Node
        for depth >= 0 {
            // New chunk received, reset the hasher and start processing
            hasher.Reset()
            if node == nil { // Leaf node, hash the data chunk
                hasher.Write(task.Data)
            } else { // Internal node, hash the children
                size = node.Size
                data = make([]byte, hasher.Size()*len(node.Children)+8)
                binary.LittleEndian.PutUint64(data[:8], size)

                hasher.Write(data[:8])
                for i, hash := range node.Children {
                    copy(data[i*hasher.Size()+8:], hash[:])
                    hasher.Write(hash[:])
                }
            }
            hash := hasher.Sum(nil)
            last := task.Last || (node != nil) && node.Last
            // Insert the subresult into the memoization tree
            results.Lock.Lock()
            if node = results.Levels[depth][task.Index/pow]; node == nil {
                // Figure out the pending tasks
                pending := self.branches
                if task.Index/pow == results.Chunks/pow {
                    pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
                }
                node = &Node{pending, 0, make([]common.Hash, pending), last}
                results.Levels[depth][task.Index/pow] = node
            }
            node.Pending--
            i := task.Index / (pow / self.branches) % self.branches
            if last {
                node.Last = true
            }
            copy(node.Children[i][:], hash)
            node.Size += size
            left := node.Pending
            if chunkC != nil {
                if swg != nil {
                    swg.Add(1)
                }

                chunkC <- &Chunk{Key: hash, SData: data, wg: swg}
                // TODO: consider selecting on self.quitC to avoid blocking forever on shutdown
            }
            if depth+1 < len(results.Levels) {
                delete(results.Levels[depth+1], task.Index/(pow/self.branches))
            }

            results.Lock.Unlock()
            // If there's more work to be done, leave for others
            if left > 0 {
                break
            }
            // We're the last ones in this batch, merge the children together
            depth--
            pow *= self.branches
        }
        pend.Done()
    }
}