diff options
author | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-12 23:58:51 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-17 03:28:59 +0800 |
commit | 6dc14788a238f3e0ec786c6c04d476a3b957e645 (patch) | |
tree | 8f3f5f91506bc4c7532543043add1eaea3fd28e7 /eth | |
parent | 30f057aaf9891fb37f82d94c24b8aa35d388e07b (diff) | |
download | dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.gz dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.zst dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.zip |
core, eth/filters, miner, xeth: Optimised log filtering
Log filtering now uses a MIPmap-like approach where addresses of
logs are added to a mapped bloom bin. The current levels for the MIP are
in ranges of 1.000.000, 500.000, 100.000, 50.000, 1.000. Logs are
therefore filtered in batches of 1.000.
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 46 | ||||
-rw-r--r-- | eth/backend_test.go | 67 | ||||
-rw-r--r-- | eth/filters/filter.go | 116 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 202 |
4 files changed, 368 insertions, 63 deletions
diff --git a/eth/backend.go b/eth/backend.go index 83eefca5b..f703b4ac0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -47,6 +47,7 @@ import ( "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/nat" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/whisper" ) @@ -288,6 +289,9 @@ func New(config *Config) (*Ethereum, error) { if err := upgradeChainDatabase(chainDb); err != nil { return nil, err } + if err := addMipmapBloomBins(chainDb); err != nil { + return nil, err + } dappDb, err := newdb(filepath.Join(config.DataDir, "dapp")) if err != nil { @@ -769,3 +773,45 @@ func upgradeChainDatabase(db ethdb.Database) error { } return nil } + +func addMipmapBloomBins(db ethdb.Database) (err error) { + const mipmapVersion uint = 2 + + // check if the version is set. We ignore data for now since there's + // only one version so we can easily ignore it for now + var data []byte + data, _ = db.Get([]byte("setting-mipmap-version")) + if len(data) > 0 { + var version uint + if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { + return nil + } + } + + defer func() { + if err == nil { + var val []byte + val, err = rlp.EncodeToBytes(mipmapVersion) + if err == nil { + err = db.Put([]byte("setting-mipmap-version"), val) + } + return + } + }() + latestBlock := core.GetBlock(db, core.GetHeadBlockHash(db)) + if latestBlock == nil { // clean database + return + } + + tstart := time.Now() + glog.V(logger.Info).Infoln("upgrading db log bloom bins") + for i := uint64(0); i <= latestBlock.NumberU64(); i++ { + hash := core.GetCanonicalHash(db, i) + if (hash == common.Hash{}) { + return fmt.Errorf("chain db corrupted. 
Could not find block %d.", i) + } + core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash)) + } + glog.V(logger.Info).Infoln("upgrade completed in", time.Since(tstart)) + return nil +} diff --git a/eth/backend_test.go b/eth/backend_test.go new file mode 100644 index 000000000..220426c17 --- /dev/null +++ b/eth/backend_test.go @@ -0,0 +1,67 @@ +package eth + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" +) + +func TestMipmapUpgrade(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + addr := common.BytesToAddress([]byte("jeff")) + genesis := core.WriteGenesisBlockForTesting(db) + + chain := core.GenerateChain(genesis, db, 10, func(i int, gen *core.BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 2: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + } + + // store the receipts + err := core.PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + }) + for _, block := range chain { + core.WriteBlock(db, block) + if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + err := addMipmapBloomBins(db) + if err != nil { + t.Fatal(err) + } + + bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0]) + if 
(bloom == types.Bloom{}) { + t.Error("got empty bloom filter") + } + + data, _ := db.Get([]byte("setting-mipmap-version")) + if len(data) == 0 { + t.Error("setting-mipmap-version not written to database") + } +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index d3d430775..2e81ea177 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -17,6 +17,8 @@ package filters import ( + "math" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -30,13 +32,10 @@ type AccountChange struct { // Filtering interface type Filter struct { - db ethdb.Database - earliest int64 - latest int64 - skip int - address []common.Address - max int - topics [][]common.Hash + db ethdb.Database + begin, end int64 + addresses []common.Address + topics [][]common.Hash BlockCallback func(*types.Block, vm.Logs) TransactionCallback func(*types.Transaction) @@ -52,59 +51,82 @@ func New(db ethdb.Database) *Filter { // Set the earliest and latest block for filtering. 
// -1 = latest block (i.e., the current block) // hash = particular hash from-to -func (self *Filter) SetEarliestBlock(earliest int64) { - self.earliest = earliest +func (self *Filter) SetBeginBlock(begin int64) { + self.begin = begin } -func (self *Filter) SetLatestBlock(latest int64) { - self.latest = latest +func (self *Filter) SetEndBlock(end int64) { + self.end = end } -func (self *Filter) SetAddress(addr []common.Address) { - self.address = addr +func (self *Filter) SetAddresses(addr []common.Address) { + self.addresses = addr } func (self *Filter) SetTopics(topics [][]common.Hash) { self.topics = topics } -func (self *Filter) SetMax(max int) { - self.max = max -} - -func (self *Filter) SetSkip(skip int) { - self.skip = skip -} - // Run filters logs with the current parameters set func (self *Filter) Find() vm.Logs { - earliestBlock := core.GetBlock(self.db, core.GetHeadBlockHash(self.db)) - var earliestBlockNo uint64 = uint64(self.earliest) - if self.earliest == -1 { - earliestBlockNo = earliestBlock.NumberU64() + latestBlock := core.GetBlock(self.db, core.GetHeadBlockHash(self.db)) + var beginBlockNo uint64 = uint64(self.begin) + if self.begin == -1 { + beginBlockNo = latestBlock.NumberU64() } - var latestBlockNo uint64 = uint64(self.latest) - if self.latest == -1 { - latestBlockNo = earliestBlock.NumberU64() + var endBlockNo uint64 = uint64(self.end) + if self.end == -1 { + endBlockNo = latestBlock.NumberU64() } - var ( - logs vm.Logs - block *types.Block - ) - hash := core.GetCanonicalHash(self.db, latestBlockNo) - if hash != (common.Hash{}) { - block = core.GetBlock(self.db, hash) + // if no addresses are present we can't make use of fast search which + // uses the mipmap bloom filters to check for fast inclusion and uses + // higher range probability in order to ensure at least a false positive + if len(self.addresses) == 0 { + return self.getLogs(beginBlockNo, endBlockNo) } + return self.mipFind(beginBlockNo, endBlockNo, 0) +} -done: - for i := 0; 
block != nil; i++ { - // Quit on latest - switch { - case block.NumberU64() == 0: - break done - case block.NumberU64() < earliestBlockNo: - break done +func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) { + level := core.MIPMapLevels[depth] + // normalise numerator so we can work in level specific batches and + // work with the proper range checks + for num := start / level * level; num <= end; num += level { + // find addresses in bloom filters + bloom := core.GetMipmapBloom(self.db, num, level) + for _, addr := range self.addresses { + if bloom.TestBytes(addr[:]) { + // range check normalised values and make sure that + // we're resolving the correct range instead of the + // normalised values. + start := uint64(math.Max(float64(num), float64(start))) + end := uint64(math.Min(float64(num+level-1), float64(end))) + if depth+1 == len(core.MIPMapLevels) { + logs = append(logs, self.getLogs(start, end)...) + } else { + logs = append(logs, self.mipFind(start, end, depth+1)...) + } + // break so we don't check the same range for each + // possible address. Checks on multiple addresses + // are handled further down the stack. + break + } + } + } + + return logs +} + +func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { + var block *types.Block + + for i := start; i <= end; i++ { + hash := core.GetCanonicalHash(self.db, i) + if hash != (common.Hash{}) { + block = core.GetBlock(self.db, hash) + } else { // block not found + return logs } // Use bloom filtering to see if this block is interesting given the @@ -120,8 +142,6 @@ done: } logs = append(logs, self.FilterLogs(unfiltered)...) 
} - - block = core.GetBlock(self.db, block.ParentHash()) } return logs @@ -143,7 +163,7 @@ func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs { // Filter the logs for interesting stuff Logs: for _, log := range logs { - if len(self.address) > 0 && !includes(self.address, log.Address) { + if len(self.addresses) > 0 && !includes(self.addresses, log.Address) { continue } @@ -179,9 +199,9 @@ Logs: } func (self *Filter) bloomFilter(block *types.Block) bool { - if len(self.address) > 0 { + if len(self.addresses) > 0 { var included bool - for _, addr := range self.address { + for _, addr := range self.addresses { if types.BloomLookup(block.Bloom(), addr) { included = true break diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 950a84579..9e7538fac 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -1,6 +1,7 @@ package filters import ( + "io/ioutil" "math/big" "os" "testing" @@ -23,40 +24,42 @@ func makeReceipt(addr common.Address) *types.Receipt { } func BenchmarkMipmaps(b *testing.B) { - const dbname = "/tmp/mipmap" + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(dir) + var ( - db, _ = ethdb.NewLDBDatabase(dbname, 16) + db, _ = ethdb.NewLDBDatabase(dir, 16) key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) addr3 = common.BytesToAddress([]byte("ethereum")) addr4 = common.BytesToAddress([]byte("random addresses please")) ) - defer func() { - db.Close() - os.Remove(dbname) - }() + defer db.Close() genesis := core.WriteGenesisBlockForTesting(db, core.GenesisAccount{addr1, big.NewInt(1000000)}) - chain := core.GenerateChain(genesis, db, 100000, func(i int, gen *core.BlockGen) { + chain := core.GenerateChain(genesis, db, 100010, func(i int, gen *core.BlockGen) { var receipts types.Receipts switch i { case 2403: receipt := 
makeReceipt(addr1) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) - case 10340: + gen.AddUncheckedReceipt(receipt) + case 1034: receipt := makeReceipt(addr2) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) case 34: receipt := makeReceipt(addr3) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) case 99999: receipt := makeReceipt(addr4) receipts = types.Receipts{receipt} - gen.AddReceipt(receipt) + gen.AddUncheckedReceipt(receipt) } @@ -65,6 +68,7 @@ func BenchmarkMipmaps(b *testing.B) { if err != nil { b.Fatal(err) } + core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for _, block := range chain { core.WriteBlock(db, block) @@ -82,9 +86,9 @@ func BenchmarkMipmaps(b *testing.B) { b.ResetTimer() filter := New(db) - filter.SetAddress([]common.Address{addr1, addr2, addr3, addr4}) - filter.SetEarliestBlock(0) - filter.SetLatestBlock(-1) + filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) for i := 0; i < b.N; i++ { logs := filter.Find() @@ -93,3 +97,171 @@ func BenchmarkMipmaps(b *testing.B) { } } } + +func TestFilters(t *testing.T) { + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var ( + db, _ = ethdb.NewLDBDatabase(dir, 16) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) + + hash1 = common.BytesToHash([]byte("topic1")) + hash2 = common.BytesToHash([]byte("topic2")) + hash3 = common.BytesToHash([]byte("topic3")) + hash4 = common.BytesToHash([]byte("topic4")) + ) + defer db.Close() + + genesis := core.WriteGenesisBlockForTesting(db, core.GenesisAccount{addr, big.NewInt(1000000)}) + chain := core.GenerateChain(genesis, db, 1000, func(i int, gen *core.BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := 
types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash1}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 2: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash2}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 998: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash3}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 999: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash4}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + } + + // store the receipts + err := core.PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + // i is used as block number for the writes but since the i + // starts at 0 and block 0 (genesis) is already present increment + // by one + core.WriteMipmapBloom(db, uint64(i+1), receipts) + }) + for _, block := range chain { + core.WriteBlock(db, block) + if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := core.PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + filter := New(db) + filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2, hash3, hash4}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs := filter.Find() + if len(logs) != 4 { + t.Error("expected 4 log, got", len(logs)) + } + + filter = New(db) + 
filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) + filter.SetBeginBlock(900) + filter.SetEndBlock(999) + logs = filter.Find() + if len(logs) != 1 { + t.Error("expected 1 log, got", len(logs)) + } + if len(logs) > 0 && logs[0].Topics[0] != hash3 { + t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) + } + + filter = New(db) + filter.SetAddresses([]common.Address{addr}) + filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) + filter.SetBeginBlock(990) + filter.SetEndBlock(-1) + logs = filter.Find() + if len(logs) != 1 { + t.Error("expected 1 log, got", len(logs)) + } + if len(logs) > 0 && logs[0].Topics[0] != hash3 { + t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) + } + + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2}}) + filter.SetBeginBlock(1) + filter.SetEndBlock(10) + + logs = filter.Find() + if len(logs) != 2 { + t.Error("expected 2 log, got", len(logs)) + } + + failHash := common.BytesToHash([]byte("fail")) + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{failHash}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } + + failAddr := common.BytesToAddress([]byte("failmenow")) + filter = New(db) + filter.SetAddresses([]common.Address{failAddr}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } + + filter = New(db) + filter.SetTopics([][]common.Hash{[]common.Hash{failHash}, []common.Hash{hash1}}) + filter.SetBeginBlock(0) + filter.SetEndBlock(-1) + + logs = filter.Find() + if len(logs) != 0 { + t.Error("expected 0 log, got", len(logs)) + } +} |