diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-08-21 16:43:37 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-21 16:43:37 +0800 |
commit | 2c816b5d636b8f7decd234582470a3d4c6b4a93a (patch) | |
tree | 5eff9d5f035dda8e3b2632ecce41f3c192e90f21 | |
parent | e8f99372159a89fb3128b870de1733a4777a5144 (diff) | |
download | tangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.tar.gz tangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.tar.zst tangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.zip |
simulation: add simulation with scheduler (#71)
- Add new field in test.Event: HistoryIndex
HistoryIndex allow us to access them by their position in event history.
- Record local time in test.App when receiving events.
- Add statisitics module for slices of test.Event.
- add new command line utility *dexcon-simulation-with-scheduler
to verify the execution time of core.Consensus.
-rw-r--r-- | GNUmakefile | 3 | ||||
-rw-r--r-- | README.md | 19 | ||||
-rw-r--r-- | cmd/dexcon-simulation-with-scheduler/main.go | 100 | ||||
-rw-r--r-- | core/consensus_test.go | 2 | ||||
-rw-r--r-- | core/test/app.go | 74 | ||||
-rw-r--r-- | core/test/app_test.go | 10 | ||||
-rw-r--r-- | core/test/scheduler-event.go | 15 | ||||
-rw-r--r-- | core/test/scheduler.go | 13 | ||||
-rw-r--r-- | core/test/scheduler_test.go | 3 | ||||
-rw-r--r-- | integration_test/latency.go | 16 | ||||
-rw-r--r-- | integration_test/non-byzantine_test.go | 41 | ||||
-rw-r--r-- | integration_test/stats.go | 176 | ||||
-rw-r--r-- | integration_test/stats_test.go | 60 | ||||
-rw-r--r-- | integration_test/utils.go | 72 | ||||
-rw-r--r-- | integration_test/validator.go | 29 |
15 files changed, 544 insertions, 89 deletions
diff --git a/GNUmakefile b/GNUmakefile index 6eceb31..ed296e9 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -40,7 +40,8 @@ endef COMPONENTS = \ dexcon-simulation \ - dexcon-simulation-peer-server + dexcon-simulation-peer-server \ + dexcon-simulation-with-scheduler .PHONY: clean default @@ -42,6 +42,8 @@ make pre-submit ## Simulation +### Simulation with Nodes connected by HTTP + 1. Setup the configuration under `./test.toml` 2. Compile and install the cmd `dexon-simulation` @@ -49,8 +51,23 @@ make pre-submit make ``` -4. Run simulation: +3. Run simulation: ``` dexcon-simulation -config test.toml -init ``` + +### Simulation with test.Scheduler + +1. Setup the configuration under `./test.toml` +2. Compile and install the cmd `dexon-simulation-with-scheduler` + +``` +make +``` + +3. Run simulation with 10 workers: + +``` +dexcon-simulation-with-scheduler -config test.toml -workercount 10 +``` diff --git a/cmd/dexcon-simulation-with-scheduler/main.go b/cmd/dexcon-simulation-with-scheduler/main.go new file mode 100644 index 0000000..3ed71c3 --- /dev/null +++ b/cmd/dexcon-simulation-with-scheduler/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "flag" + "log" + "math" + "math/rand" + "os" + "runtime" + "runtime/pprof" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/test" + integration "github.com/dexon-foundation/dexon-consensus-core/integration_test" + "github.com/dexon-foundation/dexon-consensus-core/simulation/config" +) + +var ( + configFile = flag.String("config", "", "path to simulation config file") + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") + memprofile = flag.String("memprofile", "", "write memory profile to `file`") + workerCount = flag.Int("workercount", 1, "count of concurrent workers") +) + +func main() { + flag.Parse() + rand.Seed(time.Now().UnixNano()) + + if *configFile == "" { + log.Fatal("error: no configuration file specified") + } + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal("could not create CPU profile: ", err) + } + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal("could not start CPU profile: ", err) + } + defer pprof.StopCPUProfile() + } + + cfg, err := config.Read(*configFile) + if err != nil { + log.Fatal("unable to read config: ", err) + } + // Setup latencies, validators. + networkLatency := &integration.NormalLatencyModel{ + Sigma: cfg.Networking.Sigma, + Mean: cfg.Networking.Mean, + } + proposingLatency := &integration.NormalLatencyModel{ + Sigma: cfg.Validator.ProposeIntervalSigma, + Mean: cfg.Validator.ProposeIntervalMean, + } + // Setup validators and other consensus related stuffs. + apps, dbs, validators, err := integration.PrepareValidators( + cfg.Validator.Num, networkLatency, proposingLatency) + if err != nil { + log.Fatal("could not setup validators: ", err) + } + blockPerValidator := int(math.Ceil( + float64(cfg.Validator.MaxBlock) / float64(cfg.Validator.Num))) + sch := test.NewScheduler( + test.NewStopByConfirmedBlocks(blockPerValidator, apps, dbs)) + for vID, v := range validators { + sch.RegisterEventHandler(vID, v) + if err = sch.Seed(integration.NewProposeBlockEvent( + vID, time.Now().UTC())); err != nil { + + log.Fatal("unable to set seed simulation events: ", err) + } + } + // Run the simulation. + sch.Run(*workerCount) + if err = integration.VerifyApps(apps); err != nil { + log.Fatal("consensus result is not incorrect: ", err) + } + // Prepare statistics. + stats, err := integration.NewStats(sch.CloneExecutionHistory(), apps) + if err != nil { + log.Fatal("could not generate statistics: ", err) + } + if *memprofile != "" { + f, err := os.Create(*memprofile) + if err != nil { + log.Fatal("could not create memory profile: ", err) + } + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal("could not write memory profile: ", err) + } + f.Close() + } + + log.Printf("BPS: %v\n", stats.BPS) + log.Printf("ExecutionTime: %v\n", stats.ExecutionTime) + log.Printf("Prepare: %v\n", time.Duration(stats.All.PrepareExecLatency)) + log.Printf("Process: %v\n", time.Duration(stats.All.ProcessExecLatency)) +} diff --git a/core/consensus_test.go b/core/consensus_test.go index 2887d66..cd7ff02 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -243,7 +243,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // its ConsensusTimestamp is not interpolated. t, err := getMedianTime(b11) req.Nil(err) - req.Equal(t, app.Delivered[b11.Hash]) + req.Equal(t, app.Delivered[b11.Hash].ConsensusTime) } for _, obj := range objs { app := *obj.app diff --git a/core/test/app.go b/core/test/app.go index 0242ca5..ddce31a 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -52,29 +52,47 @@ var ( "mismatch total ordering and delivered sequence") ) -type totalOrderDeliver struct { +// AppAckedRecord caches information when this application received +// a strongly-acked notification. +type AppAckedRecord struct { + When time.Time +} + +// AppTotalOrderRecord caches information when this application received +// a total-ordering deliver notification. +type AppTotalOrderRecord struct { BlockHashes common.Hashes Early bool + When time.Time +} + +// AppDeliveredRecord caches information when this application received +// a block delivered notification. +type AppDeliveredRecord struct { + ConsensusTime time.Time + When time.Time } // App implements Application interface for testing purpose. type App struct { - Acked map[common.Hash]struct{} - ackedLock sync.RWMutex - TotalOrdered []*totalOrderDeliver - totalOrderedLock sync.RWMutex - Delivered map[common.Hash]time.Time - DeliverSequence common.Hashes - deliveredLock sync.RWMutex + Acked map[common.Hash]*AppAckedRecord + ackedLock sync.RWMutex + TotalOrdered []*AppTotalOrderRecord + TotalOrderedByHash map[common.Hash]*AppTotalOrderRecord + totalOrderedLock sync.RWMutex + Delivered map[common.Hash]*AppDeliveredRecord + DeliverSequence common.Hashes + deliveredLock sync.RWMutex } // NewApp constructs a TestApp instance. func NewApp() *App { return &App{ - Acked: make(map[common.Hash]struct{}), - TotalOrdered: []*totalOrderDeliver{}, - Delivered: make(map[common.Hash]time.Time), - DeliverSequence: common.Hashes{}, + Acked: make(map[common.Hash]*AppAckedRecord), + TotalOrdered: []*AppTotalOrderRecord{}, + TotalOrderedByHash: make(map[common.Hash]*AppTotalOrderRecord), + Delivered: make(map[common.Hash]*AppDeliveredRecord), + DeliverSequence: common.Hashes{}, } } @@ -83,7 +101,7 @@ func (app *App) StronglyAcked(blockHash common.Hash) { app.ackedLock.Lock() defer app.ackedLock.Unlock() - app.Acked[blockHash] = struct{}{} + app.Acked[blockHash] = &AppAckedRecord{When: time.Now().UTC()} } // TotalOrderingDeliver implements Application interface. @@ -91,10 +109,18 @@ func (app *App) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { app.totalOrderedLock.Lock() defer app.totalOrderedLock.Unlock() - app.TotalOrdered = append(app.TotalOrdered, &totalOrderDeliver{ + rec := &AppTotalOrderRecord{ BlockHashes: blockHashes, Early: early, - }) + When: time.Now().UTC(), + } + app.TotalOrdered = append(app.TotalOrdered, rec) + for _, h := range blockHashes { + if _, exists := app.TotalOrderedByHash[h]; exists { + panic(fmt.Errorf("deliver duplicated blocks from total ordering")) + } + app.TotalOrderedByHash[h] = rec + } } // DeliverBlock implements Application interface. @@ -102,7 +128,10 @@ func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) { app.deliveredLock.Lock() defer app.deliveredLock.Unlock() - app.Delivered[blockHash] = timestamp + app.Delivered[blockHash] = &AppDeliveredRecord{ + ConsensusTime: timestamp, + When: time.Now().UTC(), + } app.DeliverSequence = append(app.DeliverSequence, blockHash) } @@ -132,7 +161,7 @@ func (app *App) Compare(other *App) error { if hOther != h { return ErrMismatchBlockHashSequence } - if app.Delivered[h] != other.Delivered[h] { + if app.Delivered[h].ConsensusTime != other.Delivered[h].ConsensusTime { return ErrMismatchConsensusTime } } @@ -160,16 +189,17 @@ func (app *App) Verify() error { if _, acked := app.Acked[h]; !acked { return ErrDeliveredBlockNotAcked } - t, exists := app.Delivered[h] + rec, exists := app.Delivered[h] if !exists { return ErrApplicationIntegrityFailed } // Make sure the consensus time is incremental. - ok := prevTime.Before(t) || prevTime.Equal(t) + ok := prevTime.Before(rec.ConsensusTime) || + prevTime.Equal(rec.ConsensusTime) if !ok { return ErrConsensusTimestampOutOfOrder } - prevTime = t + prevTime = rec.ConsensusTime } // Make sure the order of delivered and total ordering are the same by // comparing the concated string. @@ -178,8 +208,8 @@ func (app *App) Verify() error { hashSequenceIdx := 0 Loop: - for _, totalOrderDeliver := range app.TotalOrdered { - for _, h := range totalOrderDeliver.BlockHashes { + for _, rec := range app.TotalOrdered { + for _, h := range rec.BlockHashes { if hashSequenceIdx >= len(app.DeliverSequence) { break Loop } diff --git a/core/test/app_test.go b/core/test/app_test.go index 285773e..f4c4a74 100644 --- a/core/test/app_test.go +++ b/core/test/app_test.go @@ -11,18 +11,18 @@ import ( type AppTestSuite struct { suite.Suite - to1, to2, to3 *totalOrderDeliver + to1, to2, to3 *AppTotalOrderRecord } func (s *AppTestSuite) SetupSuite() { - s.to1 = &totalOrderDeliver{ + s.to1 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), common.NewRandomHash(), }, Early: false, } - s.to2 = &totalOrderDeliver{ + s.to2 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), common.NewRandomHash(), @@ -30,7 +30,7 @@ func (s *AppTestSuite) SetupSuite() { }, Early: false, } - s.to3 = &totalOrderDeliver{ + s.to3 = &AppTotalOrderRecord{ BlockHashes: common.Hashes{ common.NewRandomHash(), }, @@ -39,7 +39,7 @@ func (s *AppTestSuite) SetupSuite() { } func (s *AppTestSuite) setupAppByTotalOrderDeliver( - app *App, to *totalOrderDeliver) { + app *App, to *AppTotalOrderRecord) { for _, h := range to.BlockHashes { app.StronglyAcked(h) diff --git a/core/test/scheduler-event.go b/core/test/scheduler-event.go index 60411b4..85968c5 100644 --- a/core/test/scheduler-event.go +++ b/core/test/scheduler-event.go @@ -25,6 +25,8 @@ import ( // Event defines a scheduler event. type Event struct { + // HistoryIndex is the index of this event in history. + HistoryIndex int // ValidatorID is the ID of handler that this event deginated to. ValidatorID types.ValidatorID // Time is the expected execution time of this event. @@ -33,9 +35,8 @@ type Event struct { ExecError error // Payload is application specific data carried by this event. Payload interface{} - // ParentTime is the time of parent event, this field is essential when - // we need to calculate the latency the handler assigned. - ParentTime time.Time + // ParentHistoryIndex is the index of parent event in history. + ParentHistoryIndex int // ExecInterval is the latency to execute this event ExecInterval time.Duration } @@ -69,8 +70,10 @@ func NewEvent( vID types.ValidatorID, when time.Time, payload interface{}) *Event { return &Event{ - ValidatorID: vID, - Time: when, - Payload: payload, + HistoryIndex: -1, + ParentHistoryIndex: -1, + ValidatorID: vID, + Time: when, + Payload: payload, } } diff --git a/core/test/scheduler.go b/core/test/scheduler.go index 023d09f..6a3a40a 100644 --- a/core/test/scheduler.go +++ b/core/test/scheduler.go @@ -165,12 +165,6 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { if sch.stopper.ShouldStop(e.ValidatorID) { sch.cancelFunc() } - // Include the execution interval of parent event to the expected time - // to execute child events. - for _, newEvent := range newEvents { - newEvent.ParentTime = e.Time - newEvent.Time = newEvent.Time.Add(e.ExecInterval) - } return newEvents }() // Record executed events as history. @@ -178,8 +172,15 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { sch.historyLock.Lock() defer sch.historyLock.Unlock() + e.HistoryIndex = len(sch.history) sch.history = append(sch.history, e) }() + // Include the execution interval of parent event to the expected time + // to execute child events. + for _, newEvent := range newEvents { + newEvent.ParentHistoryIndex = e.HistoryIndex + newEvent.Time = newEvent.Time.Add(e.ExecInterval) + } // Add derivated events back to event queue. func() { sch.eventsLock.Lock() diff --git a/core/test/scheduler_test.go b/core/test/scheduler_test.go index c67240f..5aef36e 100644 --- a/core/test/scheduler_test.go +++ b/core/test/scheduler_test.go @@ -165,7 +165,8 @@ func (s *SchedulerTestSuite) TestChildEvent() { req.True(e.Time.Sub(curEvent.Time) >= 1300*time.Millisecond) // Make sure ParentTime field is set and is equal to parent event's // time. - req.Equal(e.ParentTime, curEvent.Time) + req.NotEqual(-1, e.ParentHistoryIndex) + req.Equal(e.ParentHistoryIndex, curEvent.HistoryIndex) curEvent = e } } diff --git a/integration_test/latency.go b/integration_test/latency.go index 383d069..8f06084 100644 --- a/integration_test/latency.go +++ b/integration_test/latency.go @@ -28,17 +28,27 @@ type LatencyModel interface { Delay() time.Duration } -// normalLatencyModel would return latencies in normal distribution. -type normalLatencyModel struct { +// NormalLatencyModel would return latencies in normal distribution. +type NormalLatencyModel struct { Sigma float64 Mean float64 } // Delay implements LatencyModel interface. -func (m *normalLatencyModel) Delay() time.Duration { +func (m *NormalLatencyModel) Delay() time.Duration { delay := rand.NormFloat64()*m.Sigma + m.Mean if delay < 0 { delay = m.Sigma / 2 } return time.Duration(delay) * time.Millisecond } + +// FixedLatencyModel return fixed latencies. +type FixedLatencyModel struct { + Latency float64 +} + +// Delay implements LatencyModel interface. +func (m *FixedLatencyModel) Delay() time.Duration { + return time.Duration(m.Latency) * time.Millisecond +} diff --git a/integration_test/non-byzantine_test.go b/integration_test/non-byzantine_test.go index 111dcd0..827d2ad 100644 --- a/integration_test/non-byzantine_test.go +++ b/integration_test/non-byzantine_test.go @@ -33,11 +33,11 @@ type NonByzantineTestSuite struct { func (s *NonByzantineTestSuite) TestNonByzantine() { var ( - networkLatency = &normalLatencyModel{ + networkLatency = &NormalLatencyModel{ Sigma: 20, Mean: 250, } - proposingLatency = &normalLatencyModel{ + proposingLatency = &NormalLatencyModel{ Sigma: 30, Mean: 500, } @@ -46,42 +46,19 @@ func (s *NonByzantineTestSuite) TestNonByzantine() { req = s.Require() ) - gov, err := test.NewGovernance(25, 700) + apps, dbs, validators, err := PrepareValidators( + 25, networkLatency, proposingLatency) req.Nil(err) now := time.Now().UTC() - for vID := range gov.GetValidatorSet() { - apps[vID] = test.NewApp() - - db, err := blockdb.NewMemBackedBlockDB() - req.Nil(err) - dbs[vID] = db - } - stopper := test.NewStopByConfirmedBlocks(50, apps, dbs) - sch := test.NewScheduler(stopper) - for vID := range gov.GetValidatorSet() { - key, err := gov.GetPrivateKey(vID) - req.Nil(err) - v := newValidator( - apps[vID], - gov, - dbs[vID], - key, - vID, - networkLatency, - proposingLatency) + sch := test.NewScheduler(test.NewStopByConfirmedBlocks(50, apps, dbs)) + for vID, v := range validators { sch.RegisterEventHandler(vID, v) - req.Nil(sch.Seed(newProposeBlockEvent(vID, now))) + req.Nil(sch.Seed(NewProposeBlockEvent(vID, now))) } sch.Run(10) // Check results by comparing test.App instances. - for vFrom := range gov.GetValidatorSet() { - req.Nil(apps[vFrom].Verify()) - for vTo := range gov.GetValidatorSet() { - if vFrom == vTo { - continue - } - req.Nil(apps[vFrom].Compare(apps[vTo])) - } + if err = VerifyApps(apps); err != nil { + panic(err) } } diff --git a/integration_test/stats.go b/integration_test/stats.go new file mode 100644 index 0000000..ae8ded4 --- /dev/null +++ b/integration_test/stats.go @@ -0,0 +1,176 @@ +package integration + +import ( + "fmt" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/test" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// Errors when calculating statistics for events. +var ( + ErrUnknownEvent = fmt.Errorf("unknown event") + ErrUnknownConsensusEventType = fmt.Errorf("unknown consensus event type") +) + +// StatsSet represents accumulatee result of a group of related events +// (ex. All events from one validator). +type StatsSet struct { + ProposedBlockCount int + ReceivedBlockCount int + StronglyAckedBlockCount int + TotalOrderedBlockCount int + DeliveredBlockCount int + ProposingLatency time.Duration + ReceivingLatency time.Duration + PrepareExecLatency time.Duration + ProcessExecLatency time.Duration +} + +// newBlockProposeEvent accumulates a block proposing event. +func (s *StatsSet) newBlockProposeEvent( + e *test.Event, payload *consensusEventPayload, history []*test.Event) { + + // Find previous block proposing event. + if e.ParentHistoryIndex != -1 { + parentEvent := history[e.ParentHistoryIndex] + s.ProposingLatency += + e.Time.Sub(parentEvent.Time) - parentEvent.ExecInterval + } + s.PrepareExecLatency += e.ExecInterval + s.ProposedBlockCount++ +} + +// newBlockReceiveEvent accumulates a block received event. +func (s *StatsSet) newBlockReceiveEvent( + e *test.Event, + payload *consensusEventPayload, + history []*test.Event, + app *test.App) { + + // Find previous block proposing event. + parentEvent := history[e.ParentHistoryIndex] + s.ReceivingLatency += + e.Time.Sub(parentEvent.Time) - parentEvent.ExecInterval + s.ProcessExecLatency += e.ExecInterval + s.ReceivedBlockCount++ + + // Find statistics from test.App + block := payload.PiggyBack.(*types.Block) + app.Check(func(app *test.App) { + // Is this block strongly acked? + if _, exists := app.Acked[block.Hash]; !exists { + return + } + s.StronglyAckedBlockCount++ + + // Is this block total ordered? + if _, exists := app.TotalOrderedByHash[block.Hash]; !exists { + return + } + s.TotalOrderedBlockCount++ + + // Is this block delivered? + if _, exists := app.Delivered[block.Hash]; !exists { + return + } + s.DeliveredBlockCount++ + }) +} + +// done would divide the latencies we cached with related event count. This way +// to calculate average latency is more accurate. +func (s *StatsSet) done(validatorCount int) { + s.ProposingLatency /= time.Duration(s.ProposedBlockCount - validatorCount) + s.ReceivingLatency /= time.Duration(s.ReceivedBlockCount) + s.PrepareExecLatency /= time.Duration(s.ProposedBlockCount) + s.ProcessExecLatency /= time.Duration(s.ReceivedBlockCount) +} + +// Stats is statistics of a slice of test.Event generated by validators. +type Stats struct { + ByValidator map[types.ValidatorID]*StatsSet + All *StatsSet + BPS float64 + ExecutionTime time.Duration +} + +// NewStats constructs an Stats instance by providing a slice of +// test.Event. +func NewStats( + history []*test.Event, apps map[types.ValidatorID]*test.App) ( + stats *Stats, err error) { + + stats = &Stats{ + ByValidator: make(map[types.ValidatorID]*StatsSet), + All: &StatsSet{}, + } + if err = stats.calculate(history, apps); err != nil { + stats = nil + } + stats.summary(history) + return +} + +func (stats *Stats) calculate( + history []*test.Event, apps map[types.ValidatorID]*test.App) error { + + defer func() { + stats.All.done(len(stats.ByValidator)) + for _, set := range stats.ByValidator { + set.done(1) + } + }() + + for _, e := range history { + payload, ok := e.Payload.(*consensusEventPayload) + if !ok { + return ErrUnknownEvent + } + switch payload.Type { + case evtProposeBlock: + stats.All.newBlockProposeEvent( + e, payload, history) + stats.getStatsSetByValidator(e.ValidatorID).newBlockProposeEvent( + e, payload, history) + case evtReceiveBlock: + stats.All.newBlockReceiveEvent( + e, payload, history, apps[e.ValidatorID]) + stats.getStatsSetByValidator(e.ValidatorID).newBlockReceiveEvent( + e, payload, history, apps[e.ValidatorID]) + default: + return ErrUnknownConsensusEventType + } + } + return nil +} + +func (stats *Stats) getStatsSetByValidator( + vID types.ValidatorID) (s *StatsSet) { + + s = stats.ByValidator[vID] + if s == nil { + s = &StatsSet{} + stats.ByValidator[vID] = s + } + return +} + +func (stats *Stats) summary(history []*test.Event) { + // Find average delivered block count among all blocks. + totalConfirmedBlocks := 0 + for _, s := range stats.ByValidator { + totalConfirmedBlocks += s.DeliveredBlockCount + } + averageConfirmedBlocks := totalConfirmedBlocks / len(stats.ByValidator) + + // Find execution time. + // Note: it's a simplified way to calculate the execution time: + // the latest event might not be at the end of history when + // the number of worker routine is larger than 1. + stats.ExecutionTime = history[len(history)-1].Time.Sub(history[0].Time) + // Calculate BPS. + latencyAsSecond := stats.ExecutionTime.Nanoseconds() / (1000 * 1000 * 1000) + stats.BPS = float64(averageConfirmedBlocks) / float64(latencyAsSecond) +} diff --git a/integration_test/stats_test.go b/integration_test/stats_test.go new file mode 100644 index 0000000..e0be126 --- /dev/null +++ b/integration_test/stats_test.go @@ -0,0 +1,60 @@ +package integration + +import ( + "testing" + "time" + + "github.com/dexon-foundation/dexon-consensus-core/core/test" + "github.com/stretchr/testify/suite" +) + +type EventStatsTestSuite struct { + suite.Suite +} + +func (s *EventStatsTestSuite) TestCalculate() { + // Setup a test with fixed latency in proposing and network, + // and make sure the calculated statistics is expected. + var ( + networkLatency = &FixedLatencyModel{Latency: 100} + proposingLatency = &FixedLatencyModel{Latency: 300} + req = s.Require() + ) + + apps, dbs, validators, err := PrepareValidators( + 7, networkLatency, proposingLatency) + req.Nil(err) + + sch := test.NewScheduler(test.NewStopByConfirmedBlocks(50, apps, dbs)) + now := time.Now().UTC() + for vID, v := range validators { + sch.RegisterEventHandler(vID, v) + req.Nil(sch.Seed(NewProposeBlockEvent(vID, now))) + } + sch.Run(10) + req.Nil(VerifyApps(apps)) + // Check total statistics result. + stats, err := NewStats(sch.CloneExecutionHistory(), apps) + req.Nil(err) + req.True(stats.All.ProposedBlockCount > 350) + req.True(stats.All.ReceivedBlockCount > 350) + req.True(stats.All.StronglyAckedBlockCount > 350) + req.True(stats.All.TotalOrderedBlockCount >= 350) + req.True(stats.All.DeliveredBlockCount >= 350) + req.Equal(stats.All.ProposingLatency, 300*time.Millisecond) + req.Equal(stats.All.ReceivingLatency, 100*time.Millisecond) + // Check statistics for each validator. + for _, vStats := range stats.ByValidator { + req.True(vStats.ProposedBlockCount > 50) + req.True(vStats.ReceivedBlockCount > 50) + req.True(vStats.StronglyAckedBlockCount > 50) + req.True(vStats.TotalOrderedBlockCount >= 50) + req.True(vStats.DeliveredBlockCount >= 50) + req.Equal(vStats.ProposingLatency, 300*time.Millisecond) + req.Equal(vStats.ReceivingLatency, 100*time.Millisecond) + } +} + +func TestEventStats(t *testing.T) { + suite.Run(t, new(EventStatsTestSuite)) +} diff --git a/integration_test/utils.go b/integration_test/utils.go new file mode 100644 index 0000000..c1eafb7 --- /dev/null +++ b/integration_test/utils.go @@ -0,0 +1,72 @@ +package integration + +import ( + "github.com/dexon-foundation/dexon-consensus-core/blockdb" + "github.com/dexon-foundation/dexon-consensus-core/core/test" + "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/crypto" +) + +// PrepareValidators setups validators for testing. +func PrepareValidators( + validatorCount int, + networkLatency, proposingLatency LatencyModel) ( + apps map[types.ValidatorID]*test.App, + dbs map[types.ValidatorID]blockdb.BlockDatabase, + validators map[types.ValidatorID]*Validator, + err error) { + + var ( + db blockdb.BlockDatabase + key crypto.PrivateKey + ) + + apps = make(map[types.ValidatorID]*test.App) + dbs = make(map[types.ValidatorID]blockdb.BlockDatabase) + validators = make(map[types.ValidatorID]*Validator) + + gov, err := test.NewGovernance(validatorCount, 700) + if err != nil { + return + } + for vID := range gov.GetValidatorSet() { + apps[vID] = test.NewApp() + + if db, err = blockdb.NewMemBackedBlockDB(); err != nil { + return + } + dbs[vID] = db + } + for vID := range gov.GetValidatorSet() { + if key, err = gov.GetPrivateKey(vID); err != nil { + return + } + validators[vID] = NewValidator( + apps[vID], + gov, + dbs[vID], + key, + vID, + networkLatency, + proposingLatency) + } + return +} + +// VerifyApps is a helper to check delivery between test.Apps +func VerifyApps(apps map[types.ValidatorID]*test.App) (err error) { + for vFrom, fromApp := range apps { + if err = fromApp.Verify(); err != nil { + return + } + for vTo, toApp := range apps { + if vFrom == vTo { + continue + } + if err = fromApp.Compare(toApp); err != nil { + return + } + } + } + return +} diff --git a/integration_test/validator.go b/integration_test/validator.go index 00ffff2..fd7a7ad 100644 --- a/integration_test/validator.go +++ b/integration_test/validator.go @@ -41,13 +41,17 @@ type consensusEventPayload struct { PiggyBack interface{} } -func newProposeBlockEvent(vID types.ValidatorID, when time.Time) *test.Event { +// NewProposeBlockEvent constructs an test.Event that would trigger +// block proposing. +func NewProposeBlockEvent(vID types.ValidatorID, when time.Time) *test.Event { return test.NewEvent(vID, when, &consensusEventPayload{ Type: evtProposeBlock, }) } -func newReceiveBlockEvent( +// NewReceiveBlockEvent constructs an test.Event that would trigger +// block received. +func NewReceiveBlockEvent( vID types.ValidatorID, when time.Time, block *types.Block) *test.Event { return test.NewEvent(vID, when, &consensusEventPayload{ @@ -56,7 +60,8 @@ func newReceiveBlockEvent( }) } -type validator struct { +// Validator is designed to work with test.Scheduler. +type Validator struct { ID types.ValidatorID cons *core.Consensus gov core.Governance @@ -64,16 +69,17 @@ type validator struct { proposingLatency LatencyModel } -func newValidator( +// NewValidator constructs an instance of Validator. +func NewValidator( app core.Application, gov core.Governance, db blockdb.BlockDatabase, privateKey crypto.PrivateKey, vID types.ValidatorID, networkLatency LatencyModel, - proposingLatency LatencyModel) *validator { + proposingLatency LatencyModel) *Validator { - return &validator{ + return &Validator{ ID: vID, gov: gov, networkLatency: networkLatency, @@ -83,7 +89,8 @@ func newValidator( } } -func (v *validator) Handle(e *test.Event) (events []*test.Event) { +// Handle implements test.EventHandler interface. +func (v *Validator) Handle(e *test.Event) (events []*test.Event) { payload := e.Payload.(*consensusEventPayload) switch payload.Type { case evtProposeBlock: @@ -96,7 +103,7 @@ func (v *validator) Handle(e *test.Event) (events []*test.Event) { return } -func (v *validator) handleProposeBlock(when time.Time, piggyback interface{}) ( +func (v *Validator) handleProposeBlock(when time.Time, piggyback interface{}) ( events []*test.Event, err error) { b := &types.Block{ProposerID: v.ID} @@ -111,16 +118,16 @@ func (v *validator) handleProposeBlock(when time.Time, piggyback interface{}) ( if vID == v.ID { continue } - events = append(events, newReceiveBlockEvent( + events = append(events, NewReceiveBlockEvent( vID, when.Add(v.networkLatency.Delay()), b.Clone())) } // Create next 'block proposing' event for this validators. - events = append(events, newProposeBlockEvent( + events = append(events, NewProposeBlockEvent( v.ID, when.Add(v.proposingLatency.Delay()))) return } -func (v *validator) handleReceiveBlock(piggyback interface{}) ( +func (v *Validator) handleReceiveBlock(piggyback interface{}) ( events []*test.Event, err error) { err = v.cons.ProcessBlock(piggyback.(*types.Block)) |