From f0998415ba9a73f0add32f9b5aed2aec98b9a7f3 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 8 Aug 2018 12:15:08 +0300 Subject: cmd, consensus/ethash, eth: miner push notifications --- consensus/ethash/algorithm_test.go | 2 +- consensus/ethash/consensus.go | 2 +- consensus/ethash/ethash.go | 18 +++--- consensus/ethash/ethash_test.go | 43 +++++++------- consensus/ethash/sealer.go | 88 ++++++++++++++++++---------- consensus/ethash/sealer_test.go | 115 +++++++++++++++++++++++++++++++++++++ 6 files changed, 206 insertions(+), 62 deletions(-) create mode 100644 consensus/ethash/sealer_test.go (limited to 'consensus') diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index e7625f7c0..db22cccd0 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -729,7 +729,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() - ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}) + ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil) defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index eb0f73d98..e18a06d52 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -493,7 +493,7 @@ func (ethash *Ethash) VerifySeal(chain consensus.ChainReader, header *types.Head if !bytes.Equal(header.MixDigest[:], digest) { return errInvalidMixDigest } - target := new(big.Int).Div(maxUint256, header.Difficulty) + target := new(big.Int).Div(two256, header.Difficulty) if new(big.Int).SetBytes(result).Cmp(target) > 0 { return errInvalidPoW } diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 0cb3059b9..19c94deb6 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -45,11 +45,11 @@ import ( var ErrInvalidDumpMagic = errors.New("invalid dump magic") var ( - // maxUint256 is a big integer representing 2^256-1 - maxUint256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) + // two256 is a big integer representing 2^256 + two256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) // sharedEthash is a full instance that can be shared between multiple users. - sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}) + sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil) // algorithmRevision is the data structure version used for file naming. algorithmRevision = 23 @@ -447,8 +447,10 @@ type Ethash struct { exitCh chan chan error // Notification channel to exiting backend threads } -// New creates a full sized ethash PoW scheme and starts a background thread for remote mining. -func New(config Config) *Ethash { +// New creates a full sized ethash PoW scheme and starts a background thread for +// remote mining, also optionally notifying a batch of remote services of new work +// packages. +func New(config Config, notify []string) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) config.CachesInMem = 1 @@ -473,13 +475,13 @@ func New(config Config) *Ethash { submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote() + go ethash.remote(notify) return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. -func NewTester() *Ethash { +func NewTester(notify []string) *Ethash { ethash := &Ethash{ config: Config{PowMode: ModeTest}, caches: newlru("cache", 1, newCache), @@ -494,7 +496,7 @@ func NewTester() *Ethash { submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote() + go ethash.remote(notify) return ethash } diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index ccdd30fb0..87ac17c2b 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -32,17 +32,18 @@ import ( // Tests that ethash works correctly in test mode. func TestTestMode(t *testing.T) { - head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - ethash := NewTester() + ethash := NewTester(nil) defer ethash.Close() - block, err := ethash.Seal(nil, types.NewBlockWithHeader(head), nil) + + block, err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil) if err != nil { t.Fatalf("failed to seal block: %v", err) } - head.Nonce = types.EncodeNonce(block.Nonce()) - head.MixDigest = block.MixDigest() - if err := ethash.VerifySeal(nil, head); err != nil { + header.Nonce = types.EncodeNonce(block.Nonce()) + header.MixDigest = block.MixDigest() + if err := ethash.VerifySeal(nil, header); err != nil { t.Fatalf("unexpected verification error: %v", err) } } @@ -55,7 +56,7 @@ func TestCacheFileEvict(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(tmpdir) - e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}) + e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}, nil) defer e.Close() workers := 8 @@ -78,21 +79,21 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { if block < 0 { block = 0 } - head := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)} - e.VerifySeal(nil, head) + header := &types.Header{Number: big.NewInt(block), Difficulty: big.NewInt(100)} + e.VerifySeal(nil, header) } } func TestRemoteSealer(t *testing.T) { - ethash := NewTester() + ethash := NewTester(nil) defer ethash.Close() + api := &API{ethash} if _, err := api.GetWork(); err != errNoMiningWork { t.Error("expect to return an error indicate there is no mining work") } - - head := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - block := types.NewBlockWithHeader(head) + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) // Push new work. ethash.Seal(nil, block, nil) @@ -108,16 +109,14 @@ func TestRemoteSealer(t *testing.T) { if res := api.SubmitWork(types.BlockNonce{}, block.HashNoNonce(), common.Hash{}); res { t.Error("expect to return false when submit a fake solution") } - // Push new block with same block number to replace the original one. - head = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} - block = types.NewBlockWithHeader(head) + header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} + block = types.NewBlockWithHeader(header) ethash.Seal(nil, block, nil) if work, err = api.GetWork(); err != nil || work[0] != block.HashNoNonce().Hex() { t.Error("expect to return the latest pushed work") } - // Push block with higher block number. newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} newBlock := types.NewBlockWithHeader(newHead) @@ -130,19 +129,18 @@ func TestRemoteSealer(t *testing.T) { func TestHashRate(t *testing.T) { var ( - ethash = NewTester() - api = &API{ethash} hashrate = []hexutil.Uint64{100, 200, 300} expect uint64 ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} ) - + ethash := NewTester(nil) defer ethash.Close() if tot := ethash.Hashrate(); tot != 0 { t.Error("expect the result should be zero") } + api := &API{ethash} for i := 0; i < len(hashrate); i += 1 { if res := api.SubmitHashRate(hashrate[i], ids[i]); !res { t.Error("remote miner submit hashrate failed") @@ -155,9 +153,8 @@ func TestHashRate(t *testing.T) { } func TestClosedRemoteSealer(t *testing.T) { - ethash := NewTester() - // Make sure exit channel has been listened - time.Sleep(1 * time.Second) + ethash := NewTester(nil) + time.Sleep(1 * time.Second) // ensure exit channel is listening ethash.Close() api := &API{ethash} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index a9449d406..03d848473 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -17,11 +17,14 @@ package ethash import ( + "bytes" crand "crypto/rand" + "encoding/json" "errors" "math" "math/big" "math/rand" + "net/http" "runtime" "sync" "time" @@ -109,7 +112,7 @@ func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan s var ( header = block.Header() hash = header.HashNoNonce().Bytes() - target = new(big.Int).Div(maxUint256, header.Difficulty) + target = new(big.Int).Div(two256, header.Difficulty) number = header.Number.Uint64() dataset = ethash.dataset(number) ) @@ -161,40 +164,65 @@ search: runtime.KeepAlive(dataset) } -// remote starts a standalone goroutine to handle remote mining related stuff. -func (ethash *Ethash) remote() { +// remote is a standalone goroutine to handle remote mining related stuff. +func (ethash *Ethash) remote(notify []string) { var ( - works = make(map[common.Hash]*types.Block) - rates = make(map[common.Hash]hashrate) - currentWork *types.Block + works = make(map[common.Hash]*types.Block) + rates = make(map[common.Hash]hashrate) + + currentBlock *types.Block + currentWork [3]string + + notifyTransport = &http.Transport{} + notifyClient = &http.Client{ + Transport: notifyTransport, + Timeout: time.Second, + } + notifyReqs = make([]*http.Request, len(notify)) ) + // notifyWork notifies all the specified mining endpoints of the availability of + // new work to be processed. + notifyWork := func() { + work := currentWork + blob, _ := json.Marshal(work) - // getWork returns a work package for external miner. + for i, url := range notify { + // Terminate any previously pending request and create the new work + if notifyReqs[i] != nil { + notifyTransport.CancelRequest(notifyReqs[i]) + } + notifyReqs[i], _ = http.NewRequest("POST", url, bytes.NewReader(blob)) + notifyReqs[i].Header.Set("Content-Type", "application/json") + + // Push the new work concurrently to all the remote nodes + go func(req *http.Request, url string) { + res, err := notifyClient.Do(req) + if err != nil { + log.Warn("Failed to notify remote miner", "err", err) + } else { + log.Trace("Notified remote miner", "miner", url, "hash", log.Lazy{Fn: func() common.Hash { return common.HexToHash(work[0]) }}, "target", work[2]) + res.Body.Close() + } + }(notifyReqs[i], url) + } + } + // makeWork creates a work package for external miner. // // The work package consists of 3 strings: // result[0], 32 bytes hex encoded current block header pow-hash // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty - getWork := func() ([3]string, error) { - var res [3]string - if currentWork == nil { - return res, errNoMiningWork - } - res[0] = currentWork.HashNoNonce().Hex() - res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex() + makeWork := func(block *types.Block) { + hash := block.HashNoNonce() - // Calculate the "target" to be returned to the external sealer. - n := big.NewInt(1) - n.Lsh(n, 255) - n.Div(n, currentWork.Difficulty()) - n.Lsh(n, 1) - res[2] = common.BytesToHash(n.Bytes()).Hex() + currentWork[0] = hash.Hex() + currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() + currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() // Trace the seal work fetched by remote sealer. - works[currentWork.HashNoNonce()] = currentWork - return res, nil + currentBlock = block + works[hash] = block } - // submitWork verifies the submitted pow solution, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no pending work or stale mining result). @@ -238,21 +266,23 @@ func (ethash *Ethash) remote() { for { select { case block := <-ethash.workCh: - if currentWork != nil && block.ParentHash() != currentWork.ParentHash() { + if currentBlock != nil && block.ParentHash() != currentBlock.ParentHash() { // Start new round mining, throw out all previous work. works = make(map[common.Hash]*types.Block) } // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. - currentWork = block + makeWork(block) + + // Notify and requested URLs of the new work availability + notifyWork() case work := <-ethash.fetchWorkCh: // Return current mining work to remote miner. - miningWork, err := getWork() - if err != nil { - work.errc <- err + if currentBlock == nil { + work.errc <- errNoMiningWork } else { - work.res <- miningWork + work.res <- currentWork } case result := <-ethash.submitWorkCh: diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go new file mode 100644 index 000000000..6d8a77049 --- /dev/null +++ b/consensus/ethash/sealer_test.go @@ -0,0 +1,115 @@ +package ethash + +import ( + "encoding/json" + "io/ioutil" + "math/big" + "net" + "net/http" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Tests whether remote HTTP servers are correctly notified of new work. +func TestRemoteNotify(t *testing.T) { + // Start a simple webserver to capture notifications + sink := make(chan [3]string) + + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to read miner notification: %v", err) + } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Fatalf("failed to unmarshal miner notification: %v", err) + } + sink <- work + }), + } + // Open a custom listener to extract its local address + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to open notification server: %v", err) + } + defer listener.Close() + + go server.Serve(listener) + + // Create the custom ethash engine + ethash := NewTester([]string{"http://" + listener.Addr().String()}) + defer ethash.Close() + + // Stream a work task and ensure the notification bubbles out + header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) + + ethash.Seal(nil, block, nil) + select { + case work := <-sink: + if want := header.HashNoNonce().Hex(); work[0] != want { + t.Errorf("work packet hash mismatch: have %s, want %s", work[0], want) + } + if want := common.BytesToHash(SeedHash(header.Number.Uint64())).Hex(); work[1] != want { + t.Errorf("work packet seed mismatch: have %s, want %s", work[1], want) + } + target := new(big.Int).Div(new(big.Int).Lsh(big.NewInt(1), 256), header.Difficulty) + if want := common.BytesToHash(target.Bytes()).Hex(); work[2] != want { + t.Errorf("work packet target mismatch: have %s, want %s", work[2], want) + } + case <-time.After(time.Second): + t.Fatalf("notification timed out") + } +} + +// Tests that pushing work packages fast to the miner doesn't cause any daa race +// issues in the notifications. +func TestRemoteMultiNotify(t *testing.T) { + // Start a simple webserver to capture notifications + sink := make(chan [3]string, 1024) + + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blob, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to read miner notification: %v", err) + } + var work [3]string + if err := json.Unmarshal(blob, &work); err != nil { + t.Fatalf("failed to unmarshal miner notification: %v", err) + } + sink <- work + }), + } + // Open a custom listener to extract its local address + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to open notification server: %v", err) + } + defer listener.Close() + + go server.Serve(listener) + + // Create the custom ethash engine + ethash := NewTester([]string{"http://" + listener.Addr().String()}) + defer ethash.Close() + + // Stream a lot of work task and ensure all the notifications bubble out + for i := 0; i < cap(sink); i++ { + header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} + block := types.NewBlockWithHeader(header) + + ethash.Seal(nil, block, nil) + } + for i := 0; i < cap(sink); i++ { + select { + case <-sink: + case <-time.After(250 * time.Millisecond): + t.Fatalf("notification %d timed out", i) + } + } +} -- cgit