diff options
Diffstat (limited to 'core/syncer/watch-cat.go')
-rw-r--r-- | core/syncer/watch-cat.go | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/core/syncer/watch-cat.go b/core/syncer/watch-cat.go index 5ee7f62..d08bff9 100644 --- a/core/syncer/watch-cat.go +++ b/core/syncer/watch-cat.go @@ -34,8 +34,9 @@ type configReader interface { // WatchCat is reponsible for signaling if syncer object should be terminated. type WatchCat struct { recovery core.Recovery + timeout time.Duration configReader configReader - pat chan types.Position + feed chan types.Position polling time.Duration ctx context.Context cancel context.CancelFunc @@ -47,11 +48,13 @@ func NewWatchCat( recovery core.Recovery, configReader configReader, polling time.Duration, + timeout time.Duration, logger common.Logger) *WatchCat { wc := &WatchCat{ recovery: recovery, + timeout: timeout, configReader: configReader, - pat: make(chan types.Position), + feed: make(chan types.Position), polling: polling, logger: logger, } @@ -60,11 +63,11 @@ func NewWatchCat( // Feed the WatchCat so it won't produce the termination signal. func (wc *WatchCat) Feed(position types.Position) { - wc.pat <- position + wc.feed <- position } // Start the WatchCat. -func (wc *WatchCat) Start(timeout time.Duration) { +func (wc *WatchCat) Start() { wc.Stop() wc.ctx, wc.cancel = context.WithCancel(context.Background()) go func() { @@ -79,14 +82,14 @@ func (wc *WatchCat) Start(timeout time.Duration) { select { case <-wc.ctx.Done(): return - case pos := <-wc.pat: + case pos := <-wc.feed: if !pos.Newer(lastPos) { wc.logger.Warn("Feed with older height", "pos", pos, "lastPos", lastPos) continue } lastPos = pos - case <-time.After(timeout): + case <-time.After(wc.timeout): break MonitorLoop } } @@ -95,7 +98,7 @@ func (wc *WatchCat) Start(timeout time.Duration) { select { case <-wc.ctx.Done(): return - case <-wc.pat: + case <-wc.feed: } } }() |