diff options
Diffstat (limited to 'core/test/scheduler.go')
-rw-r--r-- | core/test/scheduler.go | 215 |
1 files changed, 0 insertions, 215 deletions
diff --git a/core/test/scheduler.go b/core/test/scheduler.go deleted file mode 100644 index f6c7eed..0000000 --- a/core/test/scheduler.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// <http://www.gnu.org/licenses/>. - -package test - -import ( - "container/heap" - "context" - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon-consensus/core/types" -) - -var ( - // ErrSchedulerAlreadyStarted means callers attempt to insert some - // seed events after calling 'Run'. - ErrSchedulerAlreadyStarted = fmt.Errorf("scheduler already started") - // errNilEventWhenNotified is an internal error which means a worker routine - // can't get an event when notified. - errNilEventWhenNotified = fmt.Errorf("nil event when notified") -) - -type schedulerHandlerRecord struct { - handler EventHandler - lock sync.Mutex -} - -// Scheduler is an event scheduler. -type Scheduler struct { - events eventQueue - eventsLock sync.Mutex - history []*Event - historyLock sync.RWMutex - isStarted bool - handlers map[types.NodeID]*schedulerHandlerRecord - handlersLock sync.RWMutex - eventNotification chan struct{} - ctx context.Context - cancelFunc context.CancelFunc - stopper Stopper -} - -// NewScheduler constructs an Scheduler instance. -func NewScheduler(stopper Stopper) *Scheduler { - ctx, cancel := context.WithCancel(context.Background()) - return &Scheduler{ - events: eventQueue{}, - history: []*Event{}, - handlers: make(map[types.NodeID]*schedulerHandlerRecord), - eventNotification: make(chan struct{}, 100000), - ctx: ctx, - cancelFunc: cancel, - stopper: stopper, - } -} - -// Run would run the scheduler. If you need strict incrememtal execution order -// of events based on their 'Time' field, assign 'numWorkers' as 1. If you need -// faster execution, assign 'numWorkers' a larger number. -func (sch *Scheduler) Run(numWorkers int) { - var wg sync.WaitGroup - - sch.isStarted = true - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go sch.workerRoutine(&wg) - } - // Blocks until all routines are finished. - wg.Wait() -} - -// Seed is used to provide the scheduler some seed events. -func (sch *Scheduler) Seed(e *Event) error { - sch.eventsLock.Lock() - defer sch.eventsLock.Unlock() - - if sch.isStarted { - return ErrSchedulerAlreadyStarted - } - sch.addEvent(e) - return nil -} - -// RegisterEventHandler register an event handler by providing ID of -// corresponding node. -func (sch *Scheduler) RegisterEventHandler( - nID types.NodeID, - handler EventHandler) { - - sch.handlersLock.Lock() - defer sch.handlersLock.Unlock() - - sch.handlers[nID] = &schedulerHandlerRecord{handler: handler} -} - -// nextTick would pick the oldest event from eventQueue. -func (sch *Scheduler) nextTick() (e *Event) { - sch.eventsLock.Lock() - defer sch.eventsLock.Unlock() - - if len(sch.events) == 0 { - return nil - } - return heap.Pop(&sch.events).(*Event) -} - -// addEvent is an helper function to add events into eventQueue sorted by -// their 'Time' field. -func (sch *Scheduler) addEvent(e *Event) { - // Perform sorted insertion. - heap.Push(&sch.events, e) - sch.eventNotification <- struct{}{} -} - -// CloneExecutionHistory returns a cloned event execution history. -func (sch *Scheduler) CloneExecutionHistory() (cloned []*Event) { - sch.historyLock.RLock() - defer sch.historyLock.RUnlock() - - cloned = make([]*Event, len(sch.history)) - copy(cloned, sch.history) - return -} - -// workerRoutine is the mainloop when handling events. -func (sch *Scheduler) workerRoutine(wg *sync.WaitGroup) { - defer wg.Done() - - handleEvent := func(e *Event) { - // Find correspond handler record. - hRec := func(nID types.NodeID) *schedulerHandlerRecord { - sch.handlersLock.RLock() - defer sch.handlersLock.RUnlock() - - return sch.handlers[nID] - }(e.NodeID) - - newEvents := func() []*Event { - // This lock makes sure there would be no concurrent access - // against each handler. - hRec.lock.Lock() - defer hRec.lock.Unlock() - - // Handle incoming event, and record its execution time. - beforeExecution := time.Now().UTC() - newEvents := hRec.handler.Handle(e) - e.ExecInterval = time.Now().UTC().Sub(beforeExecution) - // It's safe to check status of that node under 'hRec.lock'. - if sch.stopper.ShouldStop(e.NodeID) { - sch.cancelFunc() - } - return newEvents - }() - // Record executed events as history. - func() { - sch.historyLock.Lock() - defer sch.historyLock.Unlock() - - e.HistoryIndex = len(sch.history) - sch.history = append(sch.history, e) - }() - // Include the execution interval of parent event to the expected time - // to execute child events. - for _, newEvent := range newEvents { - newEvent.ParentHistoryIndex = e.HistoryIndex - newEvent.Time = newEvent.Time.Add(e.ExecInterval) - } - // Add derivated events back to event queue. - func() { - sch.eventsLock.Lock() - defer sch.eventsLock.Unlock() - - for _, newEvent := range newEvents { - sch.addEvent(newEvent) - } - }() - } - -Done: - for { - // We favor scheduler-shutdown signal than other events. - select { - case <-sch.ctx.Done(): - break Done - default: - } - // Block until new event arrival or scheduler shutdown. - select { - case <-sch.eventNotification: - e := sch.nextTick() - if e == nil { - panic(errNilEventWhenNotified) - } - handleEvent(e) - case <-sch.ctx.Done(): - break Done - } - } -} |