aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/scheduler.go')
-rw-r--r--core/test/scheduler.go215
1 files changed, 0 insertions, 215 deletions
diff --git a/core/test/scheduler.go b/core/test/scheduler.go
deleted file mode 100644
index f6c7eed..0000000
--- a/core/test/scheduler.go
+++ /dev/null
@@ -1,215 +0,0 @@
-// Copyright 2018 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
-// General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
-package test
-
-import (
- "container/heap"
- "context"
- "fmt"
- "sync"
- "time"
-
- "github.com/dexon-foundation/dexon-consensus/core/types"
-)
-
-var (
- // ErrSchedulerAlreadyStarted means callers attempt to insert some
- // seed events after calling 'Run'.
- ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started")
- // errNilEventWhenNotified is an internal error which means a worker routine
- // can't get an event when notified.
- errNilEventWhenNotified = fmt.Errorf("nil event when notified")
-)
-
-type schedulerHandlerRecord struct {
- handler EventHandler
- lock sync.Mutex
-}
-
-// Scheduler is an event scheduler.
-type Scheduler struct {
- events eventQueue
- eventsLock sync.Mutex
- history []*Event
- historyLock sync.RWMutex
- isStarted bool
- handlers map[types.NodeID]*schedulerHandlerRecord
- handlersLock sync.RWMutex
- eventNotification chan struct{}
- ctx context.Context
- cancelFunc context.CancelFunc
- stopper Stopper
-}
-
-// NewScheduler constructs an Scheduler instance.
-func NewScheduler(stopper Stopper) *Scheduler {
- ctx, cancel := context.WithCancel(context.Background())
- return &Scheduler{
- events: eventQueue{},
- history: []*Event{},
- handlers: make(map[types.NodeID]*schedulerHandlerRecord),
- eventNotification: make(chan struct{}, 100000),
- ctx: ctx,
- cancelFunc: cancel,
- stopper: stopper,
- }
-}
-
-// Run would run the scheduler. If you need strict incrememtal execution order
-// of events based on their 'Time' field, assign 'numWorkers' as 1. If you need
-// faster execution, assign 'numWorkers' a larger number.
-func (sch *Scheduler) Run(numWorkers int) {
- var wg sync.WaitGroup
-
- sch.isStarted = true
- for i := 0; i < numWorkers; i++ {
- wg.Add(1)
- go sch.workerRoutine(&wg)
- }
- // Blocks until all routines are finished.
- wg.Wait()
-}
-
-// Seed is used to provide the scheduler some seed events.
-func (sch *Scheduler) Seed(e *Event) error {
- sch.eventsLock.Lock()
- defer sch.eventsLock.Unlock()
-
- if sch.isStarted {
- return ErrSchedulerAlreadyStarted
- }
- sch.addEvent(e)
- return nil
-}
-
-// RegisterEventHandler register an event handler by providing ID of
-// corresponding node.
-func (sch *Scheduler) RegisterEventHandler(
- nID types.NodeID,
- handler EventHandler) {
-
- sch.handlersLock.Lock()
- defer sch.handlersLock.Unlock()
-
- sch.handlers[nID] = &schedulerHandlerRecord{handler: handler}
-}
-
-// nextTick would pick the oldest event from eventQueue.
-func (sch *Scheduler) nextTick() (e *Event) {
- sch.eventsLock.Lock()
- defer sch.eventsLock.Unlock()
-
- if len(sch.events) == 0 {
- return nil
- }
- return heap.Pop(&sch.events).(*Event)
-}
-
-// addEvent is an helper function to add events into eventQueue sorted by
-// their 'Time' field.
-func (sch *Scheduler) addEvent(e *Event) {
- // Perform sorted insertion.
- heap.Push(&sch.events, e)
- sch.eventNotification <- struct{}{}
-}
-
-// CloneExecutionHistory returns a cloned event execution history.
-func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) {
- sch.historyLock.RLock()
- defer sch.historyLock.RUnlock()
-
- cloned = make([]*Event, len(sch.history))
- copy(cloned, sch.history)
- return
-}
-
-// workerRoutine is the mainloop when handling events.
-func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) {
- defer wg.Done()
-
- handleEvent := func(e *Event) {
- // Find correspond handler record.
- hRec := func(nID types.NodeID) *schedulerHandlerRecord {
- sch.handlersLock.RLock()
- defer sch.handlersLock.RUnlock()
-
- return sch.handlers[nID]
- }(e.NodeID)
-
- newEvents := func() []*Event {
- // This lock makes sure there would be no concurrent access
- // against each handler.
- hRec.lock.Lock()
- defer hRec.lock.Unlock()
-
- // Handle incoming event, and record its execution time.
- beforeExecution := time.Now().UTC()
- newEvents := hRec.handler.Handle(e)
- e.ExecInterval = time.Now().UTC().Sub(beforeExecution)
- // It's safe to check status of that node under 'hRec.lock'.
- if sch.stopper.ShouldStop(e.NodeID) {
- sch.cancelFunc()
- }
- return newEvents
- }()
- // Record executed events as history.
- func() {
- sch.historyLock.Lock()
- defer sch.historyLock.Unlock()
-
- e.HistoryIndex = len(sch.history)
- sch.history = append(sch.history, e)
- }()
- // Include the execution interval of parent event to the expected time
- // to execute child events.
- for _, newEvent := range newEvents {
- newEvent.ParentHistoryIndex = e.HistoryIndex
- newEvent.Time = newEvent.Time.Add(e.ExecInterval)
- }
- // Add derivated events back to event queue.
- func() {
- sch.eventsLock.Lock()
- defer sch.eventsLock.Unlock()
-
- for _, newEvent := range newEvents {
- sch.addEvent(newEvent)
- }
- }()
- }
-
-Done:
- for {
- // We favor scheduler-shutdown signal than other events.
- select {
- case <-sch.ctx.Done():
- break Done
- default:
- }
- // Block until new event arrival or scheduler shutdown.
- select {
- case <-sch.eventNotification:
- e := sch.nextTick()
- if e == nil {
- panic(errNilEventWhenNotified)
- }
- handleEvent(e)
- case <-sch.ctx.Done():
- break Done
- }
- }
-}