diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2018-05-10 18:26:36 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-05-10 18:26:36 +0800 |
commit | 53a18d2e2734d078200ec607055ae551245ae74b (patch) | |
tree | 0281d5d7a186cc6d79c237091b607d35a510746a /event | |
parent | 7beccb29becf439df7bf4c033a94c019ad25bead (diff) | |
download | dexon-53a18d2e2734d078200ec607055ae551245ae74b.tar.gz dexon-53a18d2e2734d078200ec607055ae551245ae74b.tar.zst dexon-53a18d2e2734d078200ec607055ae551245ae74b.zip |
event: document select case slice use and add edge case test (#16680)
Feed keeps active subscription channels in a slice called 'f.sendCases'.
The Send method tracks the active cases in a local variable 'cases'
whose value is f.sendCases initially. 'cases' shrinks to a shorter
prefix of f.sendCases every time a send succeeds, moving the successful
case out of range of the active case list.
This can be confusing because the two slices share a backing array. Add
more comments to document what is going on. Also add a test for removing
a case that is in 'f.sentCases' but not 'cases'.
Diffstat (limited to 'event')
-rw-r--r-- | event/feed.go | 5 | ||||
-rw-r--r-- | event/feed_test.go | 39 |
2 files changed, 43 insertions, 1 deletions
diff --git a/event/feed.go b/event/feed.go index 78fa3d98d..f578f00c1 100644 --- a/event/feed.go +++ b/event/feed.go @@ -148,7 +148,9 @@ func (f *Feed) Send(value interface{}) (nsent int) { f.sendCases[i].Send = rvalue } - // Send until all channels except removeSub have been chosen. + // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix + // of sendCases. When a send succeeds, the corresponding case moves to the end of + // 'cases' and it shrinks by one element. cases := f.sendCases for { // Fast path: try sending without blocking before adding to the select set. @@ -170,6 +172,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { index := f.sendCases.find(recv.Interface()) f.sendCases = f.sendCases.delete(index) if index >= 0 && index < len(cases) { + // Shrink 'cases' too because the removed case was still active. cases = f.sendCases[:len(cases)-1] } } else { diff --git a/event/feed_test.go b/event/feed_test.go index a82c10303..be8876932 100644 --- a/event/feed_test.go +++ b/event/feed_test.go @@ -235,6 +235,45 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) { wg.Wait() } +// Checks that unsubscribing a channel during Send works even if that +// channel has already been sent on. +func TestFeedUnsubscribeSentChan(t *testing.T) { + var ( + feed Feed + ch1 = make(chan int) + ch2 = make(chan int) + sub1 = feed.Subscribe(ch1) + sub2 = feed.Subscribe(ch2) + wg sync.WaitGroup + ) + defer sub2.Unsubscribe() + + wg.Add(1) + go func() { + feed.Send(0) + wg.Done() + }() + + // Wait for the value on ch1. + <-ch1 + // Unsubscribe ch1, removing it from the send cases. + sub1.Unsubscribe() + + // Receive ch2, finishing Send. + <-ch2 + wg.Wait() + + // Send again. This should send to ch2 only, so the wait group will unblock + // as soon as a value is received on ch2. + wg.Add(1) + go func() { + feed.Send(0) + wg.Done() + }() + <-ch2 + wg.Wait() +} + func TestFeedUnsubscribeFromInbox(t *testing.T) { var ( feed Feed |