aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--event/feed.go5
-rw-r--r--event/feed_test.go39
2 files changed, 43 insertions, 1 deletions
diff --git a/event/feed.go b/event/feed.go
index 78fa3d98d..f578f00c1 100644
--- a/event/feed.go
+++ b/event/feed.go
@@ -148,7 +148,9 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.sendCases[i].Send = rvalue
}
- // Send until all channels except removeSub have been chosen.
+ // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
+ // of sendCases. When a send succeeds, the corresponding case moves to the end of
+ // 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
@@ -170,6 +172,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
+ // Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
diff --git a/event/feed_test.go b/event/feed_test.go
index a82c10303..be8876932 100644
--- a/event/feed_test.go
+++ b/event/feed_test.go
@@ -235,6 +235,45 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) {
wg.Wait()
}
+// Checks that unsubscribing a channel during Send works even if that
+// channel has already been sent on.
+func TestFeedUnsubscribeSentChan(t *testing.T) {
+ var (
+ feed Feed
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ sub1 = feed.Subscribe(ch1)
+ sub2 = feed.Subscribe(ch2)
+ wg sync.WaitGroup
+ )
+ defer sub2.Unsubscribe()
+
+ wg.Add(1)
+ go func() {
+ feed.Send(0)
+ wg.Done()
+ }()
+
+ // Wait for the value on ch1.
+ <-ch1
+ // Unsubscribe ch1, removing it from the send cases.
+ sub1.Unsubscribe()
+
+ // Receive ch2, finishing Send.
+ <-ch2
+ wg.Wait()
+
+ // Send again. This should send to ch2 only, so the wait group will unblock
+ // as soon as a value is received on ch2.
+ wg.Add(1)
+ go func() {
+ feed.Send(0)
+ wg.Done()
+ }()
+ <-ch2
+ wg.Wait()
+}
+
func TestFeedUnsubscribeFromInbox(t *testing.T) {
var (
feed Feed