diff options
-rw-r--r-- | ethchain/address.go | 30 | ||||
-rw-r--r-- | ethchain/block.go | 90 | ||||
-rw-r--r-- | ethchain/block_chain.go | 2 | ||||
-rw-r--r-- | ethchain/block_manager.go | 32 | ||||
-rw-r--r-- | ethchain/contract.go | 17 | ||||
-rw-r--r-- | ethchain/state.go | 67 | ||||
-rw-r--r-- | ethchain/transaction_pool.go | 8 | ||||
-rw-r--r-- | ethchain/vm.go | 4 | ||||
-rw-r--r-- | ethutil/common_test.go | 43 | ||||
-rw-r--r-- | ethutil/reactor.go | 86 | ||||
-rw-r--r-- | ethutil/reactor_test.go | 30 | ||||
-rw-r--r-- | peer.go | 6 |
12 files changed, 282 insertions, 133 deletions
diff --git a/ethchain/address.go b/ethchain/address.go index a228c7566..aa1709f2c 100644 --- a/ethchain/address.go +++ b/ethchain/address.go @@ -5,31 +5,31 @@ import ( "math/big" ) -type Address struct { +type Account struct { Amount *big.Int Nonce uint64 } -func NewAddress(amount *big.Int) *Address { - return &Address{Amount: amount, Nonce: 0} +func NewAccount(amount *big.Int) *Account { + return &Account{Amount: amount, Nonce: 0} } -func NewAddressFromData(data []byte) *Address { - address := &Address{} +func NewAccountFromData(data []byte) *Account { + address := &Account{} address.RlpDecode(data) return address } -func (a *Address) AddFee(fee *big.Int) { +func (a *Account) AddFee(fee *big.Int) { a.Amount.Add(a.Amount, fee) } -func (a *Address) RlpEncode() []byte { +func (a *Account) RlpEncode() []byte { return ethutil.Encode([]interface{}{a.Amount, a.Nonce}) } -func (a *Address) RlpDecode(data []byte) { +func (a *Account) RlpDecode(data []byte) { decoder := ethutil.NewValueFromBytes(data) a.Amount = decoder.Get(0).BigInt() @@ -37,24 +37,24 @@ func (a *Address) RlpDecode(data []byte) { } type AddrStateStore struct { - states map[string]*AddressState + states map[string]*AccountState } func NewAddrStateStore() *AddrStateStore { - return &AddrStateStore{states: make(map[string]*AddressState)} + return &AddrStateStore{states: make(map[string]*AccountState)} } -func (s *AddrStateStore) Add(addr []byte, account *Address) *AddressState { - state := &AddressState{Nonce: account.Nonce, Account: account} +func (s *AddrStateStore) Add(addr []byte, account *Account) *AccountState { + state := &AccountState{Nonce: account.Nonce, Account: account} s.states[string(addr)] = state return state } -func (s *AddrStateStore) Get(addr []byte) *AddressState { +func (s *AddrStateStore) Get(addr []byte) *AccountState { return s.states[string(addr)] } -type AddressState struct { +type AccountState struct { Nonce uint64 - Account *Address + Account *Account } diff --git a/ethchain/block.go b/ethchain/block.go index 7ca44a47d..20af73ba2 100644 --- a/ethchain/block.go +++ b/ethchain/block.go @@ -34,7 +34,8 @@ type Block struct { // The coin base address Coinbase []byte // Block Trie state - state *ethutil.Trie + //state *ethutil.Trie + state *State // Difficulty for the current block Difficulty *big.Int // Creation time @@ -94,7 +95,7 @@ func CreateBlock(root interface{}, block.SetTransactions(txes) block.SetUncles([]*Block{}) - block.state = ethutil.NewTrie(ethutil.Config.Db, root) + block.state = NewState(ethutil.NewTrie(ethutil.Config.Db, root)) for _, tx := range txes { block.MakeContract(tx) @@ -109,15 +110,15 @@ func (block *Block) Hash() []byte { } func (block *Block) HashNoNonce() []byte { - return ethutil.Sha3Bin(ethutil.Encode([]interface{}{block.PrevHash, block.UncleSha, block.Coinbase, block.state.Root, block.TxSha, block.Difficulty, block.Time, block.Extra})) + return ethutil.Sha3Bin(ethutil.Encode([]interface{}{block.PrevHash, block.UncleSha, block.Coinbase, block.state.trie.Root, block.TxSha, block.Difficulty, block.Time, block.Extra})) } func (block *Block) PrintHash() { fmt.Println(block) - fmt.Println(ethutil.NewValue(ethutil.Encode([]interface{}{block.PrevHash, block.UncleSha, block.Coinbase, block.state.Root, block.TxSha, block.Difficulty, block.Time, block.Extra, block.Nonce}))) + fmt.Println(ethutil.NewValue(ethutil.Encode([]interface{}{block.PrevHash, block.UncleSha, block.Coinbase, block.state.trie.Root, block.TxSha, block.Difficulty, block.Time, block.Extra, block.Nonce}))) } -func (block *Block) State() *ethutil.Trie { +func (block *Block) State() *State { return block.state } @@ -125,54 +126,8 @@ func (block *Block) Transactions() []*Transaction { return block.transactions } -func (block *Block) GetContract(addr []byte) *Contract { - data := block.state.Get(string(addr)) - if data == "" { - return nil - } - - value := ethutil.NewValueFromBytes([]byte(data)) - if value.Len() == 2 { - return nil - } - - contract := &Contract{} - contract.RlpDecode([]byte(data)) - - cachedState := block.contractStates[string(addr)] - if cachedState != nil { - contract.state = cachedState - } else { - block.contractStates[string(addr)] = contract.state - } - - return contract -} -func (block *Block) UpdateContract(addr []byte, contract *Contract) { - // Make sure the state is synced - //contract.State().Sync() - - block.state.Update(string(addr), string(contract.RlpEncode())) -} - -func (block *Block) GetAddr(addr []byte) *Address { - var address *Address - - data := block.State().Get(string(addr)) - if data == "" { - address = NewAddress(big.NewInt(0)) - } else { - address = NewAddressFromData([]byte(data)) - } - - return address -} -func (block *Block) UpdateAddr(addr []byte, address *Address) { - block.state.Update(string(addr), string(address.RlpEncode())) -} - func (block *Block) PayFee(addr []byte, fee *big.Int) bool { - contract := block.GetContract(addr) + contract := block.state.GetContract(addr) // If we can't pay the fee return if contract == nil || contract.Amount.Cmp(fee) < 0 /* amount < fee */ { fmt.Println("Contract has insufficient funds", contract.Amount, fee) @@ -182,17 +137,17 @@ func (block *Block) PayFee(addr []byte, fee *big.Int) bool { base := new(big.Int) contract.Amount = base.Sub(contract.Amount, fee) - block.state.Update(string(addr), string(contract.RlpEncode())) + block.state.trie.Update(string(addr), string(contract.RlpEncode())) - data := block.state.Get(string(block.Coinbase)) + data := block.state.trie.Get(string(block.Coinbase)) // Get the ether (Coinbase) and add the fee (gief fee to miner) - ether := NewAddressFromData([]byte(data)) + ether := NewAccountFromData([]byte(data)) base = new(big.Int) ether.Amount = base.Add(ether.Amount, fee) - block.state.Update(string(block.Coinbase), string(ether.RlpEncode())) + block.state.trie.Update(string(block.Coinbase), string(ether.RlpEncode())) return true } @@ -207,27 +162,18 @@ func (block *Block) BlockInfo() BlockInfo { // Sync the block's state and contract respectively func (block *Block) Sync() { - // Sync all contracts currently in cache - for _, val := range block.contractStates { - val.Sync() - } - // Sync the block state itself block.state.Sync() } func (block *Block) Undo() { - // Sync all contracts currently in cache - for _, val := range block.contractStates { - val.Undo() - } // Sync the block state itself - block.state.Undo() + block.state.Reset() } func (block *Block) MakeContract(tx *Transaction) { - contract := MakeContract(tx, NewState(block.state)) + contract := MakeContract(tx, block.state) if contract != nil { - block.contractStates[string(tx.Hash()[12:])] = contract.state + block.state.states[string(tx.Hash()[12:])] = contract.state } } @@ -308,7 +254,7 @@ func (block *Block) RlpValueDecode(decoder *ethutil.Value) { block.PrevHash = header.Get(0).Bytes() block.UncleSha = header.Get(1).Bytes() block.Coinbase = header.Get(2).Bytes() - block.state = ethutil.NewTrie(ethutil.Config.Db, header.Get(3).Val) + block.state = NewState(ethutil.NewTrie(ethutil.Config.Db, header.Get(3).Val)) block.TxSha = header.Get(4).Bytes() block.Difficulty = header.Get(5).BigInt() block.Time = int64(header.Get(6).BigInt().Uint64()) @@ -345,7 +291,7 @@ func NewUncleBlockFromValue(header *ethutil.Value) *Block { block.PrevHash = header.Get(0).Bytes() block.UncleSha = header.Get(1).Bytes() block.Coinbase = header.Get(2).Bytes() - block.state = ethutil.NewTrie(ethutil.Config.Db, header.Get(3).Val) + block.state = NewState(ethutil.NewTrie(ethutil.Config.Db, header.Get(3).Val)) block.TxSha = header.Get(4).Bytes() block.Difficulty = header.Get(5).BigInt() block.Time = int64(header.Get(6).BigInt().Uint64()) @@ -356,7 +302,7 @@ func NewUncleBlockFromValue(header *ethutil.Value) *Block { } func (block *Block) String() string { - return fmt.Sprintf("Block(%x):\nPrevHash:%x\nUncleSha:%x\nCoinbase:%x\nRoot:%x\nTxSha:%x\nDiff:%v\nTime:%d\nNonce:%x\nTxs:%d\n", block.Hash(), block.PrevHash, block.UncleSha, block.Coinbase, block.state.Root, block.TxSha, block.Difficulty, block.Time, block.Nonce, len(block.transactions)) + return fmt.Sprintf("Block(%x):\nPrevHash:%x\nUncleSha:%x\nCoinbase:%x\nRoot:%x\nTxSha:%x\nDiff:%v\nTime:%d\nNonce:%x\nTxs:%d\n", block.Hash(), block.PrevHash, block.UncleSha, block.Coinbase, block.state.trie.Root, block.TxSha, block.Difficulty, block.Time, block.Nonce, len(block.transactions)) } //////////// UNEXPORTED ///////////////// @@ -369,7 +315,7 @@ func (block *Block) header() []interface{} { // Coinbase address block.Coinbase, // root state - block.state.Root, + block.state.trie.Root, // Sha of tx block.TxSha, // Current block Difficulty diff --git a/ethchain/block_chain.go b/ethchain/block_chain.go index 96d22366d..026fc1cea 100644 --- a/ethchain/block_chain.go +++ b/ethchain/block_chain.go @@ -39,7 +39,7 @@ func (bc *BlockChain) NewBlock(coinbase []byte, txs []*Transaction) *Block { hash := ZeroHash256 if bc.CurrentBlock != nil { - root = bc.CurrentBlock.State().Root + root = bc.CurrentBlock.state.trie.Root hash = bc.LastBlockHash lastBlockTime = bc.CurrentBlock.Time } diff --git a/ethchain/block_manager.go b/ethchain/block_manager.go index 364a06158..b184fa9c9 100644 --- a/ethchain/block_manager.go +++ b/ethchain/block_manager.go @@ -51,9 +51,9 @@ func AddTestNetFunds(block *Block) { } { //log.Println("2^200 Wei to", addr) codedAddr, _ := hex.DecodeString(addr) - addr := block.GetAddr(codedAddr) + addr := block.state.GetAccount(codedAddr) addr.Amount = ethutil.BigPow(2, 200) - block.UpdateAddr(codedAddr, addr) + block.state.UpdateAccount(codedAddr, addr) } } @@ -71,7 +71,7 @@ func NewBlockManager(speaker PublicSpeaker) *BlockManager { if bm.bc.CurrentBlock == nil { AddTestNetFunds(bm.bc.genesisBlock) - bm.bc.genesisBlock.State().Sync() + bm.bc.genesisBlock.state.trie.Sync() // Prepare the genesis block bm.bc.Add(bm.bc.genesisBlock) @@ -85,17 +85,17 @@ func NewBlockManager(speaker PublicSpeaker) *BlockManager { } // Watches any given address and puts it in the address state store -func (bm *BlockManager) WatchAddr(addr []byte) *AddressState { - account := bm.bc.CurrentBlock.GetAddr(addr) +func (bm *BlockManager) WatchAddr(addr []byte) *AccountState { + account := bm.bc.CurrentBlock.state.GetAccount(addr) return bm.addrStateStore.Add(addr, account) } -func (bm *BlockManager) GetAddrState(addr []byte) *AddressState { +func (bm *BlockManager) GetAddrState(addr []byte) *AccountState { account := bm.addrStateStore.Get(addr) if account == nil { - a := bm.bc.CurrentBlock.GetAddr(addr) - account = &AddressState{Nonce: a.Nonce, Account: a} + a := bm.bc.CurrentBlock.state.GetAccount(addr) + account = &AccountState{Nonce: a.Nonce, Account: a} } return account @@ -112,7 +112,7 @@ func (bm *BlockManager) ApplyTransactions(block *Block, txs []*Transaction) { if tx.IsContract() { block.MakeContract(tx) } else { - if contract := block.GetContract(tx.Recipient); contract != nil { + if contract := block.state.GetContract(tx.Recipient); contract != nil { bm.ProcessContract(contract, tx, block) } else { err := bm.TransactionPool.ProcessTransaction(tx, block) @@ -161,8 +161,8 @@ func (bm *BlockManager) ProcessBlock(block *Block) error { return err } - if !block.State().Cmp(bm.bc.CurrentBlock.State()) { - return fmt.Errorf("Invalid merkle root. Expected %x, got %x", block.State().Root, bm.bc.CurrentBlock.State().Root) + if !block.state.Cmp(bm.bc.CurrentBlock.state) { + return fmt.Errorf("Invalid merkle root. Expected %x, got %x", block.State().trie.Root, bm.bc.CurrentBlock.State().trie.Root) } // Calculate the new total difficulty and sync back to the db @@ -267,17 +267,17 @@ func CalculateUncleReward(block *Block) *big.Int { func (bm *BlockManager) AccumelateRewards(processor *Block, block *Block) error { // Get the coinbase rlp data - addr := processor.GetAddr(block.Coinbase) + addr := processor.state.GetAccount(block.Coinbase) // Reward amount of ether to the coinbase address addr.AddFee(CalculateBlockReward(block, len(block.Uncles))) - processor.UpdateAddr(block.Coinbase, addr) + processor.state.UpdateAccount(block.Coinbase, addr) for _, uncle := range block.Uncles { - uncleAddr := processor.GetAddr(uncle.Coinbase) + uncleAddr := processor.state.GetAccount(uncle.Coinbase) uncleAddr.AddFee(CalculateUncleReward(uncle)) - processor.UpdateAddr(uncle.Coinbase, uncleAddr) + processor.state.UpdateAccount(uncle.Coinbase, uncleAddr) } return nil @@ -298,7 +298,7 @@ func (bm *BlockManager) ProcessContract(contract *Contract, tx *Transaction, blo */ vm := &Vm{} - vm.Process(contract, NewState(block.state), RuntimeVars{ + vm.Process(contract, block.state, RuntimeVars{ address: tx.Hash()[12:], blockNumber: block.BlockInfo().Number, sender: tx.Sender(), diff --git a/ethchain/contract.go b/ethchain/contract.go index dbcbb3697..21ac828fe 100644 --- a/ethchain/contract.go +++ b/ethchain/contract.go @@ -8,18 +8,19 @@ import ( type Contract struct { Amount *big.Int Nonce uint64 - state *ethutil.Trie + //state *ethutil.Trie + state *State } func NewContract(Amount *big.Int, root []byte) *Contract { contract := &Contract{Amount: Amount, Nonce: 0} - contract.state = ethutil.NewTrie(ethutil.Config.Db, string(root)) + contract.state = NewState(ethutil.NewTrie(ethutil.Config.Db, string(root))) return contract } func (c *Contract) RlpEncode() []byte { - return ethutil.Encode([]interface{}{c.Amount, c.Nonce, c.state.Root}) + return ethutil.Encode([]interface{}{c.Amount, c.Nonce, c.state.trie.Root}) } func (c *Contract) RlpDecode(data []byte) { @@ -27,18 +28,18 @@ func (c *Contract) RlpDecode(data []byte) { c.Amount = decoder.Get(0).BigInt() c.Nonce = decoder.Get(1).Uint() - c.state = ethutil.NewTrie(ethutil.Config.Db, decoder.Get(2).Interface()) + c.state = NewState(ethutil.NewTrie(ethutil.Config.Db, decoder.Get(2).Interface())) } func (c *Contract) Addr(addr []byte) *ethutil.Value { - return ethutil.NewValueFromBytes([]byte(c.state.Get(string(addr)))) + return ethutil.NewValueFromBytes([]byte(c.state.trie.Get(string(addr)))) } func (c *Contract) SetAddr(addr []byte, value interface{}) { - c.state.Update(string(addr), string(ethutil.NewValue(value).Encode())) + c.state.trie.Update(string(addr), string(ethutil.NewValue(value).Encode())) } -func (c *Contract) State() *ethutil.Trie { +func (c *Contract) State() *State { return c.state } @@ -59,7 +60,7 @@ func MakeContract(tx *Transaction, state *State) *Contract { for i, val := range tx.Data { if len(val) > 0 { bytNum := ethutil.BigToBytes(big.NewInt(int64(i)), 256) - contract.state.Update(string(bytNum), string(ethutil.Encode(val))) + contract.state.trie.Update(string(bytNum), string(ethutil.Encode(val))) } } state.trie.Update(string(addr), string(contract.RlpEncode())) diff --git a/ethchain/state.go b/ethchain/state.go index 1a18ea1d7..e6649cf22 100644 --- a/ethchain/state.go +++ b/ethchain/state.go @@ -5,12 +5,46 @@ import ( "math/big" ) +// States within the ethereum protocol are used to store anything +// within the merkle trie. States take care of caching and storing +// nested states. It's the general query interface to retrieve: +// * Contracts +// * Accounts type State struct { + // The trie for this structure trie *ethutil.Trie + // Nested states + states map[string]*State } +// Create a new state from a given trie func NewState(trie *ethutil.Trie) *State { - return &State{trie: trie} + return &State{trie: trie, states: make(map[string]*State)} +} + +// Resets the trie and all siblings +func (s *State) Reset() { + s.trie.Undo() + + // Reset all nested states + for _, state := range s.states { + state.Reset() + } +} + +// Syncs the trie and all siblings +func (s *State) Sync() { + s.trie.Sync() + + // Sync all nested states + for _, state := range s.states { + state.Sync() + } +} + +// Purges the current trie. +func (s *State) Purge() int { + return s.trie.NewIterator().Purge() } func (s *State) GetContract(addr []byte) *Contract { @@ -19,9 +53,28 @@ func (s *State) GetContract(addr []byte) *Contract { return nil } + // Whet get contract is called the retrieved value might + // be an account. The StateManager uses this to check + // to see if the address a tx was sent to is a contract + // or an account + value := ethutil.NewValueFromBytes([]byte(data)) + if value.Len() == 2 { + return nil + } + + // build contract contract := &Contract{} contract.RlpDecode([]byte(data)) + // Check if there's a cached state for this contract + cachedState := s.states[string(addr)] + if cachedState != nil { + contract.state = cachedState + } else { + // If it isn't cached, cache the state + s.states[string(addr)] = contract.state + } + return contract } @@ -40,17 +93,21 @@ func Compile(code []string) (script []string) { return } -func (s *State) GetAccount(addr []byte) (account *Address) { +func (s *State) GetAccount(addr []byte) (account *Account) { data := s.trie.Get(string(addr)) if data == "" { - account = NewAddress(big.NewInt(0)) + account = NewAccount(big.NewInt(0)) } else { - account = NewAddressFromData([]byte(data)) + account = NewAccountFromData([]byte(data)) } return } -func (s *State) UpdateAccount(addr []byte, account *Address) { +func (s *State) UpdateAccount(addr []byte, account *Account) { s.trie.Update(string(addr), string(account.RlpEncode())) } + +func (s *State) Cmp(other *State) bool { + return s.trie.Cmp(other.trie) +} diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index cd09bf02e..763560570 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -104,7 +104,7 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error } }() // Get the sender - sender := block.GetAddr(tx.Sender()) + sender := block.state.GetAccount(tx.Sender()) // Make sure there's enough in the sender's account. Having insufficient // funds won't invalidate this transaction but simple ignores it. @@ -122,7 +122,7 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error } // Get the receiver - receiver := block.GetAddr(tx.Recipient) + receiver := block.state.GetAccount(tx.Recipient) sender.Nonce += 1 // Send Tx to self @@ -136,10 +136,10 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error // Add the amount to receivers account which should conclude this transaction receiver.Amount.Add(receiver.Amount, tx.Value) - block.UpdateAddr(tx.Recipient, receiver) + block.state.UpdateAccount(tx.Recipient, receiver) } - block.UpdateAddr(tx.Sender(), sender) + block.state.UpdateAccount(tx.Sender(), sender) log.Printf("[TXPL] Processed Tx %x\n", tx.Hash()) diff --git a/ethchain/vm.go b/ethchain/vm.go index c7a91a9c5..7e119ac99 100644 --- a/ethchain/vm.go +++ b/ethchain/vm.go @@ -330,7 +330,7 @@ out: // Load the value in storage and push it on the stack x := vm.stack.Pop() // decode the object as a big integer - decoder := ethutil.NewValueFromBytes([]byte(contract.State().Get(x.String()))) + decoder := contract.Addr(x.Bytes()) if !decoder.IsNil() { vm.stack.Push(decoder.BigInt()) } else { @@ -375,7 +375,7 @@ out: case oSUICIDE: recAddr := vm.stack.Pop().Bytes() // Purge all memory - deletedMemory := contract.state.NewIterator().Purge() + deletedMemory := contract.state.Purge() // Add refunds to the pop'ed address refund := new(big.Int).Mul(StoreFee, big.NewInt(int64(deletedMemory))) account := state.GetAccount(recAddr) diff --git a/ethutil/common_test.go b/ethutil/common_test.go index 3a6a37ff5..b5c733ff3 100644 --- a/ethutil/common_test.go +++ b/ethutil/common_test.go @@ -1,17 +1,44 @@ package ethutil import ( - "fmt" "math/big" "testing" ) func TestCommon(t *testing.T) { - fmt.Println(CurrencyToString(BigPow(10, 19))) - fmt.Println(CurrencyToString(BigPow(10, 16))) - fmt.Println(CurrencyToString(BigPow(10, 13))) - fmt.Println(CurrencyToString(BigPow(10, 10))) - fmt.Println(CurrencyToString(BigPow(10, 7))) - fmt.Println(CurrencyToString(BigPow(10, 4))) - fmt.Println(CurrencyToString(big.NewInt(10))) + ether := CurrencyToString(BigPow(10, 19)) + finney := CurrencyToString(BigPow(10, 16)) + szabo := CurrencyToString(BigPow(10, 13)) + vito := CurrencyToString(BigPow(10, 10)) + turing := CurrencyToString(BigPow(10, 7)) + eins := CurrencyToString(BigPow(10, 4)) + wei := CurrencyToString(big.NewInt(10)) + + if ether != "10 Ether" { + t.Error("Got", ether) + } + + if finney != "10 Finney" { + t.Error("Got", finney) + } + + if szabo != "10 Szabo" { + t.Error("Got", szabo) + } + + if vito != "10 Vito" { + t.Error("Got", vito) + } + + if turing != "10 Turing" { + t.Error("Got", turing) + } + + if eins != "10 Eins" { + t.Error("Got", eins) + } + + if wei != "10 Wei" { + t.Error("Got", wei) + } } diff --git a/ethutil/reactor.go b/ethutil/reactor.go new file mode 100644 index 000000000..f8084986c --- /dev/null +++ b/ethutil/reactor.go @@ -0,0 +1,86 @@ +package ethutil + +import ( + "sync" +) + +type ReactorEvent struct { + mut sync.Mutex + event string + chans []chan React +} + +// Post the specified reactor resource on the channels +// currently subscribed +func (e *ReactorEvent) Post(react React) { + e.mut.Lock() + defer e.mut.Unlock() + + for _, ch := range e.chans { + go func(ch chan React) { + ch <- react + }(ch) + } +} + +// Add a subscriber to this event +func (e *ReactorEvent) Add(ch chan React) { + e.mut.Lock() + defer e.mut.Unlock() + + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *ReactorEvent) Remove(ch chan React) { + e.mut.Lock() + defer e.mut.Unlock() + + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } +} + +// Basic reactor resource +type React struct { + Resource interface{} +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + patterns map[string]*ReactorEvent +} + +func NewReactorEngine() *ReactorEngine { + return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { + ev := reactor.patterns[event] + // Create a new event if one isn't available + if ev == nil { + ev = &ReactorEvent{event: event} + reactor.patterns[event] = ev + } + + // Add the channel to reactor event handler + ev.Add(ch) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { + ev := reactor.patterns[event] + if ev != nil { + ev.Remove(ch) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + ev := reactor.patterns[event] + if ev != nil { + ev.Post(React{Resource: resource}) + } +} diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go new file mode 100644 index 000000000..48c2f0df3 --- /dev/null +++ b/ethutil/reactor_test.go @@ -0,0 +1,30 @@ +package ethutil + +import "testing" + +func TestReactorAdd(t *testing.T) { + engine := NewReactorEngine() + ch := make(chan React) + engine.Subscribe("test", ch) + if len(engine.patterns) != 1 { + t.Error("Expected patterns to be 1, got", len(engine.patterns)) + } +} + +func TestReactorEvent(t *testing.T) { + engine := NewReactorEngine() + + // Buffer 1, so it doesn't block for this test + ch := make(chan React, 1) + engine.Subscribe("test", ch) + engine.Post("test", "hello") + + value := <-ch + if val, ok := value.Resource.(string); ok { + if val != "hello" { + t.Error("Expected Resource to be 'hello', got", val) + } + } else { + t.Error("Unable to cast") + } +} @@ -17,6 +17,8 @@ import ( const ( // The size of the output buffer for writing messages outputBufferSize = 50 + // Current protocol version + ProtocolVersion = 7 ) type DiscReason byte @@ -469,7 +471,7 @@ func (p *Peer) pushHandshake() error { pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes() msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ - uint32(5), uint32(0), p.Version, byte(p.caps), p.port, pubkey, + uint32(ProtocolVersion), uint32(0), p.Version, byte(p.caps), p.port, pubkey, }) p.QueueMessage(msg) @@ -496,7 +498,7 @@ func (p *Peer) pushPeers() { func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data - if c.Get(0).Uint() != 5 { + if c.Get(0).Uint() != ProtocolVersion { ethutil.Config.Log.Debugln("Invalid peer version. Require protocol v5") p.Stop() return |