aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzelig <viktor.tron@gmail.com>2014-07-06 02:56:01 +0800
committerzelig <viktor.tron@gmail.com>2014-07-06 02:56:01 +0800
commit5a2afc575485e2d651b9840f5d1ea080cdc72fa7 (patch)
tree9ea81946396fc584c8c8a0b64d9e9f9fff43b17a
parentd4300c406c5f98d35857b6e53b0427be5f45e3b2 (diff)
downloadgo-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.gz
go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.tar.zst
go-tangerine-5a2afc575485e2d651b9840f5d1ea080cdc72fa7.zip
fix reactor engine main loop blocked to wait if drained
-rw-r--r--ethreact/reactor.go22
1 files changed, 14 insertions, 8 deletions
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
index 3802d95b3..f42f71202 100644
--- a/ethreact/reactor.go
+++ b/ethreact/reactor.go
@@ -28,7 +28,7 @@ func (e *EventHandler) Post(event Event) {
select {
case ch <- event:
default:
- logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name)
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
}
}
}
@@ -69,7 +69,7 @@ type ReactorEngine struct {
quit chan bool
shutdownChannel chan bool
running bool
- drained bool
+ drained chan bool
}
func New() *ReactorEngine {
@@ -77,6 +77,7 @@ func New() *ReactorEngine {
eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event),
quit: make(chan bool, 1),
+ drained: make(chan bool, 1),
shutdownChannel: make(chan bool, 1),
}
}
@@ -94,8 +95,9 @@ func (reactor *ReactorEngine) Start() {
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
+ case reactor.drained <- true:
default:
- reactor.drained = true
+ reactor.drained <- true // blocking till message is coming in
}
}
reactor.lock.Lock()
@@ -113,14 +115,16 @@ func (reactor *ReactorEngine) Stop() {
reactor.lock.RLock()
if reactor.running {
reactor.quit <- true
+ select {
+ case <-reactor.drained:
+ }
}
reactor.lock.RUnlock()
<-reactor.shutdownChannel
}
func (reactor *ReactorEngine) Flush() {
- for !reactor.drained {
- }
+ <-reactor.drained
}
// Subscribe a channel to the specified event
@@ -136,7 +140,7 @@ func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
}
// Add the events channel to reactor event handler
eventHandler.Add(eventChannel)
- logger.Debugln("added new subscription to %s", event)
+ logger.Debugf("added new subscription to %s", event)
}
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
@@ -149,7 +153,7 @@ func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event)
if len == 0 {
reactor.eventHandlers[event] = nil
}
- logger.Debugln("removed subscription to %s", event)
+ logger.Debugf("removed subscription to %s", event)
}
}
@@ -158,8 +162,10 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) {
defer reactor.lock.Unlock()
if reactor.running {
- reactor.drained = false
reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ }
}
}