diff options
Diffstat (limited to 'ethreact/reactor.go')
-rw-r--r-- | ethreact/reactor.go | 181 |
1 files changed, 0 insertions, 181 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go deleted file mode 100644 index f42f71202..000000000 --- a/ethreact/reactor.go +++ /dev/null @@ -1,181 +0,0 @@ -package ethreact - -import ( - "github.com/ethereum/eth-go/ethlog" - "sync" -) - -var logger = ethlog.NewLogger("REACTOR") - -type EventHandler struct { - lock sync.RWMutex - name string - chans []chan Event -} - -// Post the Event with the reactor resource on the channels -// currently subscribed to the event -func (e *EventHandler) Post(event Event) { - e.lock.RLock() - defer e.lock.RUnlock() - - // if we want to preserve order pushing to subscibed channels - // dispatching should be syncrounous - // this means if subscribed event channel is blocked (closed or has fixed capacity) - // the reactor dispatch will be blocked, so we need to mitigate by skipping - // rogue blocking subscribers - for i, ch := range e.chans { - select { - case ch <- event: - default: - logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) - } - } -} - -// Add a subscriber to this event -func (e *EventHandler) Add(ch chan Event) { - e.lock.Lock() - defer e.lock.Unlock() - - e.chans = append(e.chans, ch) -} - -// Remove a subscriber -func (e *EventHandler) Remove(ch chan Event) int { - e.lock.Lock() - defer e.lock.Unlock() - - for i, c := range e.chans { - if c == ch { - e.chans = append(e.chans[:i], e.chans[i+1:]...) - } - } - return len(e.chans) -} - -// Basic reactor resource -type Event struct { - Resource interface{} - Name string -} - -// The reactor basic engine. Acts as bridge -// between the events and the subscribers/posters -type ReactorEngine struct { - lock sync.RWMutex - eventChannel chan Event - eventHandlers map[string]*EventHandler - quit chan bool - shutdownChannel chan bool - running bool - drained chan bool -} - -func New() *ReactorEngine { - return &ReactorEngine{ - eventHandlers: make(map[string]*EventHandler), - eventChannel: make(chan Event), - quit: make(chan bool, 1), - drained: make(chan bool, 1), - shutdownChannel: make(chan bool, 1), - } -} - -func (reactor *ReactorEngine) Start() { - reactor.lock.Lock() - defer reactor.lock.Unlock() - if !reactor.running { - go func() { - out: - for { - select { - case <-reactor.quit: - break out - case event := <-reactor.eventChannel: - // needs to be called syncronously to keep order of events - reactor.dispatch(event) - case reactor.drained <- true: - default: - reactor.drained <- true // blocking till message is coming in - } - } - reactor.lock.Lock() - defer reactor.lock.Unlock() - reactor.running = false - logger.Infoln("stopped") - close(reactor.shutdownChannel) - }() - reactor.running = true - logger.Infoln("started") - } -} - -func (reactor *ReactorEngine) Stop() { - reactor.lock.RLock() - if reactor.running { - reactor.quit <- true - select { - case <-reactor.drained: - } - } - reactor.lock.RUnlock() - <-reactor.shutdownChannel -} - -func (reactor *ReactorEngine) Flush() { - <-reactor.drained -} - -// Subscribe a channel to the specified event -func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - eventHandler := reactor.eventHandlers[event] - // Create a new event handler if one isn't available - if eventHandler == nil { - eventHandler = &EventHandler{name: event} - reactor.eventHandlers[event] = eventHandler - } - // Add the events channel to reactor event handler - eventHandler.Add(eventChannel) - logger.Debugf("added new subscription to %s", event) -} - -func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - eventHandler := reactor.eventHandlers[event] - if eventHandler != nil { - len := eventHandler.Remove(eventChannel) - if len == 0 { - reactor.eventHandlers[event] = nil - } - logger.Debugf("removed subscription to %s", event) - } -} - -func (reactor *ReactorEngine) Post(event string, resource interface{}) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - if reactor.running { - reactor.eventChannel <- Event{Resource: resource, Name: event} - select { - case <-reactor.drained: - } - } -} - -func (reactor *ReactorEngine) dispatch(event Event) { - name := event.Name - eventHandler := reactor.eventHandlers[name] - // if no subscriptions to this event type - no event handler created - // then noone to notify - if eventHandler != nil { - // needs to be called syncronously - eventHandler.Post(event) - } -} |