diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/notification.go | 297 | ||||
-rw-r--r-- | rpc/server.go | 12 | ||||
-rw-r--r-- | rpc/server_test.go | 2 | ||||
-rw-r--r-- | rpc/subscription.go | 135 | ||||
-rw-r--r-- | rpc/subscription_test.go (renamed from rpc/notification_test.go) | 34 | ||||
-rw-r--r-- | rpc/types.go | 4 | ||||
-rw-r--r-- | rpc/utils.go | 42 |
7 files changed, 200 insertions, 326 deletions
diff --git a/rpc/notification.go b/rpc/notification.go deleted file mode 100644 index 875433071..000000000 --- a/rpc/notification.go +++ /dev/null @@ -1,297 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package rpc - -import ( - "errors" - "sync" - "time" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "golang.org/x/net/context" -) - -var ( - // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport") - - // ErrNotificationNotFound is returned when the notification for the given id is not found - ErrNotificationNotFound = errors.New("notification not found") - - // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed) - errNotifierStopped = errors.New("unable to send notification") - - // errNotificationQueueFull is returns when there are too many notifications in the queue - errNotificationQueueFull = errors.New("too many pending notifications") -) - -// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered -// notifications that might be pending in the internal queue. -var unsubSignal = new(struct{}) - -// UnsubscribeCallback defines a callback that is called when a subcription ends. -// It receives the subscription id as argument. -type UnsubscribeCallback func(id string) - -// notification is a helper object that holds event data for a subscription -type notification struct { - sub *bufferedSubscription // subscription id - data interface{} // event data -} - -// A Notifier type describes the interface for objects that can send create subscriptions -type Notifier interface { - // Create a new subscription. The given callback is called when this subscription - // is cancelled (e.g. client send an unsubscribe, connection closed). - NewSubscription(UnsubscribeCallback) (Subscription, error) - // Cancel subscription - Unsubscribe(id string) error -} - -type notifierKey struct{} - -// NotifierFromContext returns the Notifier value stored in ctx, if any. -func NotifierFromContext(ctx context.Context) (Notifier, bool) { - n, ok := ctx.Value(notifierKey{}).(Notifier) - return n, ok -} - -// Subscription defines the interface for objects that can notify subscribers -type Subscription interface { - // Inform client of an event - Notify(data interface{}) error - // Unique identifier - ID() string - // Cancel subscription - Cancel() error -} - -// bufferedSubscription is a subscription that uses a bufferedNotifier to send -// notifications to subscribers. -type bufferedSubscription struct { - id string - unsubOnce sync.Once // call unsub method once - unsub UnsubscribeCallback // called on Unsubscribed - notifier *bufferedNotifier // forward notifications to - pending chan interface{} // closed when active - flushed chan interface{} // closed when all buffered notifications are send - lastNotification time.Time // last time a notification was send -} - -// ID returns the subscription identifier that the client uses to refer to this instance. -func (s *bufferedSubscription) ID() string { - return s.id -} - -// Cancel informs the notifier that this subscription is cancelled by the API -func (s *bufferedSubscription) Cancel() error { - return s.notifier.Unsubscribe(s.id) -} - -// Notify the subscriber of a particular event. -func (s *bufferedSubscription) Notify(data interface{}) error { - return s.notifier.send(s.id, data) -} - -// bufferedNotifier is a notifier that queues notifications in an internal queue and -// send them as fast as possible to the client from this queue. It will stop if the -// queue grows past a given size. -type bufferedNotifier struct { - codec ServerCodec // underlying connection - mu sync.Mutex // guard internal state - subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec - queueSize int // max number of items in queue - queue chan *notification // notification queue - stopped bool // indication if this notifier is ordered to stop -} - -// newBufferedNotifier returns a notifier that queues notifications in an internal queue -// from which notifications are send as fast as possible to the client. If the queue size -// limit is reached (client is unable to keep up) it will stop and closes the codec. -func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier { - notifier := &bufferedNotifier{ - codec: codec, - subscriptions: make(map[string]*bufferedSubscription), - queue: make(chan *notification, size), - queueSize: size, - } - - go notifier.run() - - return notifier -} - -// NewSubscription creates a new subscription that forwards events to this instance internal -// queue. The given callback is called when the subscription is unsubscribed/cancelled. -func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) { - id, err := newSubscriptionID() - if err != nil { - return nil, err - } - - n.mu.Lock() - defer n.mu.Unlock() - - if n.stopped { - return nil, errNotifierStopped - } - - sub := &bufferedSubscription{ - id: id, - unsub: callback, - notifier: n, - pending: make(chan interface{}), - flushed: make(chan interface{}), - lastNotification: time.Now(), - } - - n.subscriptions[id] = sub - - return sub, nil -} - -// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned. -func (n *bufferedNotifier) Unsubscribe(subid string) error { - n.mu.Lock() - sub, found := n.subscriptions[subid] - n.mu.Unlock() - - if found { - // send the unsubscribe signal, this will cause the notifier not to accept new events - // for this subscription and will close the flushed channel after the last (buffered) - // notification was send to the client. - if err := n.send(subid, unsubSignal); err != nil { - return err - } - - // wait for confirmation that all (buffered) events are send for this subscription. - // this ensures that the unsubscribe method response is not send before all buffered - // events for this subscription are send. - <-sub.flushed - - return nil - } - - return ErrNotificationNotFound -} - -// Send enques the given data for the subscription with public ID on the internal queue. t returns -// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it -// will remove the subscription with the given id from the subscription collection. -func (n *bufferedNotifier) send(id string, data interface{}) error { - n.mu.Lock() - defer n.mu.Unlock() - - if n.stopped { - return errNotifierStopped - } - - var ( - subscription *bufferedSubscription - found bool - ) - - // check if subscription is associated with this connection, it might be cancelled - // (subscribe/connection closed) - if subscription, found = n.subscriptions[id]; !found { - glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id) - return ErrNotificationNotFound - } - - // received the unsubscribe signal. Add it to the queue to make sure any pending notifications - // for this subscription are send. When the run loop receives this singal it will signal that - // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be - // send to the user. Remove the subscriptions to make sure new notifications are not accepted. - if data == unsubSignal { - delete(n.subscriptions, id) - if subscription.unsub != nil { - subscription.unsubOnce.Do(func() { subscription.unsub(id) }) - } - } - - subscription.lastNotification = time.Now() - - if len(n.queue) >= n.queueSize { - glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection") - n.codec.Close() - return errNotificationQueueFull - } - - n.queue <- ¬ification{subscription, data} - return nil -} - -// run reads notifications from the internal queue and sends them to the client. In case of an -// error, or when the codec is closed it will cancel all active subscriptions and returns. -func (n *bufferedNotifier) run() { - defer func() { - n.mu.Lock() - defer n.mu.Unlock() - - n.stopped = true - close(n.queue) - - // on exit call unsubscribe callback - for id, sub := range n.subscriptions { - if sub.unsub != nil { - sub.unsubOnce.Do(func() { sub.unsub(id) }) - } - close(sub.flushed) - delete(n.subscriptions, id) - } - }() - - for { - select { - case notification := <-n.queue: - // It can happen that an event is raised before the RPC server was able to send the sub - // id to the client. Therefore subscriptions are marked as pending until the sub id was - // send. The RPC server will activate the subscription by closing the pending chan. - <-notification.sub.pending - - if notification.data == unsubSignal { - // unsubSignal is the last accepted message for this subscription. Raise the signal - // that all buffered notifications are sent by closing the flushed channel. This - // indicates that the response for the unsubscribe can be send to the client. - close(notification.sub.flushed) - } else { - msg := n.codec.CreateNotification(notification.sub.id, notification.data) - if err := n.codec.Write(msg); err != nil { - n.codec.Close() - // unable to send notification to client, unsubscribe all subscriptions - glog.V(logger.Warn).Infof("unable to send notification - %v\n", err) - return - } - } - case <-n.codec.Closed(): // connection was closed - glog.V(logger.Debug).Infoln("codec closed, stop subscriptions") - return - } - } -} - -// Marks the subscription as active. This will causes the notifications for this subscription to be -// forwarded to the client. -func (n *bufferedNotifier) activate(subid string) { - n.mu.Lock() - defer n.mu.Unlock() - - if sub, found := n.subscriptions[subid]; found { - close(sub.pending) - } -} diff --git a/rpc/server.go b/rpc/server.go index 040805a5c..996c63700 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO // to send notification to clients. It is thight to the codec/connection. If the // connection is closed the notifier will stop and cancels all active subscriptions. if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize)) + ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec)) } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped @@ -247,7 +247,7 @@ func (s *Server) Stop() { } // createSubscription will call the subscription callback and returns the subscription id or error. -func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) { // subscription have as first argument the context following optional arguments args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} args = append(args, req.args...) @@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser return "", reply[1].Interface().(error) } - return reply[0].Interface().(Subscription).ID(), nil + return reply[0].Interface().(*Subscription).ID, nil } // handle executes a request and returns the response from the callback. @@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } - subid := req.args[0].String() - if err := notifier.Unsubscribe(subid); err != nil { + subid := ID(req.args[0].String()) + if err := notifier.unsubscribe(subid); err != nil { return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } @@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.(*bufferedNotifier).activate(subid) + notifier.activate(subid) } return codec.CreateResponse(req.id, subid), activateSub diff --git a/rpc/server_test.go b/rpc/server_test.go index e6840bde4..c3c88fab7 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -72,7 +72,7 @@ func (s *Service) InvalidRets3() (string, string, error) { return "", "", nil } -func (s *Service) Subscription(ctx context.Context) (Subscription, error) { +func (s *Service) Subscription(ctx context.Context) (*Subscription, error) { return nil, nil } diff --git a/rpc/subscription.go b/rpc/subscription.go new file mode 100644 index 000000000..863d34b20 --- /dev/null +++ b/rpc/subscription.go @@ -0,0 +1,135 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package rpc + +import ( + "errors" + "sync" + + "golang.org/x/net/context" +) + +var ( + // ErrNotificationsUnsupported is returned when the connection doesn't support notifications + ErrNotificationsUnsupported = errors.New("notifications not supported") + // ErrNotificationNotFound is returned when the notification for the given id is not found + ErrSubscriptionNotFound = errors.New("subscription not found") +) + +// ID defines a psuedo random number that is used to identify RPC subscriptions. +type ID string + +// a Subscription is created by a notifier and tight to that notifier. The client can use +// this subscription to wait for an unsubscribe request for the client, see Err(). +type Subscription struct { + ID ID + err chan error // closed on unsubscribe +} + +// Err returns a channel that is closed when the client send an unsubscribe request. +func (s *Subscription) Err() <-chan error { + return s.err +} + +// notifierKey is used to store a notifier within the connection context. +type notifierKey struct{} + +// Notifier is tight to a RPC connection that supports subscriptions. +// Server callbacks use the notifier to send notifications. +type Notifier struct { + codec ServerCodec + subMu sync.RWMutex // guards active and inactive maps + stopped bool + active map[ID]*Subscription + inactive map[ID]*Subscription +} + +// newNotifier creates a new notifier that can be used to send subscription +// notifications to the client. +func newNotifier(codec ServerCodec) *Notifier { + return &Notifier{ + codec: codec, + active: make(map[ID]*Subscription), + inactive: make(map[ID]*Subscription), + } +} + +// NotifierFromContext returns the Notifier value stored in ctx, if any. +func NotifierFromContext(ctx context.Context) (*Notifier, bool) { + n, ok := ctx.Value(notifierKey{}).(*Notifier) + return n, ok +} + +// CreateSubscription returns a new subscription that is coupled to the +// RPC connection. By default subscriptions are inactive and notifications +// are dropped until the subscription is marked as active. This is done +// by the RPC server after the subscription ID is send to the client. +func (n *Notifier) CreateSubscription() *Subscription { + s := &Subscription{NewID(), make(chan error)} + n.subMu.Lock() + n.inactive[s.ID] = s + n.subMu.Unlock() + return s +} + +// Notify sends a notification to the client with the given data as payload. +// If an error occurs the RPC connection is closed and the error is returned. +func (n *Notifier) Notify(id ID, data interface{}) error { + n.subMu.RLock() + defer n.subMu.RUnlock() + + _, active := n.active[id] + if active { + notification := n.codec.CreateNotification(string(id), data) + if err := n.codec.Write(notification); err != nil { + n.codec.Close() + return err + } + } + return nil +} + +// Closed returns a channel that is closed when the RPC connection is closed. +func (n *Notifier) Closed() <-chan interface{} { + return n.codec.Closed() +} + +// unsubscribe a subscription. +// If the subscription could not be found ErrSubscriptionNotFound is returned. +func (n *Notifier) unsubscribe(id ID) error { + n.subMu.Lock() + defer n.subMu.Unlock() + if s, found := n.active[id]; found { + close(s.err) + delete(n.active, id) + return nil + } + return ErrSubscriptionNotFound +} + +// activate enables a subscription. Until a subscription is enabled all +// notifications are dropped. This method is called by the RPC server after +// the subscription ID was sent to client. This prevents notifications being +// send to the client before the subscription ID is send to the client. +func (n *Notifier) activate(id ID) { + n.subMu.Lock() + defer n.subMu.Unlock() + if sub, found := n.inactive[id]; found { + n.active[id] = sub + delete(n.inactive, id) + } +} diff --git a/rpc/notification_test.go b/rpc/subscription_test.go index 52352848c..8bb341694 100644 --- a/rpc/notification_test.go +++ b/rpc/subscription_test.go @@ -50,7 +50,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) { s.mu.Unlock() } -func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { +func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { notifier, supported := NotifierFromContext(ctx) if !supported { return nil, ErrNotificationsUnsupported @@ -59,17 +59,29 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i // by explicitly creating an subscription we make sure that the subscription id is send back to the client // before the first subscription.Notify is called. Otherwise the events might be send before the response // for the eth_subscribe method. - subscription, err := notifier.NewSubscription(s.Unsubscribe) - if err != nil { - return nil, err - } + subscription := notifier.CreateSubscription() go func() { + // test expects n events, if we begin sending event immediatly some events + // will probably be dropped since the subscription ID might not be send to + // the client. + time.Sleep(5 * time.Second) for i := 0; i < n; i++ { - if err := subscription.Notify(val + i); err != nil { + if err := notifier.Notify(subscription.ID, val+i); err != nil { return } } + + select { + case <-notifier.Closed(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + case <-subscription.Err(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + } }() return subscription, nil @@ -77,7 +89,7 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i // HangSubscription blocks on s.unblockHangSubscription before // sending anything. -func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) { +func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { notifier, supported := NotifierFromContext(ctx) if !supported { return nil, ErrNotificationsUnsupported @@ -85,12 +97,10 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) s.gotHangSubscriptionReq <- struct{}{} <-s.unblockHangSubscription - subscription, err := notifier.NewSubscription(s.Unsubscribe) - if err != nil { - return nil, err - } + subscription := notifier.CreateSubscription() + go func() { - subscription.Notify(val) + notifier.Notify(subscription.ID, val) }() return subscription, nil } diff --git a/rpc/types.go b/rpc/types.go index 2a7268ad8..89c5b5bc9 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -269,6 +269,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber) } -func (bn *BlockNumber) Int64() int64 { - return (int64)(*bn) +func (bn BlockNumber) Int64() int64 { + return (int64)(bn) } diff --git a/rpc/utils.go b/rpc/utils.go index 1ac6698f5..b590ba62f 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -17,17 +17,26 @@ package rpc import ( - "crypto/rand" + "bufio" + crand "crypto/rand" + "encoding/binary" "encoding/hex" - "errors" "math/big" + "math/rand" "reflect" + "sync" + "time" "unicode" "unicode/utf8" "golang.org/x/net/context" ) +var ( + subscriptionIDGenMu sync.Mutex + subscriptionIDGen = idGenerator() +) + // Is this an exported - upper case - name? func isExported(name string) bool { rune, _ := utf8.DecodeRuneInString(name) @@ -218,11 +227,28 @@ METHODS: return callbacks, subscriptions } -func newSubscriptionID() (string, error) { - var subid [16]byte - n, _ := rand.Read(subid[:]) - if n != 16 { - return "", errors.New("Unable to generate subscription id") +// idGenerator helper utility that generates a (pseudo) random sequence of +// bytes that are used to generate identifiers. +func idGenerator() *rand.Rand { + if seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)); err == nil { + return rand.New(rand.NewSource(seed)) + } + return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) +} + +// NewID generates a identifier that can be used as an identifier in the RPC interface. +// e.g. filter and subscription identifier. +func NewID() ID { + subscriptionIDGenMu.Lock() + defer subscriptionIDGenMu.Unlock() + + id := make([]byte, 16) + for i := 0; i < len(id); i += 7 { + val := subscriptionIDGen.Int63() + for j := 0; i+j < len(id) && j < 7; j++ { + id[i+j] = byte(val) + val >>= 8 + } } - return "0x" + hex.EncodeToString(subid[:]), nil + return ID("0x" + hex.EncodeToString(id)) } |