aboutsummaryrefslogtreecommitdiffstats
path: root/ethchain
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2014-02-25 18:22:27 +0800
committerobscuren <geffobscura@gmail.com>2014-02-25 18:22:27 +0800
commite98b53bbef8cdbeed54546c75d856d53810e424c (patch)
treeabda9422caf3fe0aefd627e0ad43406b11b6e90b /ethchain
parent4b8c50e2cde130bf278b14040a267aab573dd53e (diff)
downloaddexon-e98b53bbef8cdbeed54546c75d856d53810e424c.tar.gz
dexon-e98b53bbef8cdbeed54546c75d856d53810e424c.tar.zst
dexon-e98b53bbef8cdbeed54546c75d856d53810e424c.zip
WIP Observing pattern
Diffstat (limited to 'ethchain')
-rw-r--r--ethchain/transaction_pool.go41
1 files changed, 33 insertions, 8 deletions
diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go
index 1278cc4dc..cd09bf02e 100644
--- a/ethchain/transaction_pool.go
+++ b/ethchain/transaction_pool.go
@@ -17,6 +17,17 @@ const (
)
type TxPoolHook chan *Transaction
+type TxMsgTy byte
+
+const (
+ TxPre = iota
+ TxPost
+)
+
+type TxMsg struct {
+ Tx *Transaction
+ Type TxMsgTy
+}
func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Transaction {
for e := pool.Front(); e != nil; e = e.Next() {
@@ -59,6 +70,8 @@ type TxPool struct {
BlockManager *BlockManager
SecondaryProcessor TxProcessor
+
+ subscribers []chan TxMsg
}
func NewTxPool() *TxPool {
@@ -73,21 +86,17 @@ func NewTxPool() *TxPool {
// Blocking function. Don't use directly. Use QueueTransaction instead
func (pool *TxPool) addTransaction(tx *Transaction) {
- log.Println("Adding tx to pool")
pool.mutex.Lock()
pool.pool.PushBack(tx)
pool.mutex.Unlock()
// Broadcast the transaction to the rest of the peers
pool.Speaker.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()})
- log.Println("broadcasting it")
}
// Process transaction validates the Tx and processes funds from the
// sender to the recipient.
func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error) {
- log.Printf("[TXPL] Processing Tx %x\n", tx.Hash())
-
defer func() {
if r := recover(); r != nil {
log.Println(r)
@@ -132,6 +141,11 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error
block.UpdateAddr(tx.Sender(), sender)
+ log.Printf("[TXPL] Processed Tx %x\n", tx.Hash())
+
+ // Notify the subscribers
+ pool.notifySubscribers(TxPost, tx)
+
return
}
@@ -145,7 +159,8 @@ func (pool *TxPool) ValidateTransaction(tx *Transaction) error {
}
// Get the sender
- sender := block.GetAddr(tx.Sender())
+ accountState := pool.BlockManager.GetAddrState(tx.Sender())
+ sender := accountState.Account
totAmount := new(big.Int).Add(tx.Value, new(big.Int).Mul(TxFee, TxFeeRat))
// Make sure there's enough in the sender's account. Having insufficient
@@ -185,9 +200,8 @@ out:
// doesn't matter since this is a goroutine
pool.addTransaction(tx)
- if pool.SecondaryProcessor != nil {
- pool.SecondaryProcessor.ProcessTransaction(tx)
- }
+ // Notify the subscribers
+ pool.notifySubscribers(TxPre, tx)
}
case <-pool.quit:
break out
@@ -231,3 +245,14 @@ func (pool *TxPool) Stop() {
pool.Flush()
}
+
+func (pool *TxPool) Subscribe(channel chan TxMsg) {
+ pool.subscribers = append(pool.subscribers, channel)
+}
+
+func (pool *TxPool) notifySubscribers(ty TxMsgTy, tx *Transaction) {
+ msg := TxMsg{Type: ty, Tx: tx}
+ for _, subscriber := range pool.subscribers {
+ subscriber <- msg
+ }
+}