aboutsummaryrefslogtreecommitdiffstats
path: root/event/feed_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'event/feed_test.go')
-rw-r--r--event/feed_test.go68
1 files changed, 68 insertions, 0 deletions
diff --git a/event/feed_test.go b/event/feed_test.go
index 4f897c162..a82c10303 100644
--- a/event/feed_test.go
+++ b/event/feed_test.go
@@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) {
done.Wait()
}
+func TestFeedSubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 2000
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ wg sync.WaitGroup
+ )
+ defer wg.Wait()
+
+ feed.Subscribe(ch1)
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+
+ sub2 := feed.Subscribe(ch2)
+ defer sub2.Unsubscribe()
+
+ // We're done when ch1 has received N times.
+ // The number of receives on ch2 depends on scheduling.
+ for i := 0; i < nsends; {
+ select {
+ case <-ch1:
+ i++
+ case <-ch2:
+ }
+ }
+}
+
+func TestFeedUnsubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 200
+ chans = make([]chan int, 2000)
+ subs = make([]Subscription, len(chans))
+ bchan = make(chan int)
+ bsub = feed.Subscribe(bchan)
+ wg sync.WaitGroup
+ )
+ for i := range chans {
+ chans[i] = make(chan int, nsends)
+ }
+
+ // Queue up some Sends. None of these can make progress while bchan isn't read.
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+ // Subscribe the other channels.
+ for i, ch := range chans {
+ subs[i] = feed.Subscribe(ch)
+ }
+ // Unsubscribe them again.
+ for _, sub := range subs {
+ sub.Unsubscribe()
+ }
+ // Unblock the Sends.
+ bsub.Unsubscribe()
+ wg.Wait()
+}
+
func TestFeedUnsubscribeFromInbox(t *testing.T) {
var (
feed Feed