diff options
-rw-r--r-- | core/blockchain.go | 29 | ||||
-rw-r--r-- | core/events.go | 1 | ||||
-rw-r--r-- | core/vm/log.go | 142 | ||||
-rw-r--r-- | core/vm/log_test.go | 73 | ||||
-rw-r--r-- | eth/filters/api.go | 19 | ||||
-rw-r--r-- | eth/filters/filter.go | 27 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 76 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 50 |
8 files changed, 252 insertions, 165 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 2eb207d39..1f762d147 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -988,7 +988,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles())) } blockInsertTimer.UpdateSince(bstart) - events = append(events, ChainSideEvent{block, logs}) + events = append(events, ChainSideEvent{block}) case SplitStatTy: events = append(events, ChainSplitEvent{block, logs}) @@ -1062,24 +1062,25 @@ func countTransactions(chain []*types.Block) (c int) { // event about them func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { var ( - newChain types.Blocks - oldChain types.Blocks - commonBlock *types.Block - oldStart = oldBlock - newStart = newBlock - deletedTxs types.Transactions - deletedLogs vm.Logs - deletedLogsByHash = make(map[common.Hash]vm.Logs) + newChain types.Blocks + oldChain types.Blocks + commonBlock *types.Block + oldStart = oldBlock + newStart = newBlock + deletedTxs types.Transactions + deletedLogs vm.Logs // collectLogs collects the logs that were generated during the // processing of the block that corresponds with the given hash. // These logs are later announced as deleted. collectLogs = func(h common.Hash) { - // Coalesce logs + // Coalesce logs and set 'Removed'. receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h)) for _, receipt := range receipts { - deletedLogs = append(deletedLogs, receipt.Logs...) - - deletedLogsByHash[h] = receipt.Logs + for _, log := range receipt.Logs { + del := *log + del.Removed = true + deletedLogs = append(deletedLogs, &del) + } } } ) @@ -1173,7 +1174,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if len(oldChain) > 0 { go func() { for _, block := range oldChain { - self.eventMux.Post(ChainSideEvent{Block: block, Logs: deletedLogsByHash[block.Hash()]}) + self.eventMux.Post(ChainSideEvent{Block: block}) } }() } diff --git a/core/events.go b/core/events.go index 322bcb769..414493fbf 100644 --- a/core/events.go +++ b/core/events.go @@ -61,7 +61,6 @@ type ChainEvent struct { type ChainSideEvent struct { Block *types.Block - Logs vm.Logs } type PendingBlockEvent struct { diff --git a/core/vm/log.go b/core/vm/log.go index 06f941703..347bd6e5d 100644 --- a/core/vm/log.go +++ b/core/vm/log.go @@ -29,20 +29,42 @@ import ( var errMissingLogFields = errors.New("missing required JSON log fields") -// Log represents a contract log event. These events are generated by the LOG -// opcode and stored/indexed by the node. +// Log represents a contract log event. These events are generated by the LOG opcode and +// stored/indexed by the node. type Log struct { // Consensus fields. Address common.Address // address of the contract that generated the event Topics []common.Hash // list of topics provided by the contract. Data []byte // supplied by the contract, usually ABI-encoded - // Derived fields (don't reorder!). + // Derived fields. These fields are filled in by the node + // but not secured by consensus. BlockNumber uint64 // block in which the transaction was included TxHash common.Hash // hash of the transaction TxIndex uint // index of the transaction in the block BlockHash common.Hash // hash of the block in which the transaction was included Index uint // index of the log in the receipt + + // The Removed field is true if this log was reverted due to a chain reorganisation. + // You must pay attention to this field if you receive logs through a filter query. + Removed bool +} + +type rlpLog struct { + Address common.Address + Topics []common.Hash + Data []byte +} + +type rlpStorageLog struct { + Address common.Address + Topics []common.Hash + Data []byte + BlockNumber uint64 + TxHash common.Hash + TxIndex uint + BlockHash common.Hash + Index uint } type jsonLog struct { @@ -54,27 +76,26 @@ type jsonLog struct { TxHash *common.Hash `json:"transactionHash"` BlockHash *common.Hash `json:"blockHash"` Index *hexutil.Uint `json:"logIndex"` + Removed bool `json:"removed"` } func NewLog(address common.Address, topics []common.Hash, data []byte, number uint64) *Log { return &Log{Address: address, Topics: topics, Data: data, BlockNumber: number} } +// EncodeRLP implements rlp.Encoder. func (l *Log) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{l.Address, l.Topics, l.Data}) + return rlp.Encode(w, rlpLog{Address: l.Address, Topics: l.Topics, Data: l.Data}) } +// DecodeRLP implements rlp.Decoder. func (l *Log) DecodeRLP(s *rlp.Stream) error { - var log struct { - Address common.Address - Topics []common.Hash - Data []byte + var dec rlpLog + err := s.Decode(&dec) + if err == nil { + l.Address, l.Topics, l.Data = dec.Address, dec.Topics, dec.Data } - if err := s.Decode(&log); err != nil { - return err - } - l.Address, l.Topics, l.Data = log.Address, log.Topics, log.Data - return nil + return err } func (l *Log) String() string { @@ -82,45 +103,88 @@ func (l *Log) String() string { } // MarshalJSON implements json.Marshaler. -func (r *Log) MarshalJSON() ([]byte, error) { - return json.Marshal(&jsonLog{ - Address: &r.Address, - Topics: &r.Topics, - Data: (*hexutil.Bytes)(&r.Data), - BlockNumber: (*hexutil.Uint64)(&r.BlockNumber), - TxIndex: (*hexutil.Uint)(&r.TxIndex), - TxHash: &r.TxHash, - BlockHash: &r.BlockHash, - Index: (*hexutil.Uint)(&r.Index), - }) +func (l *Log) MarshalJSON() ([]byte, error) { + jslog := &jsonLog{ + Address: &l.Address, + Topics: &l.Topics, + Data: (*hexutil.Bytes)(&l.Data), + TxIndex: (*hexutil.Uint)(&l.TxIndex), + TxHash: &l.TxHash, + Index: (*hexutil.Uint)(&l.Index), + Removed: l.Removed, + } + // Set block information for mined logs. + if (l.BlockHash != common.Hash{}) { + jslog.BlockHash = &l.BlockHash + jslog.BlockNumber = (*hexutil.Uint64)(&l.BlockNumber) + } + return json.Marshal(jslog) } // UnmarshalJSON implements json.Umarshaler. -func (r *Log) UnmarshalJSON(input []byte) error { +func (l *Log) UnmarshalJSON(input []byte) error { var dec jsonLog if err := json.Unmarshal(input, &dec); err != nil { return err } - if dec.Address == nil || dec.Topics == nil || dec.Data == nil || dec.BlockNumber == nil || - dec.TxIndex == nil || dec.TxHash == nil || dec.BlockHash == nil || dec.Index == nil { + if dec.Address == nil || dec.Topics == nil || dec.Data == nil || + dec.TxIndex == nil || dec.TxHash == nil || dec.Index == nil { return errMissingLogFields } - *r = Log{ - Address: *dec.Address, - Topics: *dec.Topics, - Data: *dec.Data, - BlockNumber: uint64(*dec.BlockNumber), - TxHash: *dec.TxHash, - TxIndex: uint(*dec.TxIndex), - BlockHash: *dec.BlockHash, - Index: uint(*dec.Index), + declog := Log{ + Address: *dec.Address, + Topics: *dec.Topics, + Data: *dec.Data, + TxHash: *dec.TxHash, + TxIndex: uint(*dec.TxIndex), + Index: uint(*dec.Index), + Removed: dec.Removed, + } + // Block information may be missing if the log is received through + // the pending log filter, so it's handled specially here. + if dec.BlockHash != nil && dec.BlockNumber != nil { + declog.BlockHash = *dec.BlockHash + declog.BlockNumber = uint64(*dec.BlockNumber) } + *l = declog return nil } type Logs []*Log -// LogForStorage is a wrapper around a Log that flattens and parses the entire -// content of a log, as opposed to only the consensus fields originally (by hiding -// the rlp interface methods). +// LogForStorage is a wrapper around a Log that flattens and parses the entire content of +// a log including non-consensus fields. type LogForStorage Log + +// EncodeRLP implements rlp.Encoder. +func (l *LogForStorage) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, rlpStorageLog{ + Address: l.Address, + Topics: l.Topics, + Data: l.Data, + BlockNumber: l.BlockNumber, + TxHash: l.TxHash, + TxIndex: l.TxIndex, + BlockHash: l.BlockHash, + Index: l.Index, + }) +} + +// DecodeRLP implements rlp.Decoder. +func (l *LogForStorage) DecodeRLP(s *rlp.Stream) error { + var dec rlpStorageLog + err := s.Decode(&dec) + if err == nil { + *l = LogForStorage{ + Address: dec.Address, + Topics: dec.Topics, + Data: dec.Data, + BlockNumber: dec.BlockNumber, + TxHash: dec.TxHash, + TxIndex: dec.TxIndex, + BlockHash: dec.BlockHash, + Index: dec.Index, + } + } + return err +} diff --git a/core/vm/log_test.go b/core/vm/log_test.go index 4d3189558..994753c62 100644 --- a/core/vm/log_test.go +++ b/core/vm/log_test.go @@ -18,18 +18,81 @@ package vm import ( "encoding/json" + "reflect" "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" ) var unmarshalLogTests = map[string]struct { input string + want *Log wantError error }{ "ok": { - input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x000000000000000000000000000000000000000000000001a055690d9db80000","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, + input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x000000000000000000000000000000000000000000000001a055690d9db80000","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, + want: &Log{ + Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"), + BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"), + BlockNumber: 2019236, + Data: hexutil.MustDecode("0x000000000000000000000000000000000000000000000001a055690d9db80000"), + Index: 2, + TxIndex: 3, + TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"), + Topics: []common.Hash{ + common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), + common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"), + }, + }, }, "empty data": { - input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, + input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, + want: &Log{ + Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"), + BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"), + BlockNumber: 2019236, + Data: []byte{}, + Index: 2, + TxIndex: 3, + TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"), + Topics: []common.Hash{ + common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), + common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"), + }, + }, + }, + "missing block fields (pending logs)": { + input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","data":"0x","logIndex":"0x0","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, + want: &Log{ + Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"), + BlockHash: common.Hash{}, + BlockNumber: 0, + Data: []byte{}, + Index: 0, + TxIndex: 3, + TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"), + Topics: []common.Hash{ + common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), + }, + }, + }, + "Removed: true": { + input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3","removed":true}`, + want: &Log{ + Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"), + BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"), + BlockNumber: 2019236, + Data: []byte{}, + Index: 2, + TxIndex: 3, + TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"), + Topics: []common.Hash{ + common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), + }, + Removed: true, + }, }, "missing data": { input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`, @@ -38,10 +101,16 @@ var unmarshalLogTests = map[string]struct { } func TestUnmarshalLog(t *testing.T) { + dumper := spew.ConfigState{DisableMethods: true, Indent: " "} for name, test := range unmarshalLogTests { var log *Log err := json.Unmarshal([]byte(test.input), &log) checkError(t, name, err, test.wantError) + if test.wantError == nil && err == nil { + if !reflect.DeepEqual(log, test.want) { + t.Errorf("test %q:\nGOT %sWANT %s", name, dumper.Sdump(log), dumper.Sdump(test.want)) + } + } } } diff --git a/eth/filters/api.go b/eth/filters/api.go index d5dd57743..bbb34d3de 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" @@ -45,7 +46,7 @@ type filter struct { deadline *time.Timer // filter is inactiv when deadline triggers hashes []common.Hash crit FilterCriteria - logs []Log + logs []*vm.Log s *Subscription // associated subscription in event system } @@ -241,7 +242,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc var ( rpcSub = notifier.CreateSubscription() - matchedLogs = make(chan []Log) + matchedLogs = make(chan []*vm.Log) ) logsSub, err := api.events.SubscribeLogs(crit, matchedLogs) @@ -292,14 +293,14 @@ type FilterCriteria struct { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { - logs := make(chan []Log) + logs := make(chan []*vm.Log) logsSub, err := api.events.SubscribeLogs(crit, logs) if err != nil { return rpc.ID(""), err } api.filtersMu.Lock() - api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub} + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*vm.Log, 0), s: logsSub} api.filtersMu.Unlock() go func() { @@ -326,7 +327,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs -func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]Log, error) { +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*vm.Log, error) { if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } @@ -365,7 +366,7 @@ func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { // If the filter could not be found an empty array of logs is returned. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs -func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log, error) { +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*vm.Log, error) { api.filtersMu.Lock() f, found := api.filters[id] api.filtersMu.Unlock() @@ -388,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log filter.SetAddresses(f.crit.Addresses) filter.SetTopics(f.crit.Topics) - logs, err:= filter.Find(ctx) + logs, err := filter.Find(ctx) if err != nil { return nil, err } @@ -440,9 +441,9 @@ func returnHashes(hashes []common.Hash) []common.Hash { // returnLogs is a helper that will return an empty log array in case the given logs array is nil, // otherwise the given logs array is returned. -func returnLogs(logs []Log) []Log { +func returnLogs(logs []*vm.Log) []*vm.Log { if logs == nil { - return []Log{} + return []*vm.Log{} } return logs } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index ce7383fb3..a695d7eb7 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" @@ -38,7 +39,7 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) } -// Filter can be used to retrieve and filter logs +// Filter can be used to retrieve and filter logs. type Filter struct { backend Backend useMipMap bool @@ -85,7 +86,7 @@ func (f *Filter) SetTopics(topics [][]common.Hash) { } // Run filters logs with the current parameters set -func (f *Filter) Find(ctx context.Context) ([]Log, error) { +func (f *Filter) Find(ctx context.Context) ([]*vm.Log, error) { head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if head == nil { return nil, nil @@ -110,7 +111,7 @@ func (f *Filter) Find(ctx context.Context) ([]Log, error) { return f.mipFind(beginBlockNo, endBlockNo, 0), nil } -func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { +func (f *Filter) mipFind(start, end uint64, depth int) (logs []*vm.Log) { level := core.MIPMapLevels[depth] // normalise numerator so we can work in level specific batches and // work with the proper range checks @@ -141,7 +142,7 @@ func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { return logs } -func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, err error) { +func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*vm.Log, err error) { for i := start; i <= end; i++ { header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(i)) if header == nil || err != nil { @@ -156,13 +157,9 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er if err != nil { return nil, err } - var unfiltered []Log + var unfiltered []*vm.Log for _, receipt := range receipts { - rl := make([]Log, len(receipt.Logs)) - for i, l := range receipt.Logs { - rl[i] = Log{l, false} - } - unfiltered = append(unfiltered, rl...) + unfiltered = append(unfiltered, ([]*vm.Log)(receipt.Logs)...) } logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...) } @@ -181,15 +178,15 @@ func includes(addresses []common.Address, a common.Address) bool { return false } -func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log { - var ret []Log - // Filter the logs for interesting stuff +// filterLogs creates a slice of logs matching the given criteria. +func filterLogs(logs []*vm.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*vm.Log { + var ret []*vm.Log Logs: for _, log := range logs { - if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber { + if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { continue } - if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber { + if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { continue } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index b59718aea..1b360cfdb 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -19,7 +19,6 @@ package filters import ( - "encoding/json" "errors" "fmt" "sync" @@ -60,42 +59,12 @@ var ( ErrInvalidSubscriptionID = errors.New("invalid id") ) -// Log is a helper that can hold additional information about vm.Log -// necessary for the RPC interface. -type Log struct { - *vm.Log - Removed bool `json:"removed"` -} - -// MarshalJSON returns *l as the JSON encoding of l. -func (l *Log) MarshalJSON() ([]byte, error) { - fields := map[string]interface{}{ - "address": l.Address, - "data": fmt.Sprintf("0x%x", l.Data), - "blockNumber": nil, - "logIndex": fmt.Sprintf("%#x", l.Index), - "blockHash": nil, - "transactionHash": l.TxHash, - "transactionIndex": fmt.Sprintf("%#x", l.TxIndex), - "topics": l.Topics, - "removed": l.Removed, - } - - // mined logs - if l.BlockHash != (common.Hash{}) { - fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber) - fields["blockHash"] = l.BlockHash - } - - return json.Marshal(fields) -} - type subscription struct { id rpc.ID typ Type created time.Time logsCrit FilterCriteria - logs chan []Log + logs chan []*vm.Log hashes chan common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed @@ -182,7 +151,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to // block is "latest". If the fromBlock > toBlock an error is returned. -func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) { +func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*vm.Log) (*Subscription, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber @@ -220,7 +189,7 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Sub // subscribeMinedPendingLogs creates a subscription that returned mined and // pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { +func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: MinedAndPendingLogsSubscription, @@ -238,7 +207,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { +func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: LogsSubscription, @@ -256,7 +225,7 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subs // subscribePendingLogs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { +func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingLogsSubscription, @@ -279,7 +248,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti id: rpc.NewID(), typ: BlocksSubscription, created: time.Now(), - logs: make(chan []Log), + logs: make(chan []*vm.Log), hashes: make(chan common.Hash), headers: headers, installed: make(chan struct{}), @@ -296,7 +265,7 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr id: rpc.NewID(), typ: PendingTransactionsSubscription, created: time.Now(), - logs: make(chan []Log), + logs: make(chan []*vm.Log), hashes: hashes, headers: make(chan *types.Header), installed: make(chan struct{}), @@ -319,7 +288,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if len(e) > 0 { for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -328,7 +297,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -336,7 +305,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.PendingLogsEvent: for _, f := range filters[PendingLogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -401,25 +370,22 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func } // filter logs of a single header in light client mode -func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log { - //fmt.Println("lightFilterLogs", header.Number.Uint64(), remove) +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*vm.Log { if bloomFilter(header.Bloom, addresses, topics) { - //fmt.Println("bloom match") // Get the logs of the block ctx, _ := context.WithTimeout(context.Background(), time.Second*5) receipts, err := es.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil } - var unfiltered []Log + var unfiltered []*vm.Log for _, receipt := range receipts { - rl := make([]Log, len(receipt.Logs)) - for i, l := range receipt.Logs { - rl[i] = Log{l, remove} + for _, log := range receipt.Logs { + logcopy := *log + logcopy.Removed = remove + unfiltered = append(unfiltered, &logcopy) } - unfiltered = append(unfiltered, rl...) } - logs := filterLogs(unfiltered, nil, nil, addresses, topics) return logs } @@ -465,13 +431,3 @@ func (es *EventSystem) eventLoop() { } } } - -// convertLogs is a helper utility that converts vm.Logs to []filter.Log. -func convertLogs(in vm.Logs, removed bool) []Log { - - logs := make([]Log, len(in)) - for i, l := range in { - logs[i] = Log{l, removed} - } - return logs -} diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index e8591a2e4..3ce0cf663 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -74,10 +74,10 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) genesis = core.WriteGenesisBlockForTesting(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) @@ -128,10 +128,10 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -178,10 +178,10 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -223,10 +223,10 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -249,10 +249,10 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -321,14 +321,14 @@ func TestLogFilter(t *testing.T) { } for i, tt := range testCases { - var fetched []Log + var fetched []*vm.Log for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { t.Fatalf("Unable to fetch logs: %v", err) } - fetched = append(fetched, results.([]Log)...) + fetched = append(fetched, results.([]*vm.Log)...) if len(fetched) >= len(tt.expected) { break } @@ -345,7 +345,7 @@ func TestLogFilter(t *testing.T) { if fetched[l].Removed { t.Errorf("expected log not to be removed for log %d in case %d", l, i) } - if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + if !reflect.DeepEqual(fetched[l], tt.expected[l]) { t.Errorf("invalid log on index %d for case %d", l, i) } } @@ -357,10 +357,10 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -397,7 +397,7 @@ func TestPendingLogsSubscription(t *testing.T) { testCases = []struct { crit FilterCriteria expected vm.Logs - c chan []Log + c chan []*vm.Log sub *Subscription }{ // match all @@ -423,7 +423,7 @@ func TestPendingLogsSubscription(t *testing.T) { // on slow machines this could otherwise lead to missing events when the subscription is created after // (some) events are posted. for i := range testCases { - testCases[i].c = make(chan []Log) + testCases[i].c = make(chan []*vm.Log) testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) } @@ -431,7 +431,7 @@ func TestPendingLogsSubscription(t *testing.T) { i := n tt := test go func() { - var fetched []Log + var fetched []*vm.Log fetchLoop: for { logs := <-tt.c @@ -449,7 +449,7 @@ func TestPendingLogsSubscription(t *testing.T) { if fetched[l].Removed { t.Errorf("expected log not to be removed for log %d in case %d", l, i) } - if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + if !reflect.DeepEqual(fetched[l], tt.expected[l]) { t.Errorf("invalid log on index %d for case %d", l, i) } } |