aboutsummaryrefslogtreecommitdiffstats
path: root/xeth/xeth.go
diff options
context:
space:
mode:
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r--xeth/xeth.go46
1 files changed, 34 insertions, 12 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 00b70da6c..623b3a963 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -532,8 +532,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock()
defer self.logMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.logQueue[id] = &logQueue{timeout: time.Now()}
+
filter.SetEarliestBlock(earliest)
filter.SetLatestBlock(latest)
filter.SetSkip(skip)
@@ -544,10 +546,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock()
defer self.logMu.Unlock()
- self.logQueue[id].add(logs...)
+ if queue := self.logQueue[id]; queue != nil {
+ queue.add(logs...)
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.logQueue[id] = &logQueue{timeout: time.Now()}
return id
}
@@ -556,16 +558,18 @@ func (self *XEth) NewTransactionFilter() int {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+
filter.TransactionCallback = func(tx *types.Transaction) {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
- self.transactionQueue[id].add(tx.Hash())
+ if queue := self.transactionQueue[id]; queue != nil {
+ queue.add(tx.Hash())
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
return id
}
@@ -573,16 +577,18 @@ func (self *XEth) NewBlockFilter() int {
self.blockMu.Lock()
defer self.blockMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.blockQueue[id] = &hashQueue{timeout: time.Now()}
+
filter.BlockCallback = func(block *types.Block, logs state.Logs) {
self.blockMu.Lock()
defer self.blockMu.Unlock()
- self.blockQueue[id].add(block.Hash())
+ if queue := self.blockQueue[id]; queue != nil {
+ queue.add(block.Hash())
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.blockQueue[id] = &hashQueue{timeout: time.Now()}
return id
}
@@ -1022,16 +1028,24 @@ func (m callmsg) Value() *big.Int { return m.value }
func (m callmsg) Data() []byte { return m.data }
type logQueue struct {
+ mu sync.Mutex
+
logs state.Logs
timeout time.Time
id int
}
func (l *logQueue) add(logs ...*state.Log) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() state.Logs {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
@@ -1039,16 +1053,24 @@ func (l *logQueue) get() state.Logs {
}
type hashQueue struct {
+ mu sync.Mutex
+
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil