diff options
author | obscuren <geffobscura@gmail.com> | 2014-08-01 16:30:19 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-08-01 16:30:19 +0800 |
commit | 2e7cf835222274a311302c33498cf83bb2593b7a (patch) | |
tree | 6689e7017a14d2198771fbe5148bb8ad13e9e8a1 /ethreact | |
parent | 5ede1224e48fd82961bd4a0b2ec1a3eda0b6d99b (diff) | |
parent | 8bed47a2d4377b7a49f34350ae5c5ea50464b95d (diff) | |
download | dexon-2e7cf835222274a311302c33498cf83bb2593b7a.tar.gz dexon-2e7cf835222274a311302c33498cf83bb2593b7a.tar.zst dexon-2e7cf835222274a311302c33498cf83bb2593b7a.zip |
Merge branch 'ethersphere-feature/ethutil-refactor' into develop
Diffstat (limited to 'ethreact')
-rw-r--r-- | ethreact/README.md | 28 | ||||
-rw-r--r-- | ethreact/reactor.go | 182 | ||||
-rw-r--r-- | ethreact/reactor_test.go | 63 |
3 files changed, 273 insertions, 0 deletions
diff --git a/ethreact/README.md b/ethreact/README.md new file mode 100644 index 000000000..61af8a572 --- /dev/null +++ b/ethreact/README.md @@ -0,0 +1,28 @@ +## Reactor + +Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state. +Event notification is handled via subscription: + + var blockChan = make(chan ethreact.Event, 10) + reactor.Subscribe("newBlock", blockChan) + +ethreact.Event broadcast on the channel are + + type Event struct { + Resource interface{} + Name string + } + +Resource is polimorphic depending on the event type and should be typecast before use, e.g: + + b := <-blockChan: + block := b.Resource.(*ethchain.Block) + +Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped. + +The engine allows arbitrary events to be posted and subscribed to. + + ethereum.Reactor().Post("newBlock", newBlock) + + +
\ No newline at end of file diff --git a/ethreact/reactor.go b/ethreact/reactor.go new file mode 100644 index 000000000..7fe2356db --- /dev/null +++ b/ethreact/reactor.go @@ -0,0 +1,182 @@ +package ethreact + +import ( + "github.com/ethereum/eth-go/ethlog" + "sync" +) + +var logger = ethlog.NewLogger("REACTOR") + +const ( + eventBufferSize int = 10 +) + +type EventHandler struct { + lock sync.RWMutex + name string + chans []chan Event +} + +// Post the Event with the reactor resource on the channels +// currently subscribed to the event +func (e *EventHandler) Post(event Event) { + e.lock.RLock() + defer e.lock.RUnlock() + + // if we want to preserve order pushing to subscibed channels + // dispatching should be syncrounous + // this means if subscribed event channel is blocked + // the reactor dispatch will be blocked, so we need to mitigate by skipping + // rogue blocking subscribers + for i, ch := range e.chans { + select { + case ch <- event: + default: + logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) + } + } +} + +// Add a subscriber to this event +func (e *EventHandler) Add(ch chan Event) { + e.lock.Lock() + defer e.lock.Unlock() + + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *EventHandler) Remove(ch chan Event) int { + e.lock.Lock() + defer e.lock.Unlock() + + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } + return len(e.chans) +} + +// Basic reactor event +type Event struct { + Resource interface{} + Name string +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan chan error + running bool + drained chan bool +} + +func New() *ReactorEngine { + return &ReactorEngine{ + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event, eventBufferSize), + quit: make(chan chan error, 1), + drained: make(chan bool, 1), + } +} + +func (reactor *ReactorEngine) Start() { + reactor.lock.Lock() + defer reactor.lock.Unlock() + if !reactor.running { + go func() { + for { + select { + case status := <-reactor.quit: + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + status <- nil + return + case event := <-reactor.eventChannel: + // needs to be called syncronously to keep order of events + reactor.dispatch(event) + default: + reactor.drained <- true // blocking till message is coming in + } + } + }() + reactor.running = true + logger.Infoln("started") + } +} + +func (reactor *ReactorEngine) Stop() { + if reactor.running { + status := make(chan error) + reactor.quit <- status + select { + case <-reactor.drained: + default: + } + <-status + } +} + +func (reactor *ReactorEngine) Flush() { + <-reactor.drained +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + // Create a new event handler if one isn't available + if eventHandler == nil { + eventHandler = &EventHandler{name: event} + reactor.eventHandlers[event] = eventHandler + } + // Add the events channel to reactor event handler + eventHandler.Add(eventChannel) + logger.Debugf("added new subscription to %s", event) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + if eventHandler != nil { + len := eventHandler.Remove(eventChannel) + if len == 0 { + reactor.eventHandlers[event] = nil + } + logger.Debugf("removed subscription to %s", event) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + if reactor.running { + reactor.eventChannel <- Event{Resource: resource, Name: event} + select { + case <-reactor.drained: + default: + } + } +} + +func (reactor *ReactorEngine) dispatch(event Event) { + name := event.Name + eventHandler := reactor.eventHandlers[name] + // if no subscriptions to this event type - no event handler created + // then noone to notify + if eventHandler != nil { + // needs to be called syncronously + eventHandler.Post(event) + } +} diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 000000000..801a8abd0 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +} |