aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-08-21 16:43:37 +0800
committerGitHub <noreply@github.com>2018-08-21 16:43:37 +0800
commit2c816b5d636b8f7decd234582470a3d4c6b4a93a (patch)
tree5eff9d5f035dda8e3b2632ecce41f3c192e90f21
parente8f99372159a89fb3128b870de1733a4777a5144 (diff)
downloadtangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.tar.gz
tangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.tar.zst
tangerine-consensus-2c816b5d636b8f7decd234582470a3d4c6b4a93a.zip
simulation: add simulation with scheduler (#71)
- Add new field in test.Event: HistoryIndex. HistoryIndex allows us to access events by their position in event history. - Record local time in test.App when receiving events. - Add statistics module for slices of test.Event. - Add new command line utility *dexcon-simulation-with-scheduler* to verify the execution time of core.Consensus.
-rw-r--r--GNUmakefile3
-rw-r--r--README.md19
-rw-r--r--cmd/dexcon-simulation-with-scheduler/main.go100
-rw-r--r--core/consensus_test.go2
-rw-r--r--core/test/app.go74
-rw-r--r--core/test/app_test.go10
-rw-r--r--core/test/scheduler-event.go15
-rw-r--r--core/test/scheduler.go13
-rw-r--r--core/test/scheduler_test.go3
-rw-r--r--integration_test/latency.go16
-rw-r--r--integration_test/non-byzantine_test.go41
-rw-r--r--integration_test/stats.go176
-rw-r--r--integration_test/stats_test.go60
-rw-r--r--integration_test/utils.go72
-rw-r--r--integration_test/validator.go29
15 files changed, 544 insertions, 89 deletions
diff --git a/GNUmakefile b/GNUmakefile
index 6eceb31..ed296e9 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -40,7 +40,8 @@ endef
COMPONENTS = \
dexcon-simulation \
- dexcon-simulation-peer-server
+ dexcon-simulation-peer-server \
+ dexcon-simulation-with-scheduler
.PHONY: clean default
diff --git a/README.md b/README.md
index 02a7d59..d7f54ac 100644
--- a/README.md
+++ b/README.md
@@ -42,6 +42,8 @@ make pre-submit
## Simulation
+### Simulation with Nodes connected by HTTP
+
1. Setup the configuration under `./test.toml`
2. Compile and install the cmd `dexon-simulation`
@@ -49,8 +51,23 @@ make pre-submit
make
```
-4. Run simulation:
+3. Run simulation:
```
dexcon-simulation -config test.toml -init
```
+
+### Simulation with test.Scheduler
+
+1. Setup the configuration under `./test.toml`
+2. Compile and install the cmd `dexon-simulation-with-scheduler`
+
+```
+make
+```
+
+3. Run simulation with 10 workers:
+
+```
+dexcon-simulation-with-scheduler -config test.toml -workercount 10
+```
diff --git a/cmd/dexcon-simulation-with-scheduler/main.go b/cmd/dexcon-simulation-with-scheduler/main.go
new file mode 100644
index 0000000..3ed71c3
--- /dev/null
+++ b/cmd/dexcon-simulation-with-scheduler/main.go
@@ -0,0 +1,100 @@
+package main
+
+import (
+ "flag"
+ "log"
+ "math"
+ "math/rand"
+ "os"
+ "runtime"
+ "runtime/pprof"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
+ integration "github.com/dexon-foundation/dexon-consensus-core/integration_test"
+ "github.com/dexon-foundation/dexon-consensus-core/simulation/config"
+)
+
+var (
+ configFile = flag.String("config", "", "path to simulation config file")
+ cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
+ memprofile = flag.String("memprofile", "", "write memory profile to `file`")
+ workerCount = flag.Int("workercount", 1, "count of concurrent workers")
+)
+
+func main() {
+ flag.Parse()
+ rand.Seed(time.Now().UnixNano())
+
+ if *configFile == "" {
+ log.Fatal("error: no configuration file specified")
+ }
+ if *cpuprofile != "" {
+ f, err := os.Create(*cpuprofile)
+ if err != nil {
+ log.Fatal("could not create CPU profile: ", err)
+ }
+ if err := pprof.StartCPUProfile(f); err != nil {
+ log.Fatal("could not start CPU profile: ", err)
+ }
+ defer pprof.StopCPUProfile()
+ }
+
+ cfg, err := config.Read(*configFile)
+ if err != nil {
+ log.Fatal("unable to read config: ", err)
+ }
+ // Setup latencies, validators.
+ networkLatency := &integration.NormalLatencyModel{
+ Sigma: cfg.Networking.Sigma,
+ Mean: cfg.Networking.Mean,
+ }
+ proposingLatency := &integration.NormalLatencyModel{
+ Sigma: cfg.Validator.ProposeIntervalSigma,
+ Mean: cfg.Validator.ProposeIntervalMean,
+ }
+ // Setup validators and other consensus related stuffs.
+ apps, dbs, validators, err := integration.PrepareValidators(
+ cfg.Validator.Num, networkLatency, proposingLatency)
+ if err != nil {
+ log.Fatal("could not setup validators: ", err)
+ }
+ blockPerValidator := int(math.Ceil(
+ float64(cfg.Validator.MaxBlock) / float64(cfg.Validator.Num)))
+ sch := test.NewScheduler(
+ test.NewStopByConfirmedBlocks(blockPerValidator, apps, dbs))
+ for vID, v := range validators {
+ sch.RegisterEventHandler(vID, v)
+ if err = sch.Seed(integration.NewProposeBlockEvent(
+ vID, time.Now().UTC())); err != nil {
+
+ log.Fatal("unable to set seed simulation events: ", err)
+ }
+ }
+ // Run the simulation.
+ sch.Run(*workerCount)
+ if err = integration.VerifyApps(apps); err != nil {
+ log.Fatal("consensus result is not incorrect: ", err)
+ }
+ // Prepare statistics.
+ stats, err := integration.NewStats(sch.CloneExecutionHistory(), apps)
+ if err != nil {
+ log.Fatal("could not generate statistics: ", err)
+ }
+ if *memprofile != "" {
+ f, err := os.Create(*memprofile)
+ if err != nil {
+ log.Fatal("could not create memory profile: ", err)
+ }
+ runtime.GC() // get up-to-date statistics
+ if err := pprof.WriteHeapProfile(f); err != nil {
+ log.Fatal("could not write memory profile: ", err)
+ }
+ f.Close()
+ }
+
+ log.Printf("BPS: %v\n", stats.BPS)
+ log.Printf("ExecutionTime: %v\n", stats.ExecutionTime)
+ log.Printf("Prepare: %v\n", time.Duration(stats.All.PrepareExecLatency))
+ log.Printf("Process: %v\n", time.Duration(stats.All.ProcessExecLatency))
+}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 2887d66..cd7ff02 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -243,7 +243,7 @@ func (s *ConsensusTestSuite) TestSimpleDeliverBlock() {
// its ConsensusTimestamp is not interpolated.
t, err := getMedianTime(b11)
req.Nil(err)
- req.Equal(t, app.Delivered[b11.Hash])
+ req.Equal(t, app.Delivered[b11.Hash].ConsensusTime)
}
for _, obj := range objs {
app := *obj.app
diff --git a/core/test/app.go b/core/test/app.go
index 0242ca5..ddce31a 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -52,29 +52,47 @@ var (
"mismatch total ordering and delivered sequence")
)
-type totalOrderDeliver struct {
+// AppAckedRecord caches information when this application received
+// a strongly-acked notification.
+type AppAckedRecord struct {
+ When time.Time
+}
+
+// AppTotalOrderRecord caches information when this application received
+// a total-ordering deliver notification.
+type AppTotalOrderRecord struct {
BlockHashes common.Hashes
Early bool
+ When time.Time
+}
+
+// AppDeliveredRecord caches information when this application received
+// a block delivered notification.
+type AppDeliveredRecord struct {
+ ConsensusTime time.Time
+ When time.Time
}
// App implements Application interface for testing purpose.
type App struct {
- Acked map[common.Hash]struct{}
- ackedLock sync.RWMutex
- TotalOrdered []*totalOrderDeliver
- totalOrderedLock sync.RWMutex
- Delivered map[common.Hash]time.Time
- DeliverSequence common.Hashes
- deliveredLock sync.RWMutex
+ Acked map[common.Hash]*AppAckedRecord
+ ackedLock sync.RWMutex
+ TotalOrdered []*AppTotalOrderRecord
+ TotalOrderedByHash map[common.Hash]*AppTotalOrderRecord
+ totalOrderedLock sync.RWMutex
+ Delivered map[common.Hash]*AppDeliveredRecord
+ DeliverSequence common.Hashes
+ deliveredLock sync.RWMutex
}
// NewApp constructs a TestApp instance.
func NewApp() *App {
return &App{
- Acked: make(map[common.Hash]struct{}),
- TotalOrdered: []*totalOrderDeliver{},
- Delivered: make(map[common.Hash]time.Time),
- DeliverSequence: common.Hashes{},
+ Acked: make(map[common.Hash]*AppAckedRecord),
+ TotalOrdered: []*AppTotalOrderRecord{},
+ TotalOrderedByHash: make(map[common.Hash]*AppTotalOrderRecord),
+ Delivered: make(map[common.Hash]*AppDeliveredRecord),
+ DeliverSequence: common.Hashes{},
}
}
@@ -83,7 +101,7 @@ func (app *App) StronglyAcked(blockHash common.Hash) {
app.ackedLock.Lock()
defer app.ackedLock.Unlock()
- app.Acked[blockHash] = struct{}{}
+ app.Acked[blockHash] = &AppAckedRecord{When: time.Now().UTC()}
}
// TotalOrderingDeliver implements Application interface.
@@ -91,10 +109,18 @@ func (app *App) TotalOrderingDeliver(blockHashes common.Hashes, early bool) {
app.totalOrderedLock.Lock()
defer app.totalOrderedLock.Unlock()
- app.TotalOrdered = append(app.TotalOrdered, &totalOrderDeliver{
+ rec := &AppTotalOrderRecord{
BlockHashes: blockHashes,
Early: early,
- })
+ When: time.Now().UTC(),
+ }
+ app.TotalOrdered = append(app.TotalOrdered, rec)
+ for _, h := range blockHashes {
+ if _, exists := app.TotalOrderedByHash[h]; exists {
+ panic(fmt.Errorf("deliver duplicated blocks from total ordering"))
+ }
+ app.TotalOrderedByHash[h] = rec
+ }
}
// DeliverBlock implements Application interface.
@@ -102,7 +128,10 @@ func (app *App) DeliverBlock(blockHash common.Hash, timestamp time.Time) {
app.deliveredLock.Lock()
defer app.deliveredLock.Unlock()
- app.Delivered[blockHash] = timestamp
+ app.Delivered[blockHash] = &AppDeliveredRecord{
+ ConsensusTime: timestamp,
+ When: time.Now().UTC(),
+ }
app.DeliverSequence = append(app.DeliverSequence, blockHash)
}
@@ -132,7 +161,7 @@ func (app *App) Compare(other *App) error {
if hOther != h {
return ErrMismatchBlockHashSequence
}
- if app.Delivered[h] != other.Delivered[h] {
+ if app.Delivered[h].ConsensusTime != other.Delivered[h].ConsensusTime {
return ErrMismatchConsensusTime
}
}
@@ -160,16 +189,17 @@ func (app *App) Verify() error {
if _, acked := app.Acked[h]; !acked {
return ErrDeliveredBlockNotAcked
}
- t, exists := app.Delivered[h]
+ rec, exists := app.Delivered[h]
if !exists {
return ErrApplicationIntegrityFailed
}
// Make sure the consensus time is incremental.
- ok := prevTime.Before(t) || prevTime.Equal(t)
+ ok := prevTime.Before(rec.ConsensusTime) ||
+ prevTime.Equal(rec.ConsensusTime)
if !ok {
return ErrConsensusTimestampOutOfOrder
}
- prevTime = t
+ prevTime = rec.ConsensusTime
}
// Make sure the order of delivered and total ordering are the same by
// comparing the concated string.
@@ -178,8 +208,8 @@ func (app *App) Verify() error {
hashSequenceIdx := 0
Loop:
- for _, totalOrderDeliver := range app.TotalOrdered {
- for _, h := range totalOrderDeliver.BlockHashes {
+ for _, rec := range app.TotalOrdered {
+ for _, h := range rec.BlockHashes {
if hashSequenceIdx >= len(app.DeliverSequence) {
break Loop
}
diff --git a/core/test/app_test.go b/core/test/app_test.go
index 285773e..f4c4a74 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -11,18 +11,18 @@ import (
type AppTestSuite struct {
suite.Suite
- to1, to2, to3 *totalOrderDeliver
+ to1, to2, to3 *AppTotalOrderRecord
}
func (s *AppTestSuite) SetupSuite() {
- s.to1 = &totalOrderDeliver{
+ s.to1 = &AppTotalOrderRecord{
BlockHashes: common.Hashes{
common.NewRandomHash(),
common.NewRandomHash(),
},
Early: false,
}
- s.to2 = &totalOrderDeliver{
+ s.to2 = &AppTotalOrderRecord{
BlockHashes: common.Hashes{
common.NewRandomHash(),
common.NewRandomHash(),
@@ -30,7 +30,7 @@ func (s *AppTestSuite) SetupSuite() {
},
Early: false,
}
- s.to3 = &totalOrderDeliver{
+ s.to3 = &AppTotalOrderRecord{
BlockHashes: common.Hashes{
common.NewRandomHash(),
},
@@ -39,7 +39,7 @@ func (s *AppTestSuite) SetupSuite() {
}
func (s *AppTestSuite) setupAppByTotalOrderDeliver(
- app *App, to *totalOrderDeliver) {
+ app *App, to *AppTotalOrderRecord) {
for _, h := range to.BlockHashes {
app.StronglyAcked(h)
diff --git a/core/test/scheduler-event.go b/core/test/scheduler-event.go
index 60411b4..85968c5 100644
--- a/core/test/scheduler-event.go
+++ b/core/test/scheduler-event.go
@@ -25,6 +25,8 @@ import (
// Event defines a scheduler event.
type Event struct {
+ // HistoryIndex is the index of this event in history.
+ HistoryIndex int
	// ValidatorID is the ID of handler that this event is designated to.
ValidatorID types.ValidatorID
// Time is the expected execution time of this event.
@@ -33,9 +35,8 @@ type Event struct {
ExecError error
// Payload is application specific data carried by this event.
Payload interface{}
- // ParentTime is the time of parent event, this field is essential when
- // we need to calculate the latency the handler assigned.
- ParentTime time.Time
+ // ParentHistoryIndex is the index of parent event in history.
+ ParentHistoryIndex int
// ExecInterval is the latency to execute this event
ExecInterval time.Duration
}
@@ -69,8 +70,10 @@ func NewEvent(
vID types.ValidatorID, when time.Time, payload interface{}) *Event {
return &Event{
- ValidatorID: vID,
- Time: when,
- Payload: payload,
+ HistoryIndex: -1,
+ ParentHistoryIndex: -1,
+ ValidatorID: vID,
+ Time: when,
+ Payload: payload,
}
}
diff --git a/core/test/scheduler.go b/core/test/scheduler.go
index 023d09f..6a3a40a 100644
--- a/core/test/scheduler.go
+++ b/core/test/scheduler.go
@@ -165,12 +165,6 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
if sch.stopper.ShouldStop(e.ValidatorID) {
sch.cancelFunc()
}
- // Include the execution interval of parent event to the expected time
- // to execute child events.
- for _, newEvent := range newEvents {
- newEvent.ParentTime = e.Time
- newEvent.Time = newEvent.Time.Add(e.ExecInterval)
- }
return newEvents
}()
// Record executed events as history.
@@ -178,8 +172,15 @@ func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
sch.historyLock.Lock()
defer sch.historyLock.Unlock()
+ e.HistoryIndex = len(sch.history)
sch.history = append(sch.history, e)
}()
+ // Include the execution interval of parent event to the expected time
+ // to execute child events.
+ for _, newEvent := range newEvents {
+ newEvent.ParentHistoryIndex = e.HistoryIndex
+ newEvent.Time = newEvent.Time.Add(e.ExecInterval)
+ }
// Add derivated events back to event queue.
func() {
sch.eventsLock.Lock()
diff --git a/core/test/scheduler_test.go b/core/test/scheduler_test.go
index c67240f..5aef36e 100644
--- a/core/test/scheduler_test.go
+++ b/core/test/scheduler_test.go
@@ -165,7 +165,8 @@ func (s *SchedulerTestSuite) TestChildEvent() {
req.True(e.Time.Sub(curEvent.Time) >= 1300*time.Millisecond)
// Make sure ParentTime field is set and is equal to parent event's
// time.
- req.Equal(e.ParentTime, curEvent.Time)
+ req.NotEqual(-1, e.ParentHistoryIndex)
+ req.Equal(e.ParentHistoryIndex, curEvent.HistoryIndex)
curEvent = e
}
}
diff --git a/integration_test/latency.go b/integration_test/latency.go
index 383d069..8f06084 100644
--- a/integration_test/latency.go
+++ b/integration_test/latency.go
@@ -28,17 +28,27 @@ type LatencyModel interface {
Delay() time.Duration
}
-// normalLatencyModel would return latencies in normal distribution.
-type normalLatencyModel struct {
+// NormalLatencyModel would return latencies in normal distribution.
+type NormalLatencyModel struct {
Sigma float64
Mean float64
}
// Delay implements LatencyModel interface.
-func (m *normalLatencyModel) Delay() time.Duration {
+func (m *NormalLatencyModel) Delay() time.Duration {
delay := rand.NormFloat64()*m.Sigma + m.Mean
if delay < 0 {
delay = m.Sigma / 2
}
return time.Duration(delay) * time.Millisecond
}
+
+// FixedLatencyModel return fixed latencies.
+type FixedLatencyModel struct {
+ Latency float64
+}
+
+// Delay implements LatencyModel interface.
+func (m *FixedLatencyModel) Delay() time.Duration {
+ return time.Duration(m.Latency) * time.Millisecond
+}
diff --git a/integration_test/non-byzantine_test.go b/integration_test/non-byzantine_test.go
index 111dcd0..827d2ad 100644
--- a/integration_test/non-byzantine_test.go
+++ b/integration_test/non-byzantine_test.go
@@ -33,11 +33,11 @@ type NonByzantineTestSuite struct {
func (s *NonByzantineTestSuite) TestNonByzantine() {
var (
- networkLatency = &normalLatencyModel{
+ networkLatency = &NormalLatencyModel{
Sigma: 20,
Mean: 250,
}
- proposingLatency = &normalLatencyModel{
+ proposingLatency = &NormalLatencyModel{
Sigma: 30,
Mean: 500,
}
@@ -46,42 +46,19 @@ func (s *NonByzantineTestSuite) TestNonByzantine() {
req = s.Require()
)
- gov, err := test.NewGovernance(25, 700)
+ apps, dbs, validators, err := PrepareValidators(
+ 25, networkLatency, proposingLatency)
req.Nil(err)
now := time.Now().UTC()
- for vID := range gov.GetValidatorSet() {
- apps[vID] = test.NewApp()
-
- db, err := blockdb.NewMemBackedBlockDB()
- req.Nil(err)
- dbs[vID] = db
- }
- stopper := test.NewStopByConfirmedBlocks(50, apps, dbs)
- sch := test.NewScheduler(stopper)
- for vID := range gov.GetValidatorSet() {
- key, err := gov.GetPrivateKey(vID)
- req.Nil(err)
- v := newValidator(
- apps[vID],
- gov,
- dbs[vID],
- key,
- vID,
- networkLatency,
- proposingLatency)
+ sch := test.NewScheduler(test.NewStopByConfirmedBlocks(50, apps, dbs))
+ for vID, v := range validators {
sch.RegisterEventHandler(vID, v)
- req.Nil(sch.Seed(newProposeBlockEvent(vID, now)))
+ req.Nil(sch.Seed(NewProposeBlockEvent(vID, now)))
}
sch.Run(10)
// Check results by comparing test.App instances.
- for vFrom := range gov.GetValidatorSet() {
- req.Nil(apps[vFrom].Verify())
- for vTo := range gov.GetValidatorSet() {
- if vFrom == vTo {
- continue
- }
- req.Nil(apps[vFrom].Compare(apps[vTo]))
- }
+ if err = VerifyApps(apps); err != nil {
+ panic(err)
}
}
diff --git a/integration_test/stats.go b/integration_test/stats.go
new file mode 100644
index 0000000..ae8ded4
--- /dev/null
+++ b/integration_test/stats.go
@@ -0,0 +1,176 @@
+package integration
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+// Errors when calculating statistics for events.
+var (
+ ErrUnknownEvent = fmt.Errorf("unknown event")
+ ErrUnknownConsensusEventType = fmt.Errorf("unknown consensus event type")
+)
+
+// StatsSet represents the accumulated result of a group of related events
+// (ex. all events from one validator).
+type StatsSet struct {
+ ProposedBlockCount int
+ ReceivedBlockCount int
+ StronglyAckedBlockCount int
+ TotalOrderedBlockCount int
+ DeliveredBlockCount int
+ ProposingLatency time.Duration
+ ReceivingLatency time.Duration
+ PrepareExecLatency time.Duration
+ ProcessExecLatency time.Duration
+}
+
+// newBlockProposeEvent accumulates a block proposing event.
+func (s *StatsSet) newBlockProposeEvent(
+ e *test.Event, payload *consensusEventPayload, history []*test.Event) {
+
+ // Find previous block proposing event.
+ if e.ParentHistoryIndex != -1 {
+ parentEvent := history[e.ParentHistoryIndex]
+ s.ProposingLatency +=
+ e.Time.Sub(parentEvent.Time) - parentEvent.ExecInterval
+ }
+ s.PrepareExecLatency += e.ExecInterval
+ s.ProposedBlockCount++
+}
+
+// newBlockReceiveEvent accumulates a block received event.
+func (s *StatsSet) newBlockReceiveEvent(
+ e *test.Event,
+ payload *consensusEventPayload,
+ history []*test.Event,
+ app *test.App) {
+
+ // Find previous block proposing event.
+ parentEvent := history[e.ParentHistoryIndex]
+ s.ReceivingLatency +=
+ e.Time.Sub(parentEvent.Time) - parentEvent.ExecInterval
+ s.ProcessExecLatency += e.ExecInterval
+ s.ReceivedBlockCount++
+
+ // Find statistics from test.App
+ block := payload.PiggyBack.(*types.Block)
+ app.Check(func(app *test.App) {
+ // Is this block strongly acked?
+ if _, exists := app.Acked[block.Hash]; !exists {
+ return
+ }
+ s.StronglyAckedBlockCount++
+
+ // Is this block total ordered?
+ if _, exists := app.TotalOrderedByHash[block.Hash]; !exists {
+ return
+ }
+ s.TotalOrderedBlockCount++
+
+ // Is this block delivered?
+ if _, exists := app.Delivered[block.Hash]; !exists {
+ return
+ }
+ s.DeliveredBlockCount++
+ })
+}
+
+// done would divide the latencies we cached with related event count. This way
+// to calculate average latency is more accurate.
+func (s *StatsSet) done(validatorCount int) {
+ s.ProposingLatency /= time.Duration(s.ProposedBlockCount - validatorCount)
+ s.ReceivingLatency /= time.Duration(s.ReceivedBlockCount)
+ s.PrepareExecLatency /= time.Duration(s.ProposedBlockCount)
+ s.ProcessExecLatency /= time.Duration(s.ReceivedBlockCount)
+}
+
+// Stats is statistics of a slice of test.Event generated by validators.
+type Stats struct {
+ ByValidator map[types.ValidatorID]*StatsSet
+ All *StatsSet
+ BPS float64
+ ExecutionTime time.Duration
+}
+
+// NewStats constructs an Stats instance by providing a slice of
+// test.Event.
+func NewStats(
+ history []*test.Event, apps map[types.ValidatorID]*test.App) (
+ stats *Stats, err error) {
+
+ stats = &Stats{
+ ByValidator: make(map[types.ValidatorID]*StatsSet),
+ All: &StatsSet{},
+ }
+ if err = stats.calculate(history, apps); err != nil {
+ stats = nil
+ }
+ stats.summary(history)
+ return
+}
+
+func (stats *Stats) calculate(
+ history []*test.Event, apps map[types.ValidatorID]*test.App) error {
+
+ defer func() {
+ stats.All.done(len(stats.ByValidator))
+ for _, set := range stats.ByValidator {
+ set.done(1)
+ }
+ }()
+
+ for _, e := range history {
+ payload, ok := e.Payload.(*consensusEventPayload)
+ if !ok {
+ return ErrUnknownEvent
+ }
+ switch payload.Type {
+ case evtProposeBlock:
+ stats.All.newBlockProposeEvent(
+ e, payload, history)
+ stats.getStatsSetByValidator(e.ValidatorID).newBlockProposeEvent(
+ e, payload, history)
+ case evtReceiveBlock:
+ stats.All.newBlockReceiveEvent(
+ e, payload, history, apps[e.ValidatorID])
+ stats.getStatsSetByValidator(e.ValidatorID).newBlockReceiveEvent(
+ e, payload, history, apps[e.ValidatorID])
+ default:
+ return ErrUnknownConsensusEventType
+ }
+ }
+ return nil
+}
+
+func (stats *Stats) getStatsSetByValidator(
+ vID types.ValidatorID) (s *StatsSet) {
+
+ s = stats.ByValidator[vID]
+ if s == nil {
+ s = &StatsSet{}
+ stats.ByValidator[vID] = s
+ }
+ return
+}
+
+func (stats *Stats) summary(history []*test.Event) {
+ // Find average delivered block count among all blocks.
+ totalConfirmedBlocks := 0
+ for _, s := range stats.ByValidator {
+ totalConfirmedBlocks += s.DeliveredBlockCount
+ }
+ averageConfirmedBlocks := totalConfirmedBlocks / len(stats.ByValidator)
+
+ // Find execution time.
+ // Note: it's a simplified way to calculate the execution time:
+ // the latest event might not be at the end of history when
+ // the number of worker routine is larger than 1.
+ stats.ExecutionTime = history[len(history)-1].Time.Sub(history[0].Time)
+ // Calculate BPS.
+ latencyAsSecond := stats.ExecutionTime.Nanoseconds() / (1000 * 1000 * 1000)
+ stats.BPS = float64(averageConfirmedBlocks) / float64(latencyAsSecond)
+}
diff --git a/integration_test/stats_test.go b/integration_test/stats_test.go
new file mode 100644
index 0000000..e0be126
--- /dev/null
+++ b/integration_test/stats_test.go
@@ -0,0 +1,60 @@
+package integration
+
+import (
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
+ "github.com/stretchr/testify/suite"
+)
+
+type EventStatsTestSuite struct {
+ suite.Suite
+}
+
+func (s *EventStatsTestSuite) TestCalculate() {
+ // Setup a test with fixed latency in proposing and network,
+ // and make sure the calculated statistics is expected.
+ var (
+ networkLatency = &FixedLatencyModel{Latency: 100}
+ proposingLatency = &FixedLatencyModel{Latency: 300}
+ req = s.Require()
+ )
+
+ apps, dbs, validators, err := PrepareValidators(
+ 7, networkLatency, proposingLatency)
+ req.Nil(err)
+
+ sch := test.NewScheduler(test.NewStopByConfirmedBlocks(50, apps, dbs))
+ now := time.Now().UTC()
+ for vID, v := range validators {
+ sch.RegisterEventHandler(vID, v)
+ req.Nil(sch.Seed(NewProposeBlockEvent(vID, now)))
+ }
+ sch.Run(10)
+ req.Nil(VerifyApps(apps))
+ // Check total statistics result.
+ stats, err := NewStats(sch.CloneExecutionHistory(), apps)
+ req.Nil(err)
+ req.True(stats.All.ProposedBlockCount > 350)
+ req.True(stats.All.ReceivedBlockCount > 350)
+ req.True(stats.All.StronglyAckedBlockCount > 350)
+ req.True(stats.All.TotalOrderedBlockCount >= 350)
+ req.True(stats.All.DeliveredBlockCount >= 350)
+ req.Equal(stats.All.ProposingLatency, 300*time.Millisecond)
+ req.Equal(stats.All.ReceivingLatency, 100*time.Millisecond)
+ // Check statistics for each validator.
+ for _, vStats := range stats.ByValidator {
+ req.True(vStats.ProposedBlockCount > 50)
+ req.True(vStats.ReceivedBlockCount > 50)
+ req.True(vStats.StronglyAckedBlockCount > 50)
+ req.True(vStats.TotalOrderedBlockCount >= 50)
+ req.True(vStats.DeliveredBlockCount >= 50)
+ req.Equal(vStats.ProposingLatency, 300*time.Millisecond)
+ req.Equal(vStats.ReceivingLatency, 100*time.Millisecond)
+ }
+}
+
+func TestEventStats(t *testing.T) {
+ suite.Run(t, new(EventStatsTestSuite))
+}
diff --git a/integration_test/utils.go b/integration_test/utils.go
new file mode 100644
index 0000000..c1eafb7
--- /dev/null
+++ b/integration_test/utils.go
@@ -0,0 +1,72 @@
+package integration
+
+import (
+ "github.com/dexon-foundation/dexon-consensus-core/blockdb"
+ "github.com/dexon-foundation/dexon-consensus-core/core/test"
+ "github.com/dexon-foundation/dexon-consensus-core/core/types"
+ "github.com/dexon-foundation/dexon-consensus-core/crypto"
+)
+
+// PrepareValidators setups validators for testing.
+func PrepareValidators(
+ validatorCount int,
+ networkLatency, proposingLatency LatencyModel) (
+ apps map[types.ValidatorID]*test.App,
+ dbs map[types.ValidatorID]blockdb.BlockDatabase,
+ validators map[types.ValidatorID]*Validator,
+ err error) {
+
+ var (
+ db blockdb.BlockDatabase
+ key crypto.PrivateKey
+ )
+
+ apps = make(map[types.ValidatorID]*test.App)
+ dbs = make(map[types.ValidatorID]blockdb.BlockDatabase)
+ validators = make(map[types.ValidatorID]*Validator)
+
+ gov, err := test.NewGovernance(validatorCount, 700)
+ if err != nil {
+ return
+ }
+ for vID := range gov.GetValidatorSet() {
+ apps[vID] = test.NewApp()
+
+ if db, err = blockdb.NewMemBackedBlockDB(); err != nil {
+ return
+ }
+ dbs[vID] = db
+ }
+ for vID := range gov.GetValidatorSet() {
+ if key, err = gov.GetPrivateKey(vID); err != nil {
+ return
+ }
+ validators[vID] = NewValidator(
+ apps[vID],
+ gov,
+ dbs[vID],
+ key,
+ vID,
+ networkLatency,
+ proposingLatency)
+ }
+ return
+}
+
+// VerifyApps is a helper to check delivery between test.Apps
+func VerifyApps(apps map[types.ValidatorID]*test.App) (err error) {
+ for vFrom, fromApp := range apps {
+ if err = fromApp.Verify(); err != nil {
+ return
+ }
+ for vTo, toApp := range apps {
+ if vFrom == vTo {
+ continue
+ }
+ if err = fromApp.Compare(toApp); err != nil {
+ return
+ }
+ }
+ }
+ return
+}
diff --git a/integration_test/validator.go b/integration_test/validator.go
index 00ffff2..fd7a7ad 100644
--- a/integration_test/validator.go
+++ b/integration_test/validator.go
@@ -41,13 +41,17 @@ type consensusEventPayload struct {
PiggyBack interface{}
}
-func newProposeBlockEvent(vID types.ValidatorID, when time.Time) *test.Event {
+// NewProposeBlockEvent constructs a test.Event that would trigger
+// block proposing.
+func NewProposeBlockEvent(vID types.ValidatorID, when time.Time) *test.Event {
return test.NewEvent(vID, when, &consensusEventPayload{
Type: evtProposeBlock,
})
}
-func newReceiveBlockEvent(
+// NewReceiveBlockEvent constructs a test.Event that would trigger
+// block reception.
+func NewReceiveBlockEvent(
vID types.ValidatorID, when time.Time, block *types.Block) *test.Event {
return test.NewEvent(vID, when, &consensusEventPayload{
@@ -56,7 +60,8 @@ func newReceiveBlockEvent(
})
}
-type validator struct {
+// Validator is designed to work with test.Scheduler.
+type Validator struct {
ID types.ValidatorID
cons *core.Consensus
gov core.Governance
@@ -64,16 +69,17 @@ type validator struct {
proposingLatency LatencyModel
}
-func newValidator(
+// NewValidator constructs an instance of Validator.
+func NewValidator(
app core.Application,
gov core.Governance,
db blockdb.BlockDatabase,
privateKey crypto.PrivateKey,
vID types.ValidatorID,
networkLatency LatencyModel,
- proposingLatency LatencyModel) *validator {
+ proposingLatency LatencyModel) *Validator {
- return &validator{
+ return &Validator{
ID: vID,
gov: gov,
networkLatency: networkLatency,
@@ -83,7 +89,8 @@ func newValidator(
}
}
-func (v *validator) Handle(e *test.Event) (events []*test.Event) {
+// Handle implements test.EventHandler interface.
+func (v *Validator) Handle(e *test.Event) (events []*test.Event) {
payload := e.Payload.(*consensusEventPayload)
switch payload.Type {
case evtProposeBlock:
@@ -96,7 +103,7 @@ func (v *validator) Handle(e *test.Event) (events []*test.Event) {
return
}
-func (v *validator) handleProposeBlock(when time.Time, piggyback interface{}) (
+func (v *Validator) handleProposeBlock(when time.Time, piggyback interface{}) (
events []*test.Event, err error) {
b := &types.Block{ProposerID: v.ID}
@@ -111,16 +118,16 @@ func (v *validator) handleProposeBlock(when time.Time, piggyback interface{}) (
if vID == v.ID {
continue
}
- events = append(events, newReceiveBlockEvent(
+ events = append(events, NewReceiveBlockEvent(
vID, when.Add(v.networkLatency.Delay()), b.Clone()))
}
// Create next 'block proposing' event for this validators.
- events = append(events, newProposeBlockEvent(
+ events = append(events, NewProposeBlockEvent(
v.ID, when.Add(v.proposingLatency.Delay())))
return
}
-func (v *validator) handleReceiveBlock(piggyback interface{}) (
+func (v *Validator) handleReceiveBlock(piggyback interface{}) (
events []*test.Event, err error) {
err = v.cons.ProcessBlock(piggyback.(*types.Block))