diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api_backend.go | 16 | ||||
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/filters/api.go | 12 | ||||
-rw-r--r-- | eth/filters/filter.go | 14 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 82 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 7 | ||||
-rw-r--r-- | eth/gasprice/lightprice.go | 160 |
7 files changed, 268 insertions, 25 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go index 42b84bf9b..639f186c1 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -44,17 +44,17 @@ func (b *EthApiBackend) SetHead(number uint64) { b.eth.blockchain.SetHead(number) } -func (b *EthApiBackend) HeaderByNumber(blockNr rpc.BlockNumber) *types.Header { +func (b *EthApiBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { // Pending block is only known by the miner if blockNr == rpc.PendingBlockNumber { block, _ := b.eth.miner.Pending() - return block.Header() + return block.Header(), nil } // Otherwise resolve and return the block if blockNr == rpc.LatestBlockNumber { - return b.eth.blockchain.CurrentBlock().Header() + return b.eth.blockchain.CurrentBlock().Header(), nil } - return b.eth.blockchain.GetHeaderByNumber(uint64(blockNr)) + return b.eth.blockchain.GetHeaderByNumber(uint64(blockNr)), nil } func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { @@ -70,16 +70,16 @@ func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumb return b.eth.blockchain.GetBlockByNumber(uint64(blockNr)), nil } -func (b *EthApiBackend) StateAndHeaderByNumber(blockNr rpc.BlockNumber) (ethapi.State, *types.Header, error) { +func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (ethapi.State, *types.Header, error) { // Pending state is only known by the miner if blockNr == rpc.PendingBlockNumber { block, state := b.eth.miner.Pending() return EthApiState{state}, block.Header(), nil } // Otherwise resolve the block number and return its state - header := b.HeaderByNumber(blockNr) - if header == nil { - return nil, nil, nil + header, err := b.HeaderByNumber(ctx, blockNr) + if header == nil || err != nil { + return nil, nil, err } stateDb, err := b.eth.BlockChain().StateAt(header.Root) return EthApiState{stateDb}, header, err diff --git a/eth/backend.go b/eth/backend.go index 24419d6d8..9c5e11a59 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -312,7 +312,7 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Version: "1.0", - Service: filters.NewPublicFilterAPI(s.chainDb, s.eventMux), + Service: filters.NewPublicFilterAPI(s.ApiBackend, false), Public: true, }, { Namespace: "admin", diff --git a/eth/filters/api.go b/eth/filters/api.go index 3bc220348..fa4bef283 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -61,12 +61,14 @@ type PublicFilterAPI struct { } // NewPublicFilterAPI returns a new PublicFilterAPI instance. -func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI { +func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ - mux: mux, - chainDb: chainDb, - events: NewEventSystem(mux), - filters: make(map[rpc.ID]*filter), + backend: backend, + useMipMap: !lightMode, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 4226620dc..d181d0892 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -207,11 +207,15 @@ Logs: return ret } -func (f *Filter) bloomFilter(block *types.Block) bool { - if len(f.addresses) > 0 { +func (f *Filter) bloomFilter(bloom types.Bloom) bool { + return bloomFilter(bloom, f.addresses, f.topics) +} + +func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + if len(addresses) > 0 { var included bool - for _, addr := range f.addresses { - if types.BloomLookup(block.Bloom(), addr) { + for _, addr := range addresses { + if types.BloomLookup(bloom, addr) { included = true break } @@ -222,7 +226,7 @@ func (f *Filter) bloomFilter(block *types.Block) bool { } } - for _, sub := range f.topics { + for _, sub := range topics { var included bool for _, topic := range sub { if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04a55fd09..1e330b24f 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // Type determines the kind of filter and is used to put the filter in to @@ -95,6 +96,9 @@ type subscription struct { type EventSystem struct { mux *event.TypeMux sub event.Subscription + backend Backend + lightMode bool + lastHead *types.Header install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification } @@ -105,9 +109,11 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux) *EventSystem { +func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ mux: mux, + backend: backend, + lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), } @@ -235,7 +241,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func broadcast(filters filterIndex, ev *event.Event) { +func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if ev == nil { return } @@ -279,7 +285,77 @@ func broadcast(filters filterIndex, ev *event.Event) { f.headers <- e.Block.Header() } } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + }) + } + } +} + +func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { + oldh := es.lastHead + es.lastHead = newHeader + if oldh == nil { + return + } + newh := newHeader + // find common ancestor, create list of rolled back and new block hashes + var oldHeaders, newHeaders []*types.Header + for oldh.Hash() != newh.Hash() { + if oldh.GetNumberU64() >= newh.GetNumberU64() { + oldHeaders = append(oldHeaders, oldh) + oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) + } + if oldh.GetNumberU64() < newh.GetNumberU64() { + newHeaders = append(newHeaders, newh) + newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) + if newh == nil { + // happens when CHT syncing, nothing to do + newh = oldh + } + } + } + // roll back old blocks + for _, h := range oldHeaders { + callBack(h, true) + } + // check new blocks (array is in reverse order) + for i := len(newHeaders) - 1; i >= 0; i-- { + callBack(newHeaders[i], false) + } +} + +// filter logs of a single header in light client mode +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log { + //fmt.Println("lightFilterLogs", header.Number.Uint64(), remove) + if bloomFilter(header.Bloom, addresses, topics) { + //fmt.Println("bloom match") + // Get the logs of the block + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + receipts, err := es.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil + } + var unfiltered []Log + for _, receipt := range receipts { + rl := make([]Log, len(receipt.Logs)) + for i, l := range receipt.Logs { + rl[i] = Log{l, remove} + } + unfiltered = append(unfiltered, rl...) + } + logs := filterLogs(unfiltered, addresses, topics) + //fmt.Println("found", len(logs)) + return logs } + return nil } // eventLoop (un)installs filters and processes mux events. @@ -294,7 +370,7 @@ func (es *EventSystem) eventLoop() { if !active { // system stopped return } - broadcast(index, ev) + es.broadcast(index, ev) case f := <-es.install: if _, found := index[f.typ]; !found { index[f.typ] = make(map[rpc.ID]*subscription) diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 9e6fde1c6..1bd4d502d 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -32,9 +32,10 @@ import ( ) var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - api = NewPublicFilterAPI(db, mux) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) ) // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. diff --git a/eth/gasprice/lightprice.go b/eth/gasprice/lightprice.go new file mode 100644 index 000000000..ce075ff4e --- /dev/null +++ b/eth/gasprice/lightprice.go @@ -0,0 +1,160 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package gasprice + +import ( + "math/big" + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" +) + +const ( + LpoAvgCount = 5 + LpoMinCount = 3 + LpoMaxBlocks = 20 + LpoSelect = 50 + LpoDefaultPrice = 20000000000 +) + +// LightPriceOracle recommends gas prices based on the content of recent +// blocks. Suitable for both light and full clients. +type LightPriceOracle struct { + backend ethapi.Backend + lastHead common.Hash + lastPrice *big.Int + cacheLock sync.RWMutex + fetchLock sync.Mutex +} + +// NewLightPriceOracle returns a new oracle. +func NewLightPriceOracle(backend ethapi.Backend) *LightPriceOracle { + return &LightPriceOracle{ + backend: backend, + lastPrice: big.NewInt(LpoDefaultPrice), + } +} + +// SuggestPrice returns the recommended gas price. +func (self *LightPriceOracle) SuggestPrice(ctx context.Context) (*big.Int, error) { + self.cacheLock.RLock() + lastHead := self.lastHead + lastPrice := self.lastPrice + self.cacheLock.RUnlock() + + head, _ := self.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + headHash := head.Hash() + if headHash == lastHead { + return lastPrice, nil + } + + self.fetchLock.Lock() + defer self.fetchLock.Unlock() + + // try checking the cache again, maybe the last fetch fetched what we need + self.cacheLock.RLock() + lastHead = self.lastHead + lastPrice = self.lastPrice + self.cacheLock.RUnlock() + if headHash == lastHead { + return lastPrice, nil + } + + blockNum := head.GetNumberU64() + chn := make(chan lpResult, LpoMaxBlocks) + sent := 0 + exp := 0 + var lps bigIntArray + for sent < LpoAvgCount && blockNum > 0 { + go self.getLowestPrice(ctx, blockNum, chn) + sent++ + exp++ + blockNum-- + } + maxEmpty := LpoAvgCount - LpoMinCount + for exp > 0 { + res := <-chn + if res.err != nil { + return nil, res.err + } + exp-- + if res.price != nil { + lps = append(lps, res.price) + } else { + if maxEmpty > 0 { + maxEmpty-- + } else { + if blockNum > 0 && sent < LpoMaxBlocks { + go self.getLowestPrice(ctx, blockNum, chn) + sent++ + exp++ + blockNum-- + } + } + } + } + price := lastPrice + if len(lps) > 0 { + sort.Sort(lps) + price = lps[(len(lps)-1)*LpoSelect/100] + } + + self.cacheLock.Lock() + self.lastHead = headHash + self.lastPrice = price + self.cacheLock.Unlock() + return price, nil +} + +type lpResult struct { + price *big.Int + err error +} + +// getLowestPrice calculates the lowest transaction gas price in a given block +// and sends it to the result channel. If the block is empty, price is nil. +func (self *LightPriceOracle) getLowestPrice(ctx context.Context, blockNum uint64, chn chan lpResult) { + block, err := self.backend.BlockByNumber(ctx, rpc.BlockNumber(blockNum)) + if block == nil { + chn <- lpResult{nil, err} + return + } + txs := block.Transactions() + if len(txs) == 0 { + chn <- lpResult{nil, nil} + return + } + // find smallest gasPrice + minPrice := txs[0].GasPrice() + for i := 1; i < len(txs); i++ { + price := txs[i].GasPrice() + if price.Cmp(minPrice) < 0 { + minPrice = price + } + } + chn <- lpResult{minPrice, nil} +} + +type bigIntArray []*big.Int + +func (s bigIntArray) Len() int { return len(s) } +func (s bigIntArray) Less(i, j int) bool { return s[i].Cmp(s[j]) < 0 } +func (s bigIntArray) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |