aboutsummaryrefslogtreecommitdiffstats
path: root/ethreact/reactor.go
diff options
context:
space:
mode:
authorJeffrey Wilcke <obscuren@users.noreply.github.com>2014-07-07 16:59:16 +0800
committerJeffrey Wilcke <obscuren@users.noreply.github.com>2014-07-07 16:59:16 +0800
commit6fe9b4ab5e839be96eb1c4a619bc14fab622d8d1 (patch)
treeb8b1218e456d3daaad4a36e6ba022323c3762038 /ethreact/reactor.go
parent9dab7dcc3c99d387ad2c0a1623b574d3023bdfae (diff)
downloaddexon-6fe9b4ab5e839be96eb1c4a619bc14fab622d8d1.tar.gz
dexon-6fe9b4ab5e839be96eb1c4a619bc14fab622d8d1.tar.zst
dexon-6fe9b4ab5e839be96eb1c4a619bc14fab622d8d1.zip
Revert "ethreact - Feature/ethutil refactor"
Diffstat (limited to 'ethreact/reactor.go')
-rw-r--r--ethreact/reactor.go181
1 files changed, 0 insertions, 181 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
deleted file mode 100644
index f42f71202..000000000
--- a/ethreact/reactor.go
+++ /dev/null
@@ -1,181 +0,0 @@
-package ethreact
-
-import (
- "github.com/ethereum/eth-go/ethlog"
- "sync"
-)
-
-var logger = ethlog.NewLogger("REACTOR")
-
-type EventHandler struct {
- lock sync.RWMutex
- name string
- chans []chan Event
-}
-
-// Post the Event with the reactor resource on the channels
-// currently subscribed to the event
-func (e *EventHandler) Post(event Event) {
- e.lock.RLock()
- defer e.lock.RUnlock()
-
- // if we want to preserve order pushing to subscibed channels
- // dispatching should be syncrounous
- // this means if subscribed event channel is blocked (closed or has fixed capacity)
- // the reactor dispatch will be blocked, so we need to mitigate by skipping
- // rogue blocking subscribers
- for i, ch := range e.chans {
- select {
- case ch <- event:
- default:
- logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
- }
- }
-}
-
-// Add a subscriber to this event
-func (e *EventHandler) Add(ch chan Event) {
- e.lock.Lock()
- defer e.lock.Unlock()
-
- e.chans = append(e.chans, ch)
-}
-
-// Remove a subscriber
-func (e *EventHandler) Remove(ch chan Event) int {
- e.lock.Lock()
- defer e.lock.Unlock()
-
- for i, c := range e.chans {
- if c == ch {
- e.chans = append(e.chans[:i], e.chans[i+1:]...)
- }
- }
- return len(e.chans)
-}
-
-// Basic reactor resource
-type Event struct {
- Resource interface{}
- Name string
-}
-
-// The reactor basic engine. Acts as bridge
-// between the events and the subscribers/posters
-type ReactorEngine struct {
- lock sync.RWMutex
- eventChannel chan Event
- eventHandlers map[string]*EventHandler
- quit chan bool
- shutdownChannel chan bool
- running bool
- drained chan bool
-}
-
-func New() *ReactorEngine {
- return &ReactorEngine{
- eventHandlers: make(map[string]*EventHandler),
- eventChannel: make(chan Event),
- quit: make(chan bool, 1),
- drained: make(chan bool, 1),
- shutdownChannel: make(chan bool, 1),
- }
-}
-
-func (reactor *ReactorEngine) Start() {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
- if !reactor.running {
- go func() {
- out:
- for {
- select {
- case <-reactor.quit:
- break out
- case event := <-reactor.eventChannel:
- // needs to be called syncronously to keep order of events
- reactor.dispatch(event)
- case reactor.drained <- true:
- default:
- reactor.drained <- true // blocking till message is coming in
- }
- }
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
- reactor.running = false
- logger.Infoln("stopped")
- close(reactor.shutdownChannel)
- }()
- reactor.running = true
- logger.Infoln("started")
- }
-}
-
-func (reactor *ReactorEngine) Stop() {
- reactor.lock.RLock()
- if reactor.running {
- reactor.quit <- true
- select {
- case <-reactor.drained:
- }
- }
- reactor.lock.RUnlock()
- <-reactor.shutdownChannel
-}
-
-func (reactor *ReactorEngine) Flush() {
- <-reactor.drained
-}
-
-// Subscribe a channel to the specified event
-func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- eventHandler := reactor.eventHandlers[event]
- // Create a new event handler if one isn't available
- if eventHandler == nil {
- eventHandler = &EventHandler{name: event}
- reactor.eventHandlers[event] = eventHandler
- }
- // Add the events channel to reactor event handler
- eventHandler.Add(eventChannel)
- logger.Debugf("added new subscription to %s", event)
-}
-
-func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- eventHandler := reactor.eventHandlers[event]
- if eventHandler != nil {
- len := eventHandler.Remove(eventChannel)
- if len == 0 {
- reactor.eventHandlers[event] = nil
- }
- logger.Debugf("removed subscription to %s", event)
- }
-}
-
-func (reactor *ReactorEngine) Post(event string, resource interface{}) {
- reactor.lock.Lock()
- defer reactor.lock.Unlock()
-
- if reactor.running {
- reactor.eventChannel <- Event{Resource: resource, Name: event}
- select {
- case <-reactor.drained:
- }
- }
-}
-
-func (reactor *ReactorEngine) dispatch(event Event) {
- name := event.Name
- eventHandler := reactor.eventHandlers[name]
- // if no subscriptions to this event type - no event handler created
- // then noone to notify
- if eventHandler != nil {
- // needs to be called syncronously
- eventHandler.Post(event)
- }
-}