diff options
author | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-12 23:58:51 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-17 03:28:59 +0800 |
commit | 6dc14788a238f3e0ec786c6c04d476a3b957e645 (patch) | |
tree | 8f3f5f91506bc4c7532543043add1eaea3fd28e7 | |
parent | 30f057aaf9891fb37f82d94c24b8aa35d388e07b (diff) | |
download | go-tangerine-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.gz go-tangerine-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.zst go-tangerine-6dc14788a238f3e0ec786c6c04d476a3b957e645.zip |
core, eth/filters, miner, xeth: Optimised log filtering
Log filtering is now using a MIPmap like approach where addresses of
logs are added to a mapped bloom bin. The current levels for the MIP are
in ranges of 1.000.000, 500.000, 100.000, 50.000, 1.000. Logs are
therefor filtered in batches of 1.000.
-rw-r--r-- | core/blockchain.go | 27 | ||||
-rw-r--r-- | core/chain_makers.go | 7 | ||||
-rw-r--r-- | core/chain_util.go | 44 | ||||
-rw-r--r-- | core/chain_util_test.go | 112 | ||||
-rw-r--r-- | core/transaction_util.go | 32 | ||||
-rw-r--r-- | core/types/bloom9.go | 41 | ||||
-rw-r--r-- | core/types/bloom9_test.go | 34 | ||||
-rw-r--r-- | core/types/common.go | 35 | ||||
-rw-r--r-- | eth/backend.go | 46 | ||||
-rw-r--r-- | eth/backend_test.go | 67 | ||||
-rw-r--r-- | eth/filters/filter.go | 116 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 202 | ||||
-rw-r--r-- | miner/worker.go | 2 | ||||
-rw-r--r-- | xeth/xeth.go | 16 |
14 files changed, 647 insertions, 134 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 6c555e9ee..5cb800f1d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -654,10 +654,17 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { events = append(events, ChainEvent{block, block.Hash(), logs}) // This puts transactions in a extra db for rpc - PutTransactions(self.chainDb, block, block.Transactions()) + if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil { + return i, err + } // store the receipts - PutReceipts(self.chainDb, receipts) - + if err := PutReceipts(self.chainDb, receipts); err != nil { + return i, err + } + // Write map map bloom filters + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + return i, err + } case SideStatTy: if glog.V(logger.Detail) { glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) @@ -743,8 +750,18 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // insert the block in the canonical way, re-writing history self.insert(block) // write canonical receipts and transactions - PutTransactions(self.chainDb, block, block.Transactions()) - PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash())) + if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil { + return err + } + receipts := GetBlockReceipts(self.chainDb, block.Hash()) + // write receipts + if err := PutReceipts(self.chainDb, receipts); err != nil { + return err + } + // Write map map bloom filters + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + return err + } addedTxs = append(addedTxs, block.Transactions()...) } diff --git a/core/chain_makers.go b/core/chain_makers.go index c2871a097..5f2cfeb63 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -105,7 +105,12 @@ func (b *BlockGen) AddTx(tx *types.Transaction) { b.receipts = append(b.receipts, receipt) } -func (b *BlockGen) AddReceipt(receipt *types.Receipt) { +// AddUncheckedReceipts forcefully adds a receipts to the block without a +// backing transaction. +// +// AddUncheckedReceipts will cause consensus failures when used during real +// chain processing. This is best used in conjuction with raw block insertion. +func (b *BlockGen) AddUncheckedReceipt(receipt *types.Receipt) { b.receipts = append(b.receipts, receipt) } diff --git a/core/chain_util.go b/core/chain_util.go index 33d94cebd..42b6a5be2 100644 --- a/core/chain_util.go +++ b/core/chain_util.go @@ -18,6 +18,8 @@ package core import ( "bytes" + "encoding/binary" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -42,6 +44,9 @@ var ( ExpDiffPeriod = big.NewInt(100000) blockHashPre = []byte("block-hash-") // [deprecated by eth/63] + + mipmapPre = []byte("mipmap-log-bloom-") + MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000} ) // CalcDifficulty is the difficulty adjustment algorithm. It returns @@ -346,3 +351,42 @@ func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block { } return (*types.Block)(&block) } + +// returns a formatted MIP mapped key by adding prefix, canonical number and level +// +// ex. fn(98, 1000) = (prefix || 1000 || 0) +func mipmapKey(num, level uint64) []byte { + lkey := make([]byte, 8) + binary.BigEndian.PutUint64(lkey, level) + key := new(big.Int).SetUint64(num / level * level) + + return append(mipmapPre, append(lkey, key.Bytes()...)...) +} + +// WriteMapmapBloom writes each address included in the receipts' logs to the +// MIP bloom bin. +func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error { + batch := db.NewBatch() + for _, level := range MIPMapLevels { + key := mipmapKey(number, level) + bloomDat, _ := db.Get(key) + bloom := types.BytesToBloom(bloomDat) + for _, receipt := range receipts { + for _, log := range receipt.Logs() { + bloom.Add(log.Address.Big()) + } + } + batch.Put(key, bloom.Bytes()) + } + if err := batch.Write(); err != nil { + return fmt.Errorf("mipmap write fail for: %d: %v", number, err) + } + return nil +} + +// GetMipmapBloom returns a bloom filter using the number and level as input +// parameters. For available levels see MIPMapLevels. +func GetMipmapBloom(db ethdb.Database, number, level uint64) types.Bloom { + bloomDat, _ := db.Get(mipmapKey(number, level)) + return types.BytesToBloom(bloomDat) +} diff --git a/core/chain_util_test.go b/core/chain_util_test.go index 3f0446715..62b73a064 100644 --- a/core/chain_util_test.go +++ b/core/chain_util_test.go @@ -18,12 +18,15 @@ package core import ( "encoding/json" + "io/ioutil" "math/big" "os" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -318,3 +321,112 @@ func TestHeadStorage(t *testing.T) { t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash()) } } + +func TestMipmapBloom(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + + receipt1 := new(types.Receipt) + receipt1.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + &vm.Log{Address: common.BytesToAddress([]byte("address"))}, + }) + receipt2 := new(types.Receipt) + receipt2.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + &vm.Log{Address: common.BytesToAddress([]byte("address1"))}, + }) + + WriteMipmapBloom(db, 1, types.Receipts{receipt1}) + WriteMipmapBloom(db, 2, types.Receipts{receipt2}) + + for _, level := range MIPMapLevels { + bloom := GetMipmapBloom(db, 2, level) + if !bloom.Test(new(big.Int).SetBytes([]byte("address1"))) { + t.Error("expected test to be included on level:", level) + } + } + + // reset + db, _ = ethdb.NewMemDatabase() + receipt := new(types.Receipt) + receipt.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + }) + WriteMipmapBloom(db, 999, types.Receipts{receipt1}) + + receipt = new(types.Receipt) + receipt.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test 1"))}, + }) + WriteMipmapBloom(db, 1000, types.Receipts{receipt}) + + bloom := GetMipmapBloom(db, 1000, 1000) + if bloom.TestBytes([]byte("test")) { + t.Error("test should not have been included") + } +} + +func TestMipmapChain(t *testing.T) { + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var ( + db, _ = ethdb.NewLDBDatabase(dir, 16) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + + hash1 = common.BytesToHash([]byte("topic1")) + ) + defer db.Close() + + genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)}) + chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash1}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 1000: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + + } + + // store the receipts + err := PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + WriteMipmapBloom(db, uint64(i+1), receipts) + }) + for _, block := range chain { + WriteBlock(db, block) + if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + bloom := GetMipmapBloom(db, 0, 1000) + if bloom.TestBytes(addr2[:]) { + t.Error("address was included in bloom and should not have") + } +} diff --git a/core/transaction_util.go b/core/transaction_util.go index ebe095abb..d55ed14da 100644 --- a/core/transaction_util.go +++ b/core/transaction_util.go @@ -17,6 +17,8 @@ package core import ( + "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -32,22 +34,16 @@ var ( ) // PutTransactions stores the transactions in the given database -func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) { - batch := new(leveldb.Batch) - _, batchWrite := db.(*ethdb.LDBDatabase) +func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error { + batch := db.NewBatch() for i, tx := range block.Transactions() { rlpEnc, err := rlp.EncodeToBytes(tx) if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx", err) - return + return fmt.Errorf("failed encoding tx: %v", err) } - if batchWrite { - batch.Put(tx.Hash().Bytes(), rlpEnc) - } else { - db.Put(tx.Hash().Bytes(), rlpEnc) - } + batch.Put(tx.Hash().Bytes(), rlpEnc) var txExtra struct { BlockHash common.Hash @@ -59,22 +55,16 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio txExtra.Index = uint64(i) rlpMeta, err := rlp.EncodeToBytes(txExtra) if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err) - return + return fmt.Errorf("failed encoding tx meta data: %v", err) } - if batchWrite { - batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) - } else { - db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) - } + batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) } - if db, ok := db.(*ethdb.LDBDatabase); ok { - if err := db.LDB().Write(batch, nil); err != nil { - glog.V(logger.Error).Infoln("db write err:", err) - } + if err := batch.Write(); err != nil { + return fmt.Errorf("failed writing tx to db: %v", err) } + return nil } func DeleteTransaction(db ethdb.Database, txHash common.Hash) { diff --git a/core/types/bloom9.go b/core/types/bloom9.go index f87ae58e6..97db20ee9 100644 --- a/core/types/bloom9.go +++ b/core/types/bloom9.go @@ -17,6 +17,7 @@ package types import ( + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -28,6 +29,46 @@ type bytesBacked interface { Bytes() []byte } +const bloomLength = 256 + +type Bloom [bloomLength]byte + +func BytesToBloom(b []byte) Bloom { + var bloom Bloom + bloom.SetBytes(b) + return bloom +} + +func (b *Bloom) SetBytes(d []byte) { + if len(b) < len(d) { + panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d))) + } + + copy(b[bloomLength-len(d):], d) +} + +func (b *Bloom) Add(d *big.Int) { + bin := new(big.Int).SetBytes(b[:]) + bin.Or(bin, bloom9(d.Bytes())) + b.SetBytes(bin.Bytes()) +} + +func (b Bloom) Big() *big.Int { + return common.Bytes2Big(b[:]) +} + +func (b Bloom) Bytes() []byte { + return b[:] +} + +func (b Bloom) Test(test *big.Int) bool { + return BloomLookup(b, test) +} + +func (b Bloom) TestBytes(test []byte) bool { + return b.Test(common.BytesToBig(test)) +} + func CreateBloom(receipts Receipts) Bloom { bin := new(big.Int) for _, receipt := range receipts { diff --git a/core/types/bloom9_test.go b/core/types/bloom9_test.go index f020670b1..5744bec6c 100644 --- a/core/types/bloom9_test.go +++ b/core/types/bloom9_test.go @@ -16,6 +16,40 @@ package types +import ( + "math/big" + "testing" +) + +func TestBloom(t *testing.T) { + positive := []string{ + "testtest", + "test", + "hallo", + "other", + } + negative := []string{ + "tes", + "lo", + } + + var bloom Bloom + for _, data := range positive { + bloom.Add(new(big.Int).SetBytes([]byte(data))) + } + + for _, data := range positive { + if !bloom.Test(new(big.Int).SetBytes([]byte(data))) { + t.Error("expected", data, "to test true") + } + } + for _, data := range negative { + if bloom.Test(new(big.Int).SetBytes([]byte(data))) { + t.Error("did not expect", data, "to test true") + } + } +} + /* import ( "testing" diff --git a/core/types/common.go b/core/types/common.go index dc428c00c..29019a1b4 100644 --- a/core/types/common.go +++ b/core/types/common.go @@ -16,41 +16,8 @@ package types -import ( - "math/big" - - "fmt" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/vm" -) +import "github.com/ethereum/go-ethereum/core/vm" type BlockProcessor interface { Process(*Block) (vm.Logs, Receipts, error) } - -const bloomLength = 256 - -type Bloom [bloomLength]byte - -func BytesToBloom(b []byte) Bloom { - var bloom Bloom - bloom.SetBytes(b) - return bloom -} - -func (b *Bloom) SetBytes(d []byte) { - if len(b) < len(d) { - panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d))) - } - - copy(b[bloomLength-len(d):], d) -} - -func (b Bloom) Big() *big.Int { - return common.Bytes2Big(b[:]) -} - -func (b Bloom) Bytes() []byte { - return b[:] -} diff --git a/eth/backend.go b/eth/backend.go index 83eefca5b..f703b4ac0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -47,6 +47,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/nat" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/whisper" ) @@ -288,6 +289,9 @@ func New(config *Config) (*Ethereum, error) { if err := upgradeChainDatabase(chainDb); err != nil { return nil, err } + if err := addMipmapBloomBins(chainDb); err != nil { + return nil, err + } dappDb, err := newdb(filepath.Join(config.DataDir, "dapp")) if err != nil { @@ -769,3 +773,45 @@ func upgradeChainDatabase(db ethdb.Database) error { } return nil } + +func addMipmapBloomBins(db ethdb.Database) (err error) { + const mipmapVersion uint = 2 + + // check if the version is set. We ignore data for now since there's + // only one version so we can easily ignore it for now + var data []byte + data, _ = db.Get([]byte("setting-mipmap-version")) + if len(data) > 0 { + var version uint + if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { + return nil + } + } + + defer func() { + if err == nil { + var val []byte + val, err = rlp.EncodeToBytes(mipmapVersion) + if err == nil { + err = db.Put([]byte("setting-mipmap-version"), val) + } + return + } + }() + latestBlock := core.GetBlock(db, core.GetHeadBlockHash(db)) + if latestBlock == nil { // clean database + return + } + + tstart := time.Now() + glog.V(logger.Info).Infoln("upgrading db log bloom bins") + for i := uint64(0); i <= latestBlock.NumberU64(); i++ { + hash := core.GetCanonicalHash(db, i) + if (hash == common.Hash{}) { + return fmt.Errorf("chain db corrupted. Could not find block %d.", i) + } + core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash)) + } + glog.V(logger.Info).Infoln("upgrade completed in", time.Since(tstart)) + return nil +} diff --git a/eth/backend_test.go b/eth/backend_test.go new file mode 100644 index 000000000..220426c17 --- /dev/null +++ b/eth/backend_test.go @@ -0,0 +1,67 @@ +package eth + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" +) + +func TestMipmapUpgrade(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + addr := common.BytesToAddress([]byte("jeff")) + genesis := core.WriteGenesisBlockForTesting(db) + + chain := core.GenerateChain(genesis, db, 10, func(i int, gen *core.BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 2: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + } + + // store the receipts + err := core.PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + }) + for _, block := range chain { + core.WriteBlock(db, block) + if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + err := addMipmapBloomBins(db) + if err != nil { + t.Fatal(err) + } + + bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0]) + if (bloom == types.Bloom{}) { + t.Error("got empty bloom filter") + } + + data, _ := db.Get([]byte("setting-mipmap-version")) + if len(data) == 0 { + t.Error("setting-mipmap-version not written to database") + } +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index d3d430775..2e81ea177 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -17,6 +17,8 @@ package filters import ( + "math" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -30,13 +32,10 @@ type AccountChange struct { // Filtering interface type Filter struct { - db ethdb.Database - earliest int64 - latest int64 - skip int - address []common.Address - max int - topics [][]common.Hash + db ethdb.Database + begin, end int64 + addresses []common.Address + topics [][]common.Hash BlockCallback func(*types.Block, vm.Logs) TransactionCallback func(*types.Transaction) @@ -52,59 +51,82 @@ func New(db ethdb.Database) *Filter { // Set the earliest and latest block for filtering. // -1 = latest block (i.e., the current block) // hash = particular hash from-to -func (self *Filter) SetEarliestBlock(earliest int64) { - self.earliest = earliest +func (self *Filter) SetBeginBlock(begin int64) { + self.begin = begin } -func (self *Filter) SetLatestBlock(latest int64) { - self.latest = latest +func (self *Filter) SetEndBlock(end int64) { + self.end = end } -func (self *Filter) SetAddress(addr []common.Address) { - self.address = addr +func (self *Filter) SetAddresses(addr []common.Address) { + self.addresses = addr } func (self *Filter) SetTopics(topics [][]common.Hash) { self.topics = topics } -func (self *Filter) SetMax(max int) { - self.max = max -} - -func (self *Filter) SetSkip(skip int) { - self.skip = skip -} - // Run filters logs with the current parameters set func (self *Filter) Find() vm.Logs { - earliestBlock := core.GetBlock(self.db, core.GetHeadBlockHash(self.db)) - var earliestBlockNo uint64 = uint64(self.earliest) - if self.earliest == -1 { - earliestBlockNo = earliestBlock.NumberU64() + latestBlock := core.GetBlock(self.db, core.GetHeadBlockHash(self.db)) + var beginBlockNo uint64 = uint64(self.begin) + if self.begin == -1 { + beginBlockNo = latestBlock.NumberU64() } - var latestBlockNo uint64 = uint64(self.latest) - if self.latest == -1 { - latestBlockNo = earliestBlock.NumberU64() + var endBlockNo uint64 = uint64(self.end) + if self.end == -1 { + endBlockNo = latestBlock.NumberU64() } - var ( - logs vm.Logs - block *types.Block - ) - hash := core.GetCanonicalHash(self.db, latestBlockNo) - if hash != (common.Hash{}) { - block = core.GetBlock(self.db, hash) + // if no addresses are present we can't make use of fast search which + // uses the mipmap bloom filters to check for fast inclusion and uses + // higher range probability in order to ensure at least a false positive + if len(self.addresses) == 0 { + return self.getLogs(beginBlockNo, endBlockNo) } + return self.mipFind(beginBlockNo, endBlockNo, 0) +} -done: - for i := 0; block != nil; i++ { - // Quit on latest - switch { - case block.NumberU64() == 0: - break done - case block.NumberU64() < earliestBlockNo: - break done +func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) { + level := core.MIPMapLevels[depth] + // normalise numerator so we can work in level specific batches and + // work with the proper range checks + for num := start / level * level; num <= end; num += level { + // find addresses in bloom filters + bloom := core.GetMipmapBloom(self.db, num, level) + for _, addr := range self.addresses { + if bloom.TestBytes(addr[:]) { + // range check normalised values and make sure that + // we're resolving the correct range instead of the + // normalised values. + start := uint64(math.Max(float64(num), float64(start))) + end := uint64(math.Min(float64(num+level-1), float64(end))) + if depth+1 == len(core.MIPMapLevels) { + logs = append(logs, self.getLogs(start, end)...) + } else { + logs = append(logs, self.mipFind(start, end, depth+1)...) + } + // break so we don't check the same range for each + // possible address. Checks on multiple addresses + // are handled further down the stack. + break + } + } + } + + return logs +} + +func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { + var block *types.Block + + for i := start; i <= end; i++ { + hash := core.GetCanonicalHash(self.db, i) + if hash != (common.Hash{}) { + block = core.GetBlock(self.db, hash) + } else { // block not found + return logs } // Use bloom filtering to see if this block is interesting given the @@ -120,8 +142,6 @@ done: } logs = append(logs, self.FilterLogs(unfiltered)...) } - - block = core.GetBlock(self.db, block.ParentHash()) } return logs @@ -143,7 +163,7 @@ func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs { // Filter the logs for interesting stuff Logs: for _, log := range logs { - if len(self.address) > 0 && !includes(self.address, log.Address) { + if len(self.addresses) > 0 && !includes(self.addresses, log.Address) { continue } @@ -179,9 +199,9 @@ Logs: } func (self *Filter) bloomFilter(block *types.Block) bool { - if len(self.address) > 0 { + if len(self.addresses) > 0 { var included bool - for _, addr := range self.address { + for _, addr := range self.addresses { if types.BloomLookup(block.Bloom(), addr) { included = true break diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 950a84579..9e7538fac 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -1,6 +1,7 @@ package filters import ( + "io/ioutil" "math/big" "os" "testing" @@ -23,40 +24,42 @@ func makeReceipt(addr common.Address) *types.Receipt { } func BenchmarkMipmaps(b *testing.B) { - const dbname = "/tmp/mipmap" + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(dir) + var ( - db, _ = ethdb.NewLDBDatabase(dbname, 16) + db, _ = ethdb.NewLDBDatabase(dir, 16) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) addr3 = common.BytesToAddress([]byte("ethereum")) addr4 = common.BytesToAddress([]byte("random addresses please")) ) - defer func() { - db.Close() - os.Remove(dbname) - }() + defer db.Close() genesis := core.WriteGenesisBlockForTesting(db, core.GenesisAccount{addr1, big.NewInt(1000000)}) - chain := core.GenerateChain(genesis, db, 100000, func(i int, gen *core.BlockGen) { + chain := core.GenerateChain(genesis, db, 100010, func(i int, gen *core.BlockGen) { var receipts types.Receipts switch i { case 2403: receipt := makeReceipt(addr1) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) - case 10340: + gen.AddUncheckedReceipt(receipt) + case 1034: receipt := makeReceipt(addr2) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) case 34: receipt := makeReceipt(addr3) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) case 99999: receipt := makeReceipt(addr4) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) } @@ -65,6 +68,7 @@ func BenchmarkMipmaps(b *testing.B) { if err != nil { b.Fatal(err) } + core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for _, block := range chain { core.WriteBlock(db, block) @@ -82,9 +86,9 @@ func BenchmarkMipmaps(b *testing.B) { b.ResetTimer() filter := New(db) - filter.SetAddress([]common.Address{addr1, addr2, addr3, addr4}) - filter.SetEarliestBlock(0) - filter.SetLatestBlock(-1) + filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) for i := 0; i < b.N; i++ { logs := filter.Find() @@ -93,3 +97,171 @@ func BenchmarkMipmaps(b *testing.B) { } } } + +func TestFilters(t *testing.T) { + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var ( + db, _ = ethdb.NewLDBDatabase(dir, 16) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) + + hash1 = common.BytesToHash([]byte("topic1")) + hash2 = common.BytesToHash([]byte("topic2")) + hash3 = common.BytesToHash([]byte("topic3")) + hash4 = common.BytesToHash([]byte("topic4")) + ) + defer db.Close() + + genesis := core.WriteGenesisBlockForTesting(db, core.GenesisAccount{addr, big.NewInt(1000000)}) + chain := core.GenerateChain(genesis, db, 1000, func(i int, gen *core.BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash1}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 2: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash2}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 998: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash3}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 999: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash4}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + } + + // store the receipts + err := core.PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + // i is used as block number for the writes but since the i + // starts at 0 and block 0 (genesis) is already present increment + // by one + core.WriteMipmapBloom(db, uint64(i+1), receipts) + }) + for _, block := range chain { + core.WriteBlock(db, block) + if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + filter := New(db) + filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2, hash3, hash4}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs := filter.Find() + if len(logs) != 4 { + t.Error("expected 4 log, got", len(logs)) + } + + filter = New(db) + filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) + filter.SetBeginBlock(900) + filter.SetEndBlock(999) + logs = filter.Find() + if len(logs) != 1 { + t.Error("expected 1 log, got", len(logs)) + } + if len(logs) > 0 && logs[0].Topics[0] != hash3 { + t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) + } + + filter = New(db) + filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) + filter.SetBeginBlock(990) + filter.SetEndBlock(-1) + logs = filter.Find() + if len(logs) != 1 { + t.Error("expected 1 log, got", len(logs)) + } + if len(logs) > 0 && logs[0].Topics[0] != hash3 { + t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) + } + + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2}}) + filter.SetBeginBlock(1) + filter.SetEndBlock(10) + + logs = filter.Find() + if len(logs) != 2 { + t.Error("expected 2 log, got", len(logs)) + } + + failHash := common.BytesToHash([]byte("fail")) + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{failHash}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } + + failAddr := common.BytesToAddress([]byte("failmenow")) + filter = New(db) + filter.SetAddresses([]common.Address{failAddr}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } + + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{failHash}, []common.Hash{hash1}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } +} diff --git a/miner/worker.go b/miner/worker.go index 43f6f9909..57f668d49 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -301,6 +301,8 @@ func (self *worker) wait() { core.PutTransactions(self.chainDb, block, block.Transactions()) // store the receipts core.PutReceipts(self.chainDb, work.receipts) + // Write map map bloom filters + core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts) } // broadcast before waiting for validation diff --git a/xeth/xeth.go b/xeth/xeth.go index 13e171270..ae03471d5 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -543,11 +543,9 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address [] id := self.filterManager.Add(filter) self.logQueue[id] = &logQueue{timeout: time.Now()} - filter.SetEarliestBlock(earliest) - filter.SetLatestBlock(latest) - filter.SetSkip(skip) - filter.SetMax(max) - filter.SetAddress(cAddress(address)) + filter.SetBeginBlock(earliest) + filter.SetEndBlock(latest) + filter.SetAddresses(cAddress(address)) filter.SetTopics(cTopics(topics)) filter.LogsCallback = func(logs vm.Logs) { self.logMu.Lock() @@ -652,11 +650,9 @@ func (self *XEth) Logs(id int) vm.Logs { func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) vm.Logs { filter := filters.New(self.backend.ChainDb()) - filter.SetEarliestBlock(earliest) - filter.SetLatestBlock(latest) - filter.SetSkip(skip) - filter.SetMax(max) - filter.SetAddress(cAddress(address)) + filter.SetBeginBlock(earliest) + filter.SetEndBlock(latest) + filter.SetAddresses(cAddress(address)) filter.SetTopics(cTopics(topics)) return filter.Find() |