diff options
58 files changed, 962 insertions, 446 deletions
diff --git a/Dockerfile b/Dockerfile index 6bf13dc31..947f045e5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,13 @@ -FROM alpine:3.5 +FROM alpine:3.6 ADD . /go-ethereum RUN \ - apk add --update git go make gcc musl-dev linux-headers && \ - (cd go-ethereum && make geth) && \ - cp go-ethereum/build/bin/geth /usr/local/bin/ && \ - apk del git go make gcc musl-dev linux-headers && \ - rm -rf /go-ethereum && rm -rf /var/cache/apk/* + apk add --no-cache git go make gcc musl-dev linux-headers && \ + (cd go-ethereum && make geth) && \ + cp go-ethereum/build/bin/geth /usr/local/bin/ && \ + apk del git go make gcc musl-dev linux-headers && \ + rm -rf /go-ethereum -EXPOSE 8545 -EXPOSE 30303 -EXPOSE 30303/udp +EXPOSE 8545 30303 30303/udp ENTRYPOINT ["geth"] @@ -116,7 +116,7 @@ To get an idea how the file should look like you can use the `dumpconfig` subcom $ geth --your-favourite-flags dumpconfig ``` -*Note: This works only with geth v1.6.0 and above* +*Note: This works only with geth v1.6.0 and above.* #### Docker quick start diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 14bf7bd75..e0ee06a0a 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -61,7 +60,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend { database, _ := ethdb.NewMemDatabase() genesis := core.Genesis{Config: params.AllProtocolChanges, Alloc: alloc} genesis.MustCommit(database) - blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), vm.Config{}) backend := &SimulatedBackend{database: database, blockchain: blockchain, config: genesis.Config} backend.rollback() return backend diff --git a/cmd/puppeth/module_ethstats.go b/cmd/puppeth/module_ethstats.go index da34acb3b..5d3fa5fc0 100644 --- a/cmd/puppeth/module_ethstats.go +++ b/cmd/puppeth/module_ethstats.go @@ -42,7 +42,7 @@ RUN \ WORKDIR /eth-netstats EXPOSE 3000 -RUN echo 'module.exports = {trusted: [{{.Trusted}}], banned: []};' > lib/utils/config.js +RUN echo 'module.exports = {trusted: [{{.Trusted}}], banned: [{{.Banned}}]};' > lib/utils/config.js CMD ["npm", "start"] ` @@ -59,7 +59,8 @@ services: - "{{.Port}}:3000"{{end}} environment: - WS_SECRET={{.Secret}}{{if .VHost}} - - VIRTUAL_HOST={{.VHost}}{{end}} + - VIRTUAL_HOST={{.VHost}}{{end}}{{if .Banned}} + - BANNED={{.Banned}}{{end}} logging: driver: "json-file" options: @@ -71,18 +72,24 @@ services: // deployEthstats deploys a new ethstats container to a remote machine via SSH, // docker and docker-compose. If an instance with the specified network name // already exists there, it will be overwritten! -func deployEthstats(client *sshClient, network string, port int, secret string, vhost string, trusted []string) ([]byte, error) { +func deployEthstats(client *sshClient, network string, port int, secret string, vhost string, trusted []string, banned []string) ([]byte, error) { // Generate the content to upload to the server workdir := fmt.Sprintf("%d", rand.Int63()) files := make(map[string][]byte) + trustedLabels := make([]string, len(trusted)) for i, address := range trusted { - trusted[i] = fmt.Sprintf("\"%s\"", address) + trustedLabels[i] = fmt.Sprintf("\"%s\"", address) + } + bannedLabels := make([]string, len(banned)) + for i, address := range banned { + bannedLabels[i] = fmt.Sprintf("\"%s\"", address) } dockerfile := new(bytes.Buffer) template.Must(template.New("").Parse(ethstatsDockerfile)).Execute(dockerfile, map[string]interface{}{ - "Trusted": strings.Join(trusted, ", "), + "Trusted": strings.Join(trustedLabels, ", "), + "Banned": strings.Join(bannedLabels, ", "), }) files[filepath.Join(workdir, "Dockerfile")] = dockerfile.Bytes() @@ -92,6 +99,7 @@ func deployEthstats(client *sshClient, network string, port int, secret string, "Port": port, "Secret": secret, "VHost": vhost, + "Banned": strings.Join(banned, ","), }) files[filepath.Join(workdir, "docker-compose.yaml")] = composefile.Bytes() @@ -112,11 +120,12 @@ type ethstatsInfos struct { port int secret string config string + banned []string } // String implements the stringer interface. func (info *ethstatsInfos) String() string { - return fmt.Sprintf("host=%s, port=%d, secret=%s", info.host, info.port, info.secret) + return fmt.Sprintf("host=%s, port=%d, secret=%s, banned=%v", info.host, info.port, info.secret, info.banned) } // checkEthstats does a health-check against an ethstats server to verify whether @@ -150,6 +159,9 @@ func checkEthstats(client *sshClient, network string) (*ethstatsInfos, error) { if port != 80 && port != 443 { config += fmt.Sprintf(":%d", port) } + // Retrieve the IP blacklist + banned := strings.Split(infos.envvars["BANNED"], ",") + // Run a sanity check to see if the port is reachable if err = checkPort(host, port); err != nil { log.Warn("Ethstats service seems unreachable", "server", host, "port", port, "err", err) @@ -160,5 +172,6 @@ func checkEthstats(client *sshClient, network string) (*ethstatsInfos, error) { port: port, secret: secret, config: config, + banned: banned, }, nil } diff --git a/cmd/puppeth/wizard.go b/cmd/puppeth/wizard.go index f19bcbbcd..518741279 100644 --- a/cmd/puppeth/wizard.go +++ b/cmd/puppeth/wizard.go @@ -22,6 +22,7 @@ import ( "fmt" "io/ioutil" "math/big" + "net" "os" "path/filepath" "sort" @@ -277,3 +278,26 @@ func (w *wizard) readJSON() string { return string(blob) } } + +// readIPAddress reads a single line from stdin, trimming if from spaces and +// converts it to a network IP address. +func (w *wizard) readIPAddress() net.IP { + for { + // Read the IP address from the user + fmt.Printf("> ") + text, err := w.in.ReadString('\n') + if err != nil { + log.Crit("Failed to read user input", "err", err) + } + if text = strings.TrimSpace(text); text == "" { + return nil + } + // Make sure it looks ok and return it if so + ip := net.ParseIP(text) + if ip == nil { + log.Error("Invalid IP address, please retry") + continue + } + return ip + } +} diff --git a/cmd/puppeth/wizard_ethstats.go b/cmd/puppeth/wizard_ethstats.go index c117a6027..504d8fd9c 100644 --- a/cmd/puppeth/wizard_ethstats.go +++ b/cmd/puppeth/wizard_ethstats.go @@ -60,6 +60,22 @@ func (w *wizard) deployEthstats() { fmt.Printf("What should be the secret password for the API? (default = %s)\n", infos.secret) infos.secret = w.readDefaultString(infos.secret) } + // Gather any blacklists to ban from reporting + fmt.Println() + fmt.Printf("Keep existing IP %v blacklist (y/n)? (default = yes)\n", infos.banned) + if w.readDefaultString("y") != "y" { + infos.banned = nil + + fmt.Println() + fmt.Println("Which IP addresses should be blacklisted?") + for { + if ip := w.readIPAddress(); ip != nil { + infos.banned = append(infos.banned, ip.String()) + continue + } + break + } + } // Try to deploy the ethstats server on the host trusted := make([]string, 0, len(w.servers)) for _, client := range w.servers { @@ -67,7 +83,7 @@ func (w *wizard) deployEthstats() { trusted = append(trusted, client.address) } } - if out, err := deployEthstats(client, w.network, infos.port, infos.secret, infos.host, trusted); err != nil { + if out, err := deployEthstats(client, w.network, infos.port, infos.secret, infos.host, trusted, infos.banned); err != nil { log.Error("Failed to deploy ethstats container", "err", err) if len(out) > 0 { fmt.Printf("%s\n", out) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 04728b5c6..b25708471 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -41,7 +41,6 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethstats" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -1103,7 +1102,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai Fatalf("%v", err) } vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)} - chain, err = core.NewBlockChain(chainDb, config, engine, new(event.TypeMux), vmcfg) + chain, err = core.NewBlockChain(chainDb, config, engine, vmcfg) if err != nil { Fatalf("Can't create BlockChain: %v", err) } diff --git a/core/bench_test.go b/core/bench_test.go index b9250f7d3..ab25c27d3 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -175,8 +174,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. - evmux := new(event.TypeMux) - chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer chainman.Stop() b.ReportAllocs() b.ResetTimer() @@ -286,7 +284,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) { if err != nil { b.Fatalf("error opening database at %v: %v", dir, err) } - chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) if err != nil { b.Fatalf("error creating chain: %v", err) } diff --git a/core/block_validator_test.go b/core/block_validator_test.go index c0afc2955..6d54c2b93 100644 --- a/core/block_validator_test.go +++ b/core/block_validator_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -43,7 +42,7 @@ func TestHeaderVerification(t *testing.T) { headers[i] = block.Header() } // Run the header checker for blocks one-by-one, checking for both valid and invalid nonces - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) defer chain.Stop() for i := 0; i < len(blocks); i++ { @@ -107,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) { var results <-chan error if valid { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } else { - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{}) _, results = chain.engine.VerifyHeaders(chain, headers, seals) chain.Stop() } @@ -174,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) { defer runtime.GOMAXPROCS(old) // Start the verifications and immediately abort - chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), new(event.TypeMux), vm.Config{}) + chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{}) defer chain.Stop() abort, results := chain.engine.VerifyHeaders(chain, headers, seals) diff --git a/core/blockchain.go b/core/blockchain.go index 3fb8be15f..f3ca4e08c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,10 +79,16 @@ const ( type BlockChain struct { config *params.ChainConfig // chain & network configuration - hc *HeaderChain - chainDb ethdb.Database - eventMux *event.TypeMux - genesisBlock *types.Block + hc *HeaderChain + chainDb ethdb.Database + rmTxFeed event.Feed + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -115,7 +121,7 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co config: config, chainDb: chainDb, stateCache: state.NewDatabase(chainDb), - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } + // Unsubscribe all subscriptions registered from blockchain + bc.scope.Close() close(bc.quit) atomic.StoreInt32(&bc.procInterrupt, 1) @@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) + // We need some control over the mining operation. Acquiring locks and waiting + // for the miner to create new block takes too long and in most cases isn't + // even necessary. + if bc.LastBlockHash() == block.Hash() { + events = append(events, ChainHeadEvent{block}) + } // Write the positional metadata for transaction and receipt lookups if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { @@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { stats.usedGas += usedGas.Uint64() stats.report(chain, i) } - go bc.postChainEvents(events, coalescedLogs) + go bc.PostChainEvents(events, coalescedLogs) return 0, nil } @@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock if len(diff) > 0 { - go bc.eventMux.Post(RemovedTransactionEvent{diff}) + go bc.rmTxFeed.Send(RemovedTransactionEvent{diff}) } if len(deletedLogs) > 0 { - go bc.eventMux.Post(RemovedLogsEvent{deletedLogs}) + go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } if len(oldChain) > 0 { go func() { for _, block := range oldChain { - bc.eventMux.Post(ChainSideEvent{Block: block}) + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } }() } @@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -// postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. -func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) { +// PostChainEvents iterates over the events generated by a chain insertion and +// posts them into the event feed. +// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. +func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { // post event logs for further processing - bc.eventMux.Post(logs) + if logs != nil { + bc.logsFeed.Send(logs) + } for _, event := range events { - if event, ok := event.(ChainEvent); ok { - // We need some control over the mining operation. Acquiring locks and waiting - // for the miner to create new block takes too long and in most cases isn't - // even necessary. - if bc.LastBlockHash() == event.Hash { - bc.eventMux.Post(ChainHeadEvent{event.Block}) - } + switch ev := event.(type) { + case ChainEvent: + bc.chainFeed.Send(ev) + + case ChainHeadEvent: + bc.chainHeadFeed.Send(ev) + + case ChainSideEvent: + bc.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - bc.eventMux.Post(event) } } @@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } + +// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent. +func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.scope.Track(bc.rmTxFeed.Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent. +func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription { + return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch)) +} + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription { + return bc.scope.Track(bc.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { + return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent registers a subscription of []*types.Log. +func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return bc.scope.Track(bc.logsFeed.Subscribe(ch)) +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4a0f44940..470974a0a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -47,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain { if !fake { engine = ethash.NewTester() } - blockchain, err := NewBlockChain(db, gspec.Config, engine, new(event.TypeMux), vm.Config{}) + blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{}) if err != nil { panic(err) } @@ -497,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Create a new BlockChain and check that it rolled back the state. - ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{}) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } @@ -610,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) { // Import the chain as an archive node for the comparison baseline archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer archive.Stop() if n, err := archive.InsertChain(blocks); err != nil { @@ -619,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) { // Fast import the chain as a non-archive node to test fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -697,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { archiveDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(archiveDb) - archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := archive.InsertChain(blocks); err != nil { t.Fatalf("failed to process block %d: %v", n, err) } @@ -710,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { // Import the chain as a non-archive node and ensure all pointers are updated fastDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(fastDb) - fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{}) defer fast.Stop() headers := make([]*types.Header, len(blocks)) @@ -731,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) { lightDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(lightDb) - light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{}) if n, err := light.InsertHeaderChain(headers, 1); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) } @@ -800,8 +799,7 @@ func TestChainTxReorgs(t *testing.T) { } }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) if i, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert original chain[%d]: %v", i, err) } @@ -872,11 +870,11 @@ func TestLogReorgs(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - var evmux event.TypeMux - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() - subs := evmux.Subscribe(RemovedLogsEvent{}) + rmLogsCh := make(chan RemovedLogsEvent) + blockchain.SubscribeRemovedLogsEvent(rmLogsCh) chain, _ := GenerateChain(params.TestChainConfig, genesis, db, 2, func(i int, gen *BlockGen) { if i == 1 { tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code), signer, key1) @@ -895,9 +893,14 @@ func TestLogReorgs(t *testing.T) { t.Fatalf("failed to insert forked chain: %v", err) } - ev := <-subs.Chan() - if len(ev.Data.(RemovedLogsEvent).Logs) == 0 { - t.Error("expected logs") + timeout := time.NewTimer(1 * time.Second) + select { + case ev := <-rmLogsCh: + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + case <-timeout.C: + t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") } } @@ -914,8 +917,7 @@ func TestReorgSideEvent(t *testing.T) { signer = types.NewEIP155Signer(gspec.Config.ChainId) ) - evmux := &event.TypeMux{} - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, gen *BlockGen) {}) @@ -933,7 +935,8 @@ func TestReorgSideEvent(t *testing.T) { } gen.AddTx(tx) }) - subs := evmux.Subscribe(ChainSideEvent{}) + chainSideCh := make(chan ChainSideEvent) + blockchain.SubscribeChainSideEvent(chainSideCh) if _, err := blockchain.InsertChain(replacementBlocks); err != nil { t.Fatalf("failed to insert chain: %v", err) } @@ -956,8 +959,8 @@ func TestReorgSideEvent(t *testing.T) { done: for { select { - case ev := <-subs.Chan(): - block := ev.Data.(ChainSideEvent).Block + case ev := <-chainSideCh: + block := ev.Block if _, ok := expectedSideHashes[block.Hash()]; !ok { t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash()) } @@ -977,7 +980,7 @@ done: // make sure no more events are fired select { - case e := <-subs.Chan(): + case e := <-chainSideCh: t.Errorf("unexpected event fired: %v", e) case <-time.After(250 * time.Millisecond): } @@ -1038,10 +1041,9 @@ func TestEIP155Transition(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}, deleteAddr: {Balance: new(big.Int)}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 4, func(i int, block *BlockGen) { @@ -1148,9 +1150,8 @@ func TestEIP161AccountRemoval(t *testing.T) { Alloc: GenesisAlloc{address: {Balance: funds}}, } genesis = gspec.MustCommit(db) - mux event.TypeMux ) - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() blocks, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, block *BlockGen) { diff --git a/core/chain_makers.go b/core/chain_makers.go index 976a8114d..cb5825d18 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -236,7 +235,7 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockChain, error) { db, _ := ethdb.NewMemDatabase() genesis := gspec.MustCommit(db) - blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), vm.Config{}) // Create and inject the requested chain if n == 0 { return db, blockchain, nil diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index 28eb76c63..2260c62fb 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -80,9 +79,7 @@ func ExampleGenerateChain() { }) // Import the chain. This runs all block validation rules. - evmux := &event.TypeMux{} - - blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{}) + blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{}) defer blockchain.Stop() if i, err := blockchain.InsertChain(chain); err != nil { diff --git a/core/dao_test.go b/core/dao_test.go index 99bf1ecae..d6e11d78a 100644 --- a/core/dao_test.go +++ b/core/dao_test.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -42,13 +41,13 @@ func TestDAOForkRangeExtradata(t *testing.T) { proDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(proDb) proConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: true} - proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), vm.Config{}) defer proBc.Stop() conDb, _ := ethdb.NewMemDatabase() gspec.MustCommit(conDb) conConf := ¶ms.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: false} - conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), vm.Config{}) defer conBc.Stop() if _, err := proBc.InsertChain(prefix); err != nil { @@ -62,8 +61,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a pro-fork block, and try to feed into the no-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -85,8 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Create a no-fork block, and try to feed into the pro-fork chain db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) @@ -109,8 +106,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that contra-forkers accept pro-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64())) @@ -127,8 +123,7 @@ func TestDAOForkRangeExtradata(t *testing.T) { // Verify that pro-forkers accept contra-fork extra-datas after forking finishes db, _ = ethdb.NewMemDatabase() gspec.MustCommit(db) - - bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{}) + bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{}) defer bc.Stop() blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64())) diff --git a/core/genesis_test.go b/core/genesis_test.go index 8b193759f..482f86190 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -119,9 +118,8 @@ func TestSetupGenesis(t *testing.T) { // Commit the 'old' genesis block with Homestead transition at #2. // Advance to block #4, past the homestead transition block of customg. genesis := oldcustomg.MustCommit(db) - bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{}) defer bc.Stop() - bc.SetValidator(bproc{}) bc.InsertChain(makeBlockChainWithDiff(genesis, []int{2, 3, 4, 5}, 0)) bc.CurrentBlock() diff --git a/core/tx_pool.go b/core/tx_pool.go index b0c251f92..d835c94d1 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -34,6 +34,13 @@ import ( "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // rmTxChanSize is the size of channel listening to RemovedTransactionEvent. + rmTxChanSize = 10 +) + var ( // ErrInvalidSender is returned if the transaction contains an invalid signature. ErrInvalidSender = errors.New("invalid sender") @@ -95,7 +102,14 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) -type stateFn func() (*state.StateDB, error) +// blockChain provides the state of blockchain and current gas limit to do +// some pre checks in tx pool and event subscribers. +type blockChain interface { + State() (*state.StateDB, error) + GasLimit() *big.Int + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription +} // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { @@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig - currentState stateFn // The state function which will allow us to do some pre checks + blockChain blockChain pendingState *state.ManagedState - gasLimit func() *big.Int // The current gas limit function callback gasPrice *big.Int - eventMux *event.TypeMux - events *event.TypeMuxSubscription + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + rmTxCh chan RemovedTransactionEvent + rmTxSub event.Subscription signer types.Signer mu sync.RWMutex @@ -185,7 +202,7 @@ type TxPool struct { // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e pool := &TxPool{ config: config, chainconfig: chainconfig, + blockChain: blockChain, signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: make(map[common.Hash]*types.Transaction), - eventMux: eventMux, - currentState: currentStateFn, - gasLimit: gasLimitFn, + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) @@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e log.Warn("Failed to rotate transaction journal", "err", err) } } + // Subscribe events from blockchain + pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh) // Start the event loop and return pool.wg.Add(1) go pool.loop() @@ -248,25 +267,27 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle any events fired by the system - case ev, ok := <-pool.events.Chan(): - if !ok { - return - } - switch ev := ev.Data.(type) { - case ChainHeadEvent: - pool.mu.Lock() - if ev.Block != nil { - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + pool.mu.Lock() + if ev.Block != nil { + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true } - pool.reset() - pool.mu.Unlock() - case RemovedTransactionEvent: - pool.addTxs(ev.Txs, false) } + pool.reset() + pool.mu.Unlock() + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return + + // Handle RemovedTransactionEvent + case ev := <-pool.rmTxCh: + pool.addTxs(ev.Txs, false) + // Be unsubscribed due to system stopped + case <-pool.rmTxSub.Err(): + return // Handle stats reporting ticks case <-report.C: @@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() { // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. func (pool *TxPool) reset() { - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { log.Error("Failed reset txpool state", "err", err) return @@ -347,7 +368,11 @@ func (pool *TxPool) reset() { // Stop terminates the transaction pool. func (pool *TxPool) Stop() { - pool.events.Unsubscribe() + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() + pool.rmTxSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { @@ -356,6 +381,12 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { + if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 { return ErrGasLimit } // Make sure the transaction is signed properly @@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering - currentState, err := pool.currentState() + currentState, err := pool.blockChain.State() if err != nil { return err } @@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.eventMux.Post(TxPreEvent{tx}) + go pool.txFeed.Send(TxPreEvent{tx}) } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // If we added a new transaction, run promotion checks and return if !replace { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { } // Only reprocess the internal state if something was actually added if len(dirty) > 0 { - state, err := pool.currentState() + state, err := pool.blockChain.State() if err != nil { return err } @@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Gather all the accounts potentially needing updates if accounts == nil { @@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { - gaslimit := pool.gasLimit() + gaslimit := pool.blockChain.GasLimit() // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index fcb330051..ee1ddd4bb 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -44,6 +44,29 @@ func init() { testTxPoolConfig.Journal = "" } +type testBlockChain struct { + statedb *state.StateDB + gasLimit *big.Int + chainHeadFeed *event.Feed + rmTxFeed *event.Feed +} + +func (bc *testBlockChain) State() (*state.StateDB, error) { + return bc.statedb, nil +} + +func (bc *testBlockChain) GasLimit() *big.Int { + return new(big.Int).Set(bc.gasLimit) +} + +func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.chainHeadFeed.Subscribe(ch) +} + +func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription { + return bc.rmTxFeed.Subscribe(ch) +} + func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction { return pricedTransaction(nonce, gaslimit, big.NewInt(1), key) } @@ -56,9 +79,10 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) return pool, key } @@ -96,6 +120,31 @@ func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } +type testChain struct { + *testBlockChain + address common.Address + trigger *bool +} + +// testChain.State() is used multiple times to reset the pending state. +// when simulate is true it will create a state that indicates +// that tx0 and tx1 are included in the chain. +func (c *testChain) State() (*state.StateDB, error) { + // delay "state change" by one. The tx pool fetches the + // state multiple times and by delaying it a bit we simulate + // a state change between those fetches. + stdb := c.statedb + if *c.trigger { + db, _ := ethdb.NewMemDatabase() + c.statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) + // simulate that the new head block included tx0 and tx1 + c.statedb.SetNonce(c.address, 2) + c.statedb.SetBalance(c.address, new(big.Int).SetUint64(params.Ether)) + *c.trigger = false + } + return stdb, nil +} + // This test simulates a scenario where a new block is imported during a // state reset and tests whether the pending state is in sync with the // block head event that initiated the resetState(). @@ -104,38 +153,18 @@ func TestStateChangeDuringPoolReset(t *testing.T) { db, _ = ethdb.NewMemDatabase() key, _ = crypto.GenerateKey() address = crypto.PubkeyToAddress(key.PublicKey) - mux = new(event.TypeMux) statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) trigger = false ) // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) + blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger} tx0 := transaction(0, big.NewInt(100000), key) tx1 := transaction(1, big.NewInt(100000), key) - // stateFunc is used multiple times to reset the pending state. - // when simulate is true it will create a state that indicates - // that tx0 and tx1 are included in the chain. - stateFunc := func() (*state.StateDB, error) { - // delay "state change" by one. The tx pool fetches the - // state multiple times and by delaying it a bit we simulate - // a state change between those fetches. - stdb := statedb - if trigger { - statedb, _ = state.New(common.Hash{}, state.NewDatabase(db)) - // simulate that the new head block included tx0 and tx1 - statedb.SetNonce(address, 2) - statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) - trigger = false - } - return stdb, nil - } - - gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } - - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -176,7 +205,7 @@ func TestInvalidTransactions(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) @@ -211,7 +240,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() pool.enqueueTx(tx.Hash(), tx) @@ -241,7 +270,7 @@ func TestTransactionQueue(t *testing.T) { tx2 := transaction(10, big.NewInt(100), key) tx3 := transaction(11, big.NewInt(100), key) from, _ = deriveSender(tx1) - currentState, _ = pool.currentState() + currentState, _ = pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset() @@ -264,7 +293,7 @@ func TestRemoveTx(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(1)) tx1 := transaction(0, big.NewInt(100), key) @@ -296,7 +325,7 @@ func TestNegativeValue(t *testing.T) { tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1)) if err := pool.AddRemote(tx); err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) @@ -311,8 +340,8 @@ func TestTransactionChainFork(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -339,8 +368,8 @@ func TestTransactionDoubleNonce(t *testing.T) { resetState := func() { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool.currentState = func() (*state.StateDB, error) { return statedb, nil } - currentState, _ := pool.currentState() + pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() } @@ -358,7 +387,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - state, _ := pool.currentState() + state, _ := pool.blockChain.State() pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -386,7 +415,7 @@ func TestMissingNonce(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, big.NewInt(100000), key) if _, err := pool.add(tx, false); err != nil { @@ -409,7 +438,7 @@ func TestNonceRecovery(t *testing.T) { defer pool.Stop() addr := crypto.PubkeyToAddress(key.PublicKey) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.SetNonce(addr, n) currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.lockedReset() @@ -431,11 +460,14 @@ func TestRemovedTxEvent(t *testing.T) { tx := transaction(0, big.NewInt(1000000), key) from, _ := deriveSender(tx) - currentState, _ := pool.currentState() + currentState, _ := pool.blockChain.State() currentState.AddBalance(from, big.NewInt(1000000000000)) pool.lockedReset() - pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) - pool.eventMux.Post(ChainHeadEvent{nil}) + blockChain, _ := pool.blockChain.(*testBlockChain) + blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}}) + blockChain.chainHeadFeed.Send(ChainHeadEvent{nil}) + // wait for handling events + <-time.After(500 * time.Millisecond) if pool.pending[from].Len() != 1 { t.Error("expected 1 pending tx, got", pool.pending[from].Len()) } @@ -453,7 +485,7 @@ func TestTransactionDropping(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add some pending and some queued transactions @@ -518,7 +550,7 @@ func TestTransactionDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - pool.gasLimit = func() *big.Int { return big.NewInt(100) } + pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100) pool.lockedReset() if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { @@ -548,7 +580,7 @@ func TestTransactionPostponing(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000)) // Add a batch consecutive pending transactions for validation @@ -624,7 +656,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -667,16 +699,17 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -757,19 +790,20 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -821,7 +855,7 @@ func TestTransactionPendingLimiting(t *testing.T) { account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) pool.lockedReset() @@ -853,7 +887,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool1.Stop() account1, _ := deriveSender(transaction(0, big.NewInt(0), key1)) - state1, _ := pool1.currentState() + state1, _ := pool1.blockChain.State() state1.AddBalance(account1, big.NewInt(1000000)) for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { @@ -866,7 +900,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { defer pool2.Stop() account2, _ := deriveSender(transaction(0, big.NewInt(0), key2)) - state2, _ := pool2.currentState() + state2, _ := pool2.blockChain.State() state2.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} @@ -900,15 +934,16 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -946,17 +981,18 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.AccountSlots = 2 config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() key, _ := crypto.GenerateKey() addr := crypto.PubkeyToAddress(key.PublicKey) @@ -980,15 +1016,16 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 0 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { @@ -1028,12 +1065,13 @@ func TestTransactionPoolRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1112,16 +1150,17 @@ func TestTransactionPoolUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) defer pool.Stop() // Create a number of test accounts and fund them - state, _ := pool.currentState() + state, _ := pool.blockChain.State() keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { @@ -1199,14 +1238,15 @@ func TestTransactionReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() // Create a test account to add transactions with key, _ := crypto.GenerateKey() - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000)) // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) @@ -1278,19 +1318,20 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} config := testTxPoolConfig config.NoLocals = nolocals config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool := NewTxPool(config, params.TestChainConfig, blockchain) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() remote, _ := crypto.GenerateKey() - statedb, _ = pool.currentState() + statedb, _ = pool.blockChain.State() statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) @@ -1320,7 +1361,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if queued != 0 { @@ -1344,7 +1386,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { time.Sleep(2 * config.Rejournal) pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)} + pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() if pending != 0 { @@ -1377,7 +1420,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1403,7 +1446,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) for i := 0; i < size; i++ { @@ -1424,7 +1467,7 @@ func BenchmarkPoolInsert(b *testing.B) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) txs := make(types.Transactions, b.N) @@ -1449,7 +1492,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { defer pool.Stop() account, _ := deriveSender(transaction(0, big.NewInt(0), key)) - state, _ := pool.currentState() + state, _ := pool.blockChain.State() state.AddBalance(account, big.NewInt(1000000)) batches := make([]types.Transactions, b.N) diff --git a/core/vm/evm.go b/core/vm/evm.go index 448acd469..8d654c666 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -168,8 +168,10 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas // above we revert to the snapshot and consume any gas remaining. Additionally // when we're in homestead this also counts for code storage gas errors. if err != nil { - contract.UseGas(contract.Gas) evm.StateDB.RevertToSnapshot(snapshot) + if err != errExecutionReverted { + contract.UseGas(contract.Gas) + } } return ret, contract.Gas, err } @@ -207,10 +209,11 @@ func (evm *EVM) CallCode(caller ContractRef, addr common.Address, input []byte, ret, err = run(evm, snapshot, contract, input) if err != nil { - contract.UseGas(contract.Gas) evm.StateDB.RevertToSnapshot(snapshot) + if err != errExecutionReverted { + contract.UseGas(contract.Gas) + } } - return ret, contract.Gas, err } @@ -239,10 +242,11 @@ func (evm *EVM) DelegateCall(caller ContractRef, addr common.Address, input []by ret, err = run(evm, snapshot, contract, input) if err != nil { - contract.UseGas(contract.Gas) evm.StateDB.RevertToSnapshot(snapshot) + if err != errExecutionReverted { + contract.UseGas(contract.Gas) + } } - return ret, contract.Gas, err } @@ -281,8 +285,10 @@ func (evm *EVM) StaticCall(caller ContractRef, addr common.Address, input []byte // when we're in Homestead this also counts for code storage gas errors. ret, err = run(evm, snapshot, contract, input) if err != nil { - contract.UseGas(contract.Gas) evm.StateDB.RevertToSnapshot(snapshot) + if err != errExecutionReverted { + contract.UseGas(contract.Gas) + } } return ret, contract.Gas, err } @@ -339,18 +345,12 @@ func (evm *EVM) Create(caller ContractRef, code []byte, gas uint64, value *big.I // When an error was returned by the EVM or when setting the creation code // above we revert to the snapshot and consume any gas remaining. Additionally // when we're in homestead this also counts for code storage gas errors. - if maxCodeSizeExceeded || - (err != nil && (evm.ChainConfig().IsHomestead(evm.BlockNumber) || err != ErrCodeStoreOutOfGas)) { - contract.UseGas(contract.Gas) + if maxCodeSizeExceeded || (err != nil && (evm.ChainConfig().IsHomestead(evm.BlockNumber) || err != ErrCodeStoreOutOfGas)) { evm.StateDB.RevertToSnapshot(snapshot) + if err != errExecutionReverted { + contract.UseGas(contract.Gas) + } } - // If the vm returned with an error the return value should be set to nil. - // This isn't consensus critical but merely to for behaviour reasons such as - // tests, RPC calls, etc. - if err != nil { - ret = nil - } - return ret, contractAddr, contract.Gas, err } diff --git a/core/vm/gas_table.go b/core/vm/gas_table.go index a6346bd80..3b0698ce9 100644 --- a/core/vm/gas_table.go +++ b/core/vm/gas_table.go @@ -396,6 +396,10 @@ func gasReturn(gt params.GasTable, evm *EVM, contract *Contract, stack *Stack, m return memoryGasCost(mem, memorySize) } +func gasRevert(gt params.GasTable, evm *EVM, contract *Contract, stack *Stack, mem *Memory, memorySize uint64) (uint64, error) { + return memoryGasCost(mem, memorySize) +} + func gasSuicide(gt params.GasTable, evm *EVM, contract *Contract, stack *Stack, mem *Memory, memorySize uint64) (uint64, error) { var gas uint64 // EIP150 homestead gas reprice fork: diff --git a/core/vm/instructions.go b/core/vm/instructions.go index 4d6197912..ece4d2229 100644 --- a/core/vm/instructions.go +++ b/core/vm/instructions.go @@ -32,6 +32,7 @@ var ( bigZero = new(big.Int) errWriteProtection = errors.New("evm: write protection") errReturnDataOutOfBounds = errors.New("evm: return data out of bounds") + errExecutionReverted = errors.New("evm: execution reverted") ) func opAdd(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *Stack) ([]byte, error) { @@ -579,7 +580,7 @@ func opCreate(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *S } contract.UseGas(gas) - _, addr, returnGas, suberr := evm.Create(contract, input, gas, value) + res, addr, returnGas, suberr := evm.Create(contract, input, gas, value) // Push item on the stack based on the returned error. If the ruleset is // homestead we must check for CodeStoreOutOfGasError (homestead only // rule) and treat as an error, if the ruleset is frontier we must @@ -592,9 +593,11 @@ func opCreate(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *S stack.push(addr.Big()) } contract.Gas += returnGas - evm.interpreter.intPool.put(value, offset, size) + if suberr == errExecutionReverted { + return res, nil + } return nil, nil } @@ -622,7 +625,8 @@ func opCall(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *Sta stack.push(new(big.Int)) } else { stack.push(big.NewInt(1)) - + } + if err == nil || err == errExecutionReverted { memory.Set(retOffset.Uint64(), retSize.Uint64(), ret) } contract.Gas += returnGas @@ -653,10 +657,10 @@ func opCallCode(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack ret, returnGas, err := evm.CallCode(contract, address, args, gas, value) if err != nil { stack.push(new(big.Int)) - } else { stack.push(big.NewInt(1)) - + } + if err == nil || err == errExecutionReverted { memory.Set(retOffset.Uint64(), retSize.Uint64(), ret) } contract.Gas += returnGas @@ -676,6 +680,8 @@ func opDelegateCall(pc *uint64, evm *EVM, contract *Contract, memory *Memory, st stack.push(new(big.Int)) } else { stack.push(big.NewInt(1)) + } + if err == nil || err == errExecutionReverted { memory.Set(outOffset.Uint64(), outSize.Uint64(), ret) } contract.Gas += returnGas @@ -704,7 +710,8 @@ func opStaticCall(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stac stack.push(new(big.Int)) } else { stack.push(big.NewInt(1)) - + } + if err == nil || err == errExecutionReverted { memory.Set(retOffset.Uint64(), retSize.Uint64(), ret) } contract.Gas += returnGas @@ -718,7 +725,14 @@ func opReturn(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *S ret := memory.GetPtr(offset.Int64(), size.Int64()) evm.interpreter.intPool.put(offset, size) + return ret, nil +} + +func opRevert(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack *Stack) ([]byte, error) { + offset, size := stack.pop(), stack.pop() + ret := memory.GetPtr(offset.Int64(), size.Int64()) + evm.interpreter.intPool.put(offset, size) return ret, nil } @@ -731,7 +745,6 @@ func opSuicide(pc *uint64, evm *EVM, contract *Contract, memory *Memory, stack * evm.StateDB.AddBalance(common.BigToAddress(stack.pop()), balance) evm.StateDB.Suicide(contract.Address()) - return nil, nil } diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 954839f2e..d3e24a7a4 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -92,7 +92,7 @@ func (in *Interpreter) enforceRestrictions(op OpCode, operation operation, stack if in.readOnly { // If the interpreter is operating in readonly mode, make sure no // state-modifying operation is performed. The 3rd stack item - // for a call operation is the value. Transfering value from one + // for a call operation is the value. Transferring value from one // account to the others means the state is modified and should also // return with an error. if operation.writes || (op == CALL && stack.Back(2).BitLen() > 0) { @@ -209,20 +209,22 @@ func (in *Interpreter) Run(snapshot int, contract *Contract, input []byte) (ret if verifyPool { verifyIntegerPool(in.intPool) } + // if the operation clears the return data (e.g. it has returning data) + // set the last return to the result of the operation. + if operation.returns { + in.returnData = res + } switch { case err != nil: return nil, err + case operation.reverts: + return res, errExecutionReverted case operation.halts: return res, nil case !operation.jumps: pc++ } - // if the operation clears the return data (e.g. it has returning data) - // set the last return to the result of the operation. - if operation.returns { - in.returnData = res - } } return nil, nil } diff --git a/core/vm/jump_table.go b/core/vm/jump_table.go index 2d238f7a1..f0a922912 100644 --- a/core/vm/jump_table.go +++ b/core/vm/jump_table.go @@ -41,20 +41,13 @@ type operation struct { validateStack stackValidationFunc // memorySize returns the memory size required for the operation memorySize memorySizeFunc - // halts indicates whether the operation shoult halt further execution - // and return - halts bool - // jumps indicates whether operation made a jump. This prevents the program - // counter from further incrementing. - jumps bool - // writes determines whether this a state modifying operation - writes bool - // valid is used to check whether the retrieved operation is valid and known - valid bool - // reverts determined whether the operation reverts state - reverts bool - // returns determines whether the opertions sets the return data - returns bool + + halts bool // indicates whether the operation shoult halt further execution + jumps bool // indicates whether the program counter should not increment + writes bool // determines whether this a state modifying operation + valid bool // indication whether the retrieved operation is valid and known + reverts bool // determines whether the operation reverts state (implicitly halts) + returns bool // determines whether the opertions sets the return data content } var ( @@ -89,6 +82,15 @@ func NewMetropolisInstructionSet() [256]operation { memorySize: memoryReturnDataCopy, valid: true, } + instructionSet[REVERT] = operation{ + execute: opRevert, + gasCost: gasRevert, + validateStack: makeStackFunc(2, 0), + memorySize: memoryRevert, + valid: true, + reverts: true, + returns: true, + } return instructionSet } diff --git a/core/vm/memory_table.go b/core/vm/memory_table.go index f1b671adc..bec0235bc 100644 --- a/core/vm/memory_table.go +++ b/core/vm/memory_table.go @@ -89,6 +89,10 @@ func memoryReturn(stack *Stack) *big.Int { return calcMemSize(stack.Back(0), stack.Back(1)) } +func memoryRevert(stack *Stack) *big.Int { + return calcMemSize(stack.Back(0), stack.Back(1)) +} + func memoryLog(stack *Stack) *big.Int { mSize, mStart := stack.Back(1), stack.Back(0) return calcMemSize(mStart, mSize) diff --git a/core/vm/opcodes.go b/core/vm/opcodes.go index be87cae18..0c6550735 100644 --- a/core/vm/opcodes.go +++ b/core/vm/opcodes.go @@ -204,6 +204,7 @@ const ( DELEGATECALL STATICCALL = 0xfa + REVERT = 0xfd SELFDESTRUCT = 0xff ) @@ -360,6 +361,7 @@ var opCodeToString = map[OpCode]string{ CALLCODE: "CALLCODE", DELEGATECALL: "DELEGATECALL", STATICCALL: "STATICCALL", + REVERT: "REVERT", SELFDESTRUCT: "SELFDESTRUCT", PUSH: "PUSH", @@ -509,6 +511,7 @@ var stringToOp = map[string]OpCode{ "CALL": CALL, "RETURN": RETURN, "CALLCODE": CALLCODE, + "REVERT": REVERT, "SELFDESTRUCT": SELFDESTRUCT, } diff --git a/eth/api_backend.go b/eth/api_backend.go index 7ef7c030d..abf52326b 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -115,6 +115,30 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil } +func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedTxEvent(ch) +} + +func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainHeadEvent(ch) +} + +func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.BlockChain().SubscribeChainSideEvent(ch) +} + +func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.BlockChain().SubscribeLogsEvent(ch) +} + func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } @@ -151,6 +175,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } +func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.TxPool().SubscribeTxPreEvent(ch) +} + func (b *EthApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/eth/backend.go b/eth/backend.go index 8a837f7b8..5f4f2097a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -137,7 +137,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig) + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) maxPeers := config.MaxPeers if config.LightServ > 0 { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f27b76929..f848bc6af 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,6 +34,10 @@ type Backend interface { EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription } // Filter can be used to retrieve and filter logs. diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ab0b7473e..00ade0ffb 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -54,6 +54,19 @@ const ( LastIndexSubscription ) +const ( + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + var ( ErrInvalidSubscriptionID = errors.New("invalid id") ) @@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) { +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { if ev == nil { return } - switch e := ev.Data.(type) { + switch e := ev.(type) { case []*types.Log: if len(e) > 0 { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } } case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - case core.PendingLogsEvent: - for _, f := range filters[PendingLogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs + case *event.TypeMuxEvent: + switch muxe := e.Data.(type) { + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } } } } case core.TxPreEvent: for _, f := range filters[PendingTransactionsSubscription] { - if ev.Time.After(f.created) { - f.hashes <- e.Tx.Hash() - } + f.hashes <- e.Tx.Hash() } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { - if ev.Time.After(f.created) { - f.headers <- e.Block.Header() - } + f.headers <- e.Block.Header() } if es.lightMode && len(filters[LogsSubscription]) > 0 { es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { for _, f := range filters[LogsSubscription] { - if ev.Time.After(f.created) { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } }) @@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { var ( index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{}) + sub = es.mux.Subscribe(core.PendingLogsEvent{}) + // Subscribe TxPreEvent form txpool + txCh = make(chan core.TxPreEvent, txChanSize) + txSub = es.backend.SubscribeTxPreEvent(txCh) + // Subscribe RemovedLogsEvent + rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) + rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) + // Subscribe []*types.Log + logsCh = make(chan []*types.Log, logsChanSize) + logsSub = es.backend.SubscribeLogsEvent(logsCh) + // Subscribe ChainEvent + chainEvCh = make(chan core.ChainEvent, chainEvChanSize) + chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) ) + // Unsubscribe all events + defer sub.Unsubscribe() + defer txSub.Unsubscribe() + defer rmLogsSub.Unsubscribe() + defer logsSub.Unsubscribe() + defer chainEvSub.Unsubscribe() + for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } @@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() { return } es.broadcast(index, ev) + + // Handle subscribed events + case ev := <-txCh: + es.broadcast(index, ev) + case ev := <-rmLogsCh: + es.broadcast(index, ev) + case ev := <-logsCh: + es.broadcast(index, ev) + case ev := <-chainEvCh: + es.broadcast(index, ev) + case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() { delete(index[f.typ], f.id) } close(f.err) + + // System stopped + case <-txSub.Err(): + return + case <-rmLogsSub.Err(): + return + case <-logsSub.Err(): + return + case <-chainEvSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 23e6d66e1..fcc888b8c 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -34,8 +34,12 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database + mux *event.TypeMux + db ethdb.Database + txFeed *event.Feed + rmLogsFeed *event.Feed + logsFeed *event.Feed + chainFeed *event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t return core.GetBlockReceipts(b.db, blockHash, num), nil } +func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.txFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.rmLogsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.logsFeed.Subscribe(ch) +} + +func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.chainFeed.Subscribe(ch) +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) @@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) { var ( mux = new(event.TypeMux) db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} api = NewPublicFilterAPI(backend, false) genesis = new(core.Genesis).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) @@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) { time.Sleep(1 * time.Second) for _, e := range chainEvents { - mux.Post(e) + chainFeed.Send(e) } <-sub0.Err() @@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) { time.Sleep(1 * time.Second) for _, tx := range transactions { ev := core.TxPreEvent{Tx: tx} - mux.Post(ev) + txFeed.Send(ev) } + timeout := time.Now().Add(1 * time.Second) for { results, err := api.GetFilterChanges(fid0) if err != nil { @@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) { if len(hashes) >= len(transactions) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + return + } for i := range hashes { if hashes[i] != transactions[i].Hash() { t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) @@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) { } } -// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed. func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) { // raise events time.Sleep(1 * time.Second) - if err := mux.Post(allLogs); err != nil { - t.Fatal(err) + if nsend := logsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Shoud have at least one subscription") } if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) @@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) { for i, tt := range testCases { var fetched []*types.Log + timeout := time.Now().Add(1 * time.Second) for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { @@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) { if len(fetched) >= len(tt.expected) { break } + // check timeout + if time.Now().After(timeout) { + break + } time.Sleep(100 * time.Millisecond) } @@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) { // raise events time.Sleep(1 * time.Second) + // allLogs are type of core.PendingLogsEvent for _, l := range allLogs { if err := mux.Post(l); err != nil { t.Fatal(err) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b6cfd4bbc..3244c04d7 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -119,11 +123,15 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = ethdb.NewLDBDatabase(dir, 0, 0) - mux = new(event.TypeMux) - backend = &testBackend{mux, db} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/eth/handler.go b/eth/handler.go index 6c6449340..9d230a4ad 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,6 +45,10 @@ import ( const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 ) var ( @@ -78,7 +82,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub *event.TypeMuxSubscription + txCh chan core.TxPreEvent + txSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start() { // broadcast transactions - pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + pm.txCh = make(chan core.TxPreEvent, txChanSize) + pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -688,6 +694,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { peer.SendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return } // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash) { @@ -723,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() { } func (self *ProtocolManager) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.Data.(core.TxPreEvent) - self.BroadcastTx(event.Tx.Hash(), event.Tx) + for { + select { + case event := <-self.txCh: + self.BroadcastTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.txSub.Err(): + return + } } } diff --git a/eth/handler_test.go b/eth/handler_test.go index ca9c9e1b4..aba277444 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} gspec = &core.Genesis{Config: config} genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{}) ) pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db) if err != nil { diff --git a/eth/helper_test.go b/eth/helper_test.go index 546478a3e..f1dab9528 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, } genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) ) chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { @@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i // testTxPool is a fake, helper transaction pool for testing purposes type testTxPool struct { - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + txFeed event.Feed + pool []*types.Transaction // Collection of all transactions + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } @@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } +func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize)) diff --git a/eth/protocol.go b/eth/protocol.go index 376e4663e..2c41376fa 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -22,7 +22,9 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" ) @@ -100,6 +102,10 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) + + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b75c5e6da..bb03dc72b 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -44,9 +44,27 @@ import ( "golang.org/x/net/websocket" ) -// historyUpdateRange is the number of blocks a node should report upon login or -// history request. -const historyUpdateRange = 50 +const ( + // historyUpdateRange is the number of blocks a node should report upon login or + // history request. + historyUpdateRange = 50 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + +type txPool interface { + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription +} + +type blockChain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription +} // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. @@ -118,16 +136,22 @@ func (s *Service) Stop() error { // until termination. func (s *Service) loop() { // Subscribe to chain events to execute updates on - var emux *event.TypeMux + var blockchain blockChain + var txpool txPool if s.eth != nil { - emux = s.eth.EventMux() + blockchain = s.eth.BlockChain() + txpool = s.eth.TxPool() } else { - emux = s.les.EventMux() + blockchain = s.les.BlockChain() + txpool = s.les.TxPool() } - headSub := emux.Subscribe(core.ChainHeadEvent{}) + + chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() - txSub := emux.Subscribe(core.TxPreEvent{}) + txEventCh := make(chan core.TxPreEvent, txChanSize) + txSub := txpool.SubscribeTxPreEvent(txEventCh) defer txSub.Unsubscribe() // Start a goroutine that exhausts the subsciptions to avoid events piling up @@ -139,25 +163,18 @@ func (s *Service) loop() { go func() { var lastTx mclock.AbsTime + HandleLoop: for { select { // Notify of chain head events, but drop if too frequent - case head, ok := <-headSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case head := <-chainHeadCh: select { - case headCh <- head.Data.(core.ChainHeadEvent).Block: + case headCh <- head.Block: default: } // Notify of new transaction events, but drop if too frequent - case _, ok := <-txSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case <-txEventCh: if time.Duration(mclock.Now()-lastTx) < time.Second { continue } @@ -167,8 +184,16 @@ func (s *Service) loop() { case txCh <- struct{}{}: default: } + + // node stopped + case <-txSub.Err(): + break HandleLoop + case <-headSub.Err(): + break HandleLoop } } + close(quitCh) + return }() // Loop reporting until termination for { diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d122b7915..be17ffeae 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -53,6 +53,9 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetTd(blockHash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error) + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription // TxPool API SendTx(ctx context.Context, signedTx *types.Transaction) error @@ -62,6 +65,7 @@ type Backend interface { GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 7a3c2447c..1323e8864 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -124,6 +124,30 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.txPool.Content() } +func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return b.eth.txPool.SubscribeTxPreEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainHeadEvent(ch) +} + +func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.eth.blockchain.SubscribeChainSideEvent(ch) +} + +func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.blockchain.SubscribeLogsEvent(ch) +} + +func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) +} + func (b *LesApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/les/backend.go b/les/backend.go index a4e772671..4c33417c0 100644 --- a/les/backend.go +++ b/les/backend.go @@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { eth.serverPool = newServerPool(chainDb, quitSync, ð.wg) eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool) eth.odr = NewLesOdr(chainDb, eth.retriever) - if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil { + if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine); err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. @@ -113,7 +113,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { core.WriteChainConfig(chainDb, genesisHash, chainConfig) } - eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay) + eth.txPool = light.NewTxPool(eth.chainConfig, eth.blockchain, eth.relay) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, ð.wg); err != nil { return nil, err } diff --git a/les/handler.go b/les/handler.go index 234b6e998..1a75cd369 100644 --- a/les/handler.go +++ b/les/handler.go @@ -82,6 +82,7 @@ type BlockChain interface { GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash LastBlockHash() common.Hash Genesis() *types.Block + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription } type txPool interface { diff --git a/les/helper_test.go b/les/helper_test.go index 7dccfc458..b33454e1d 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -146,9 +146,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } if lightSync { - chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux) + chain, _ = light.NewLightChain(odr, gspec.Config, engine) } else { - blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, vm.Config{}) gchain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/les/server.go b/les/server.go index 39ef0efff..8b2730714 100644 --- a/les/server.go +++ b/les/server.go @@ -271,7 +271,8 @@ func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) { func (pm *ProtocolManager) blockLoop() { pm.wg.Add(1) - sub := pm.eventMux.Subscribe(core.ChainHeadEvent{}) + headCh := make(chan core.ChainHeadEvent, 10) + headSub := pm.blockchain.SubscribeChainHeadEvent(headCh) newCht := make(chan struct{}, 10) newCht <- struct{}{} go func() { @@ -280,10 +281,10 @@ func (pm *ProtocolManager) blockLoop() { lastBroadcastTd := common.Big0 for { select { - case ev := <-sub.Chan(): + case ev := <-headCh: peers := pm.peers.AllPeers() if len(peers) > 0 { - header := ev.Data.(core.ChainHeadEvent).Block.Header() + header := ev.Block.Header() hash := header.Hash() number := header.Number.Uint64() td := core.GetTd(pm.chainDb, hash, number) @@ -319,7 +320,7 @@ func (pm *ProtocolManager) blockLoop() { } }() case <-pm.quitSync: - sub.Unsubscribe() + headSub.Unsubscribe() pm.wg.Done() return } diff --git a/light/lightchain.go b/light/lightchain.go index a51043975..df194ecad 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -44,11 +44,14 @@ var ( // headers, downloading block bodies and receipts on demand through an ODR // interface. It only does header validation during chain insertion. type LightChain struct { - hc *core.HeaderChain - chainDb ethdb.Database - odr OdrBackend - eventMux *event.TypeMux - genesisBlock *types.Block + hc *core.HeaderChain + chainDb ethdb.Database + odr OdrBackend + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex chainmu sync.RWMutex @@ -69,7 +72,7 @@ type LightChain struct { // NewLightChain returns a fully initialised light chain using information // available in the database. It initialises the default Ethereum header // validator. -func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) { +func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. bc := &LightChain{ chainDb: odr.Database(), odr: odr, - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) { } // postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. +// posts them into the event feed. func (self *LightChain) postChainEvents(events []interface{}) { for _, event := range events { - if event, ok := event.(core.ChainEvent); ok { - if self.LastBlockHash() == event.Hash { - self.eventMux.Post(core.ChainHeadEvent{Block: event.Block}) + switch ev := event.(type) { + case core.ChainEvent: + if self.LastBlockHash() == ev.Hash { + self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block}) } + self.chainFeed.Send(ev) + case core.ChainSideEvent: + self.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - self.eventMux.Post(event) } } @@ -467,3 +471,30 @@ func (self *LightChain) LockChain() { func (self *LightChain) UnlockChain() { self.chainmu.RUnlock() } + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return self.scope.Track(self.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return self.scope.Track(self.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return self.scope.Track(self.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent implements the interface of filters.Backend +// LightChain does not send logs events, so return an empty subscription. +func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent implements the interface of filters.Backend +// LightChain does not send core.RemovedLogsEvent, so return an empty subscription. +func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 0ad640525..40a4d396a 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -55,7 +54,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) { db, _ := ethdb.NewMemDatabase() gspec := core.Genesis{Config: params.TestChainConfig} genesis := gspec.MustCommit(db) - blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker(), new(event.TypeMux)) + blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker()) // Create and inject the requested chain if n == 0 { @@ -75,7 +74,7 @@ func newTestLightChain() *LightChain { Config: params.TestChainConfig, } gspec.MustCommit(db) - lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker(), new(event.TypeMux)) + lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker()) if err != nil { panic(err) } @@ -339,7 +338,7 @@ func TestReorgBadHeaderHashes(t *testing.T) { defer func() { delete(core.BadHashes, headers[3].Hash()) }() // Create a new LightChain and check that it rolled back the state. - ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux)) + ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker()) if err != nil { t.Fatalf("failed to create new chain manager: %v", err) } diff --git a/light/odr_test.go b/light/odr_test.go index 544b64eff..bd1e976e8 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -233,7 +232,6 @@ func testChainGen(i int, block *core.BlockGen) { func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -241,14 +239,14 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { t.Fatal(err) } odr := &testOdr{sdb: sdb, ldb: ldb} - lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) if err != nil { t.Fatal(err) } diff --git a/light/trie_test.go b/light/trie_test.go index 9b2cf7c2b..5f45c01af 100644 --- a/light/trie_test.go +++ b/light/trie_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -41,7 +40,7 @@ func TestNodeIterator(t *testing.T) { genesis = gspec.MustCommit(fulldb) ) gspec.MustCommit(lightdb) - blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{}) + blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, fulldb, 4, testChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) diff --git a/light/txpool.go b/light/txpool.go index 7cbb991e8..bd215b992 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -33,6 +33,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + // txPermanent is the number of mined blocks after a mined transaction is // considered permanent and no rollback is expected var txPermanent = uint64(500) @@ -43,21 +48,23 @@ var txPermanent = uint64(500) // always receive all locally signed transactions in the same order as they are // created. type TxPool struct { - config *params.ChainConfig - signer types.Signer - quit chan bool - eventMux *event.TypeMux - events *event.TypeMuxSubscription - mu sync.RWMutex - chain *LightChain - odr OdrBackend - chainDb ethdb.Database - relay TxRelayBackend - head common.Hash - nonce map[common.Address]uint64 // "pending" nonce - pending map[common.Hash]*types.Transaction // pending transactions by tx hash - mined map[common.Hash][]*types.Transaction // mined transactions by block hash - clearIdx uint64 // earliest block nr that can contain mined tx info + config *params.ChainConfig + signer types.Signer + quit chan bool + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + mu sync.RWMutex + chain *LightChain + odr OdrBackend + chainDb ethdb.Database + relay TxRelayBackend + head common.Hash + nonce map[common.Address]uint64 // "pending" nonce + pending map[common.Hash]*types.Transaction // pending transactions by tx hash + mined map[common.Hash][]*types.Transaction // mined transactions by block hash + clearIdx uint64 // earliest block nr that can contain mined tx info homestead bool } @@ -78,23 +85,24 @@ type TxRelayBackend interface { } // NewTxPool creates a new light transaction pool -func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool { +func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool { pool := &TxPool{ - config: config, - signer: types.NewEIP155Signer(config.ChainId), - nonce: make(map[common.Address]uint64), - pending: make(map[common.Hash]*types.Transaction), - mined: make(map[common.Hash][]*types.Transaction), - quit: make(chan bool), - eventMux: eventMux, - events: eventMux.Subscribe(core.ChainHeadEvent{}), - chain: chain, - relay: relay, - odr: chain.Odr(), - chainDb: chain.Odr().Database(), - head: chain.CurrentHeader().Hash(), - clearIdx: chain.CurrentHeader().Number.Uint64(), - } + config: config, + signer: types.NewEIP155Signer(config.ChainId), + nonce: make(map[common.Address]uint64), + pending: make(map[common.Hash]*types.Transaction), + mined: make(map[common.Hash][]*types.Transaction), + quit: make(chan bool), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chain: chain, + relay: relay, + odr: chain.Odr(), + chainDb: chain.Odr().Database(), + head: chain.CurrentHeader().Hash(), + clearIdx: chain.CurrentHeader().Number.Uint64(), + } + // Subscribe events from blockchain + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) go pool.eventLoop() return pool @@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3 // eventLoop processes chain head events and also notifies the tx relay backend // about the new head hash and tx state changes func (pool *TxPool) eventLoop() { - for ev := range pool.events.Chan() { - switch ev.Data.(type) { - case core.ChainHeadEvent: - pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header()) + for { + select { + case ev := <-pool.chainHeadCh: + pool.setNewHead(ev.Block.Header()) // hack in order to avoid hogging the lock; this part will // be replaced by a subsequent PR. time.Sleep(time.Millisecond) + + // System stopped + case <-pool.chainHeadSub.Err(): + return } } } @@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) { // Stop stops the light transaction pool func (pool *TxPool) Stop() { + // Unsubscribe all subscriptions registered from txpool + pool.scope.Close() + // Unsubscribe subscriptions registered from blockchain + pool.chainHeadSub.Unsubscribe() close(pool.quit) - pool.events.Unsubscribe() log.Info("Transaction pool stopped") } +// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// starts sending event to the given channel. +func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return pool.scope.Track(pool.txFeed.Subscribe(ch)) +} + // Stats returns the number of currently pending (locally created) transactions func (pool *TxPool) Stats() (pending int) { pool.mu.RLock() @@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.eventMux.Post(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.TxPreEvent{Tx: tx}) } // Print a log message if low enough level is set diff --git a/light/txpool_test.go b/light/txpool_test.go index f23832a41..fe7936ac2 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -82,7 +81,6 @@ func TestTxPool(t *testing.T) { } var ( - evmux = new(event.TypeMux) sdb, _ = ethdb.NewMemDatabase() ldb, _ = ethdb.NewMemDatabase() gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}} @@ -90,7 +88,7 @@ func TestTxPool(t *testing.T) { ) gspec.MustCommit(ldb) // Assemble the test environment - blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{}) + blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{}) gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, poolTestBlocks, txPoolTestChainGen) if _, err := blockchain.InsertChain(gchain); err != nil { panic(err) @@ -102,9 +100,9 @@ func TestTxPool(t *testing.T) { discard: make(chan int, 1), mined: make(chan int, 1), } - lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux) + lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker()) txPermanent = 50 - pool := NewTxPool(params.TestChainConfig, evmux, lightchain, relay) + pool := NewTxPool(params.TestChainConfig, lightchain, relay) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/metrics/metrics.go b/metrics/metrics.go index 9906bb8ad..da2cdbc53 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -29,7 +29,7 @@ import ( ) // MetricsEnabledFlag is the CLI flag name to use to enable metrics collections. -var MetricsEnabledFlag = "metrics" +const MetricsEnabledFlag = "metrics" // Enabled is the flag specifying if metrics are enable or not. var Enabled = false diff --git a/miner/worker.go b/miner/worker.go index dab192c24..24e03be60 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -41,6 +41,14 @@ import ( const ( resultQueueSize = 10 miningLogAtDepth = 5 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // chainSideChanSize is the size of channel listening to ChainSideEvent. + chainSideChanSize = 10 ) // Agent can register themself with the worker @@ -87,9 +95,14 @@ type worker struct { mu sync.Mutex // update loop - mux *event.TypeMux - events *event.TypeMuxSubscription - wg sync.WaitGroup + mux *event.TypeMux + txCh chan core.TxPreEvent + txSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + chainSideCh chan core.ChainSideEvent + chainSideSub event.Subscription + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -123,6 +136,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, + txCh: make(chan core.TxPreEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -133,7 +149,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), fullValidation: false, } - worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + // Subscribe TxPreEvent for tx pool + worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) go worker.update() go worker.wait() @@ -225,20 +245,28 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - for event := range self.events.Chan() { + defer self.txSub.Unsubscribe() + defer self.chainHeadSub.Unsubscribe() + defer self.chainSideSub.Unsubscribe() + + for { // A real event arrived, process interesting content - switch ev := event.Data.(type) { - case core.ChainHeadEvent: + select { + // Handle ChainHeadEvent + case <-self.chainHeadCh: self.commitNewWork() - case core.ChainSideEvent: + + // Handle ChainSideEvent + case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - case core.TxPreEvent: + + // Handle TxPreEvent + case ev := <-self.txCh: // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} txset := types.NewTransactionsByPriceAndNonce(txs) @@ -246,6 +274,14 @@ func (self *worker) update() { self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.currentMu.Unlock() } + + // System stopped + case <-self.txSub.Err(): + return + case <-self.chainHeadSub.Err(): + return + case <-self.chainSideSub.Err(): + return } } } @@ -298,12 +334,18 @@ func (self *worker) wait() { // broadcast before waiting for validation go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { self.mux.Post(core.NewMinedBlockEvent{Block: block}) - self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + var ( + events []interface{} + coalescedLogs []*types.Log + ) + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { - self.mux.Post(core.ChainHeadEvent{Block: block}) - self.mux.Post(logs) + events = append(events, core.ChainHeadEvent{Block: block}) + coalescedLogs = logs } + // post blockchain events + self.chain.PostChainEvents(events, coalescedLogs) if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil { log.Warn("Failed writing block receipts", "err", err) } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index c1d433ff3..a789e6d88 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -111,7 +110,7 @@ func (t *BlockTest) Run() error { return fmt.Errorf("genesis block state root does not match test: computed=%x, test=%x", gblock.Root().Bytes()[:6], t.json.Genesis.StateRoot[:6]) } - chain, err := core.NewBlockChain(db, config, ethash.NewShared(), new(event.TypeMux), vm.Config{}) + chain, err := core.NewBlockChain(db, config, ethash.NewShared(), vm.Config{}) if err != nil { return err } diff --git a/tests/state_test.go b/tests/state_test.go index ab6dc423f..00067c61a 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -40,6 +40,9 @@ func TestState(t *testing.T) { st.fails(`^stRevertTest/RevertDepthCreateAddressCollision\.json/EIP15[08]/[67]`, "bug in test") st.fails(`^stRevertTest/RevertPrecompiledTouch\.json/EIP158`, "bug in test") st.fails(`^stRevertTest/RevertPrefoundEmptyOOG\.json/EIP158`, "bug in test") + st.fails(`^stRevertTest/RevertDepthCreateAddressCollision\.json/Byzantium/[67]`, "bug in test") + st.fails(`^stRevertTest/RevertPrecompiledTouch\.json/Byzantium`, "bug in test") + st.fails(`^stRevertTest/RevertPrefoundEmptyOOG\.json/Byzantium`, "bug in test") st.walk(t, stateTestDir, func(t *testing.T, name string, test *StateTest) { for _, subtest := range test.Subtests() { @@ -47,8 +50,8 @@ func TestState(t *testing.T) { key := fmt.Sprintf("%s/%d", subtest.Fork, subtest.Index) name := name + "/" + key t.Run(key, func(t *testing.T) { - if subtest.Fork == "Constantinople" || subtest.Fork == "Byzantium" { - t.Skip("constantinople, byzantium not supported yet") + if subtest.Fork == "Constantinople" { + t.Skip("constantinople not supported yet") } withTrace(t, test.gasLimit(subtest), func(vmconfig vm.Config) error { return st.checkFailure(t, name, test.Run(subtest, vmconfig)) diff --git a/vendor/github.com/karalabe/hid/README.md b/vendor/github.com/karalabe/hid/README.md index 5c0213f3b..2851ffe42 100644 --- a/vendor/github.com/karalabe/hid/README.md +++ b/vendor/github.com/karalabe/hid/README.md @@ -24,6 +24,12 @@ and go-gettable. Supported platforms at the moment are Linux, macOS and Windows (exclude constraints are also specified for Android and iOS to allow smoother vendoring into cross platform projects). +## Cross-compiling + +Using `go get` the embedded C library is compiled into the binary format of your host OS. Cross compiling to a different platform or architecture entails disabling CGO by default in Go, causing device enumeration `hid.Enumerate()` to yield no results. + +To cross compile a functional version of this library, you'll need to enable CGO during cross compilation via `CGO_ENABLED=1` and you'll need to install and set a cross compilation enabled C toolkit via `CC=your-cross-gcc`. + ## Acknowledgements Although the `hid` package is an implementation from scratch, it was heavily inspired by the existing diff --git a/vendor/github.com/karalabe/hid/appveyor.yml b/vendor/github.com/karalabe/hid/appveyor.yml index 34623ddb0..f43958747 100644 --- a/vendor/github.com/karalabe/hid/appveyor.yml +++ b/vendor/github.com/karalabe/hid/appveyor.yml @@ -29,3 +29,4 @@ install: build_script: - go install ./... + - go test -v ./... diff --git a/vendor/github.com/karalabe/hid/hid.go b/vendor/github.com/karalabe/hid/hid.go index be75f949d..60a40b8c2 100644 --- a/vendor/github.com/karalabe/hid/hid.go +++ b/vendor/github.com/karalabe/hid/hid.go @@ -2,7 +2,7 @@ // Copyright (c) 2017 Péter Szilágyi. All rights reserved. // // This file is released under the 3-clause BSD license. Note however that Linux -// support depends on libusb, released under GNU GPL 2.1 or later. +// support depends on libusb, released under GNU LGPL 2.1 or later. // Package hid provides an interface for USB HID devices. package hid diff --git a/vendor/github.com/karalabe/hid/hid_disabled.go b/vendor/github.com/karalabe/hid/hid_disabled.go index d8ecc9d5a..1f4026379 100644 --- a/vendor/github.com/karalabe/hid/hid_disabled.go +++ b/vendor/github.com/karalabe/hid/hid_disabled.go @@ -2,7 +2,7 @@ // Copyright (c) 2017 Péter Szilágyi. All rights reserved. // // This file is released under the 3-clause BSD license. Note however that Linux -// support depends on libusb, released under GNU GPL 2.1 or later. +// support depends on libusb, released under GNU LGPL 2.1 or later. // +build !linux,!darwin,!windows ios !cgo diff --git a/vendor/github.com/karalabe/hid/hid_enabled.go b/vendor/github.com/karalabe/hid/hid_enabled.go index 3ceecce0d..419273be6 100644 --- a/vendor/github.com/karalabe/hid/hid_enabled.go +++ b/vendor/github.com/karalabe/hid/hid_enabled.go @@ -2,7 +2,7 @@ // Copyright (c) 2017 Péter Szilágyi. All rights reserved. // // This file is released under the 3-clause BSD license. Note however that Linux -// support depends on libusb, released under GNU GPL 2.1 or later. +// support depends on libusb, released under LGNU GPL 2.1 or later. // +build linux,cgo darwin,!ios,cgo windows,cgo @@ -48,6 +48,15 @@ import ( "unsafe" ) +// enumerateLock is a mutex serializing access to USB device enumeration needed +// by the macOS USB HID system calls, which require 2 consecutive method calls +// for enumeration, causing crashes if called concurrently. +// +// For more details, see: +// https://developer.apple.com/documentation/iokit/1438371-iohidmanagersetdevicematching +// > "subsequent calls will cause the hid manager to release previously enumerated devices" +var enumerateLock sync.Mutex + func init() { // Initialize the HIDAPI library C.hid_init() @@ -66,6 +75,9 @@ func Supported() bool { // - If the product id is set to 0 then any product matches. // - If the vendor and product id are both 0, all HID devices are returned. func Enumerate(vendorID uint16, productID uint16) []DeviceInfo { + enumerateLock.Lock() + defer enumerateLock.Unlock() + // Gather all device infos and ensure they are freed before returning head := C.hid_enumerate(C.ushort(vendorID), C.ushort(productID)) if head == nil { diff --git a/vendor/vendor.json b/vendor/vendor.json index b0bc88adc..56c95e341 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -183,10 +183,10 @@ "revisionTime": "2016-06-03T03:41:37Z" }, { - "checksumSHA1": "sGdhjz2N/DpfiZrMnHSzl2H/YX8=", + "checksumSHA1": "UpjhOUZ1+0zNt+iIvdtECSHXmTs=", "path": "github.com/karalabe/hid", - "revision": "875879887cf0560a993b57e04615944e1dd02a73", - "revisionTime": "2017-02-27T10:57:12Z", + "revision": "f00545f9f3748e591590be3732d913c77525b10f", + "revisionTime": "2017-08-21T10:38:37Z", "tree": true }, { diff --git a/whisper/whisperv5/api.go b/whisper/whisperv5/api.go index 5b84b99eb..ec22f0d1d 100644 --- a/whisper/whisperv5/api.go +++ b/whisper/whisperv5/api.go @@ -37,7 +37,7 @@ const ( ) var ( - ErrSymAsym = errors.New("specify either a symetric or a asymmetric key") + ErrSymAsym = errors.New("specify either a symmetric or an asymmetric key") ErrInvalidSymmetricKey = errors.New("invalid symmetric key") ErrInvalidPublicKey = errors.New("invalid public key") ErrInvalidSigningPubKey = errors.New("invalid signing public key") @@ -243,7 +243,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (bool, er err error ) - // user must specify either a symmetric or a asymmetric key + // user must specify either a symmetric or an asymmetric key if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { return false, ErrSymAsym } @@ -344,7 +344,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc. return nil, rpc.ErrNotificationsUnsupported } - // user must specify either a symmetric or a asymmetric key + // user must specify either a symmetric or an asymmetric key if (symKeyGiven && pubKeyGiven) || (!symKeyGiven && !pubKeyGiven) { return nil, ErrSymAsym } @@ -534,7 +534,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) { err error ) - // user must specify either a symmetric or a asymmetric key + // user must specify either a symmetric or an asymmetric key if (symKeyGiven && asymKeyGiven) || (!symKeyGiven && !asymKeyGiven) { return "", ErrSymAsym } |