aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'rpc')
-rw-r--r--rpc/notification.go297
-rw-r--r--rpc/server.go12
-rw-r--r--rpc/server_test.go2
-rw-r--r--rpc/subscription.go135
-rw-r--r--rpc/subscription_test.go (renamed from rpc/notification_test.go)34
-rw-r--r--rpc/types.go4
-rw-r--r--rpc/utils.go42
7 files changed, 200 insertions, 326 deletions
diff --git a/rpc/notification.go b/rpc/notification.go
deleted file mode 100644
index 875433071..000000000
--- a/rpc/notification.go
+++ /dev/null
@@ -1,297 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package rpc
-
-import (
- "errors"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "golang.org/x/net/context"
-)
-
-var (
- // ErrNotificationsUnsupported is returned when the connection doesn't support notifications
- ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
-
- // ErrNotificationNotFound is returned when the notification for the given id is not found
- ErrNotificationNotFound = errors.New("notification not found")
-
- // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
- errNotifierStopped = errors.New("unable to send notification")
-
- // errNotificationQueueFull is returns when there are too many notifications in the queue
- errNotificationQueueFull = errors.New("too many pending notifications")
-)
-
-// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
-// notifications that might be pending in the internal queue.
-var unsubSignal = new(struct{})
-
-// UnsubscribeCallback defines a callback that is called when a subcription ends.
-// It receives the subscription id as argument.
-type UnsubscribeCallback func(id string)
-
-// notification is a helper object that holds event data for a subscription
-type notification struct {
- sub *bufferedSubscription // subscription id
- data interface{} // event data
-}
-
-// A Notifier type describes the interface for objects that can send create subscriptions
-type Notifier interface {
- // Create a new subscription. The given callback is called when this subscription
- // is cancelled (e.g. client send an unsubscribe, connection closed).
- NewSubscription(UnsubscribeCallback) (Subscription, error)
- // Cancel subscription
- Unsubscribe(id string) error
-}
-
-type notifierKey struct{}
-
-// NotifierFromContext returns the Notifier value stored in ctx, if any.
-func NotifierFromContext(ctx context.Context) (Notifier, bool) {
- n, ok := ctx.Value(notifierKey{}).(Notifier)
- return n, ok
-}
-
-// Subscription defines the interface for objects that can notify subscribers
-type Subscription interface {
- // Inform client of an event
- Notify(data interface{}) error
- // Unique identifier
- ID() string
- // Cancel subscription
- Cancel() error
-}
-
-// bufferedSubscription is a subscription that uses a bufferedNotifier to send
-// notifications to subscribers.
-type bufferedSubscription struct {
- id string
- unsubOnce sync.Once // call unsub method once
- unsub UnsubscribeCallback // called on Unsubscribed
- notifier *bufferedNotifier // forward notifications to
- pending chan interface{} // closed when active
- flushed chan interface{} // closed when all buffered notifications are send
- lastNotification time.Time // last time a notification was send
-}
-
-// ID returns the subscription identifier that the client uses to refer to this instance.
-func (s *bufferedSubscription) ID() string {
- return s.id
-}
-
-// Cancel informs the notifier that this subscription is cancelled by the API
-func (s *bufferedSubscription) Cancel() error {
- return s.notifier.Unsubscribe(s.id)
-}
-
-// Notify the subscriber of a particular event.
-func (s *bufferedSubscription) Notify(data interface{}) error {
- return s.notifier.send(s.id, data)
-}
-
-// bufferedNotifier is a notifier that queues notifications in an internal queue and
-// send them as fast as possible to the client from this queue. It will stop if the
-// queue grows past a given size.
-type bufferedNotifier struct {
- codec ServerCodec // underlying connection
- mu sync.Mutex // guard internal state
- subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
- queueSize int // max number of items in queue
- queue chan *notification // notification queue
- stopped bool // indication if this notifier is ordered to stop
-}
-
-// newBufferedNotifier returns a notifier that queues notifications in an internal queue
-// from which notifications are send as fast as possible to the client. If the queue size
-// limit is reached (client is unable to keep up) it will stop and closes the codec.
-func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
- notifier := &bufferedNotifier{
- codec: codec,
- subscriptions: make(map[string]*bufferedSubscription),
- queue: make(chan *notification, size),
- queueSize: size,
- }
-
- go notifier.run()
-
- return notifier
-}
-
-// NewSubscription creates a new subscription that forwards events to this instance internal
-// queue. The given callback is called when the subscription is unsubscribed/cancelled.
-func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
- id, err := newSubscriptionID()
- if err != nil {
- return nil, err
- }
-
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if n.stopped {
- return nil, errNotifierStopped
- }
-
- sub := &bufferedSubscription{
- id: id,
- unsub: callback,
- notifier: n,
- pending: make(chan interface{}),
- flushed: make(chan interface{}),
- lastNotification: time.Now(),
- }
-
- n.subscriptions[id] = sub
-
- return sub, nil
-}
-
-// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
-func (n *bufferedNotifier) Unsubscribe(subid string) error {
- n.mu.Lock()
- sub, found := n.subscriptions[subid]
- n.mu.Unlock()
-
- if found {
- // send the unsubscribe signal, this will cause the notifier not to accept new events
- // for this subscription and will close the flushed channel after the last (buffered)
- // notification was send to the client.
- if err := n.send(subid, unsubSignal); err != nil {
- return err
- }
-
- // wait for confirmation that all (buffered) events are send for this subscription.
- // this ensures that the unsubscribe method response is not send before all buffered
- // events for this subscription are send.
- <-sub.flushed
-
- return nil
- }
-
- return ErrNotificationNotFound
-}
-
-// Send enques the given data for the subscription with public ID on the internal queue. t returns
-// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
-// will remove the subscription with the given id from the subscription collection.
-func (n *bufferedNotifier) send(id string, data interface{}) error {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if n.stopped {
- return errNotifierStopped
- }
-
- var (
- subscription *bufferedSubscription
- found bool
- )
-
- // check if subscription is associated with this connection, it might be cancelled
- // (subscribe/connection closed)
- if subscription, found = n.subscriptions[id]; !found {
- glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
- return ErrNotificationNotFound
- }
-
- // received the unsubscribe signal. Add it to the queue to make sure any pending notifications
- // for this subscription are send. When the run loop receives this singal it will signal that
- // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
- // send to the user. Remove the subscriptions to make sure new notifications are not accepted.
- if data == unsubSignal {
- delete(n.subscriptions, id)
- if subscription.unsub != nil {
- subscription.unsubOnce.Do(func() { subscription.unsub(id) })
- }
- }
-
- subscription.lastNotification = time.Now()
-
- if len(n.queue) >= n.queueSize {
- glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
- n.codec.Close()
- return errNotificationQueueFull
- }
-
- n.queue <- &notification{subscription, data}
- return nil
-}
-
-// run reads notifications from the internal queue and sends them to the client. In case of an
-// error, or when the codec is closed it will cancel all active subscriptions and returns.
-func (n *bufferedNotifier) run() {
- defer func() {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- n.stopped = true
- close(n.queue)
-
- // on exit call unsubscribe callback
- for id, sub := range n.subscriptions {
- if sub.unsub != nil {
- sub.unsubOnce.Do(func() { sub.unsub(id) })
- }
- close(sub.flushed)
- delete(n.subscriptions, id)
- }
- }()
-
- for {
- select {
- case notification := <-n.queue:
- // It can happen that an event is raised before the RPC server was able to send the sub
- // id to the client. Therefore subscriptions are marked as pending until the sub id was
- // send. The RPC server will activate the subscription by closing the pending chan.
- <-notification.sub.pending
-
- if notification.data == unsubSignal {
- // unsubSignal is the last accepted message for this subscription. Raise the signal
- // that all buffered notifications are sent by closing the flushed channel. This
- // indicates that the response for the unsubscribe can be send to the client.
- close(notification.sub.flushed)
- } else {
- msg := n.codec.CreateNotification(notification.sub.id, notification.data)
- if err := n.codec.Write(msg); err != nil {
- n.codec.Close()
- // unable to send notification to client, unsubscribe all subscriptions
- glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
- return
- }
- }
- case <-n.codec.Closed(): // connection was closed
- glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
- return
- }
- }
-}
-
-// Marks the subscription as active. This will causes the notifications for this subscription to be
-// forwarded to the client.
-func (n *bufferedNotifier) activate(subid string) {
- n.mu.Lock()
- defer n.mu.Unlock()
-
- if sub, found := n.subscriptions[subid]; found {
- close(sub.pending)
- }
-}
diff --git a/rpc/server.go b/rpc/server.go
index 040805a5c..996c63700 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
- ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
+ ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
@@ -247,7 +247,7 @@ func (s *Server) Stop() {
}
// createSubscription will call the subscription callback and returns the subscription id or error.
-func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
+func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args...)
@@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser
return "", reply[1].Interface().(error)
}
- return reply[0].Interface().(Subscription).ID(), nil
+ return reply[0].Interface().(*Subscription).ID, nil
}
// handle executes a request and returns the response from the callback.
@@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
- subid := req.args[0].String()
- if err := notifier.Unsubscribe(subid); err != nil {
+ subid := ID(req.args[0].String())
+ if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
@@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
- notifier.(*bufferedNotifier).activate(subid)
+ notifier.activate(subid)
}
return codec.CreateResponse(req.id, subid), activateSub
diff --git a/rpc/server_test.go b/rpc/server_test.go
index e6840bde4..c3c88fab7 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -72,7 +72,7 @@ func (s *Service) InvalidRets3() (string, string, error) {
return "", "", nil
}
-func (s *Service) Subscription(ctx context.Context) (Subscription, error) {
+func (s *Service) Subscription(ctx context.Context) (*Subscription, error) {
return nil, nil
}
diff --git a/rpc/subscription.go b/rpc/subscription.go
new file mode 100644
index 000000000..863d34b20
--- /dev/null
+++ b/rpc/subscription.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+ "errors"
+ "sync"
+
+ "golang.org/x/net/context"
+)
+
+var (
+ // ErrNotificationsUnsupported is returned when the connection doesn't support notifications
+ ErrNotificationsUnsupported = errors.New("notifications not supported")
+ // ErrNotificationNotFound is returned when the notification for the given id is not found
+ ErrSubscriptionNotFound = errors.New("subscription not found")
+)
+
+// ID defines a psuedo random number that is used to identify RPC subscriptions.
+type ID string
+
+// a Subscription is created by a notifier and tight to that notifier. The client can use
+// this subscription to wait for an unsubscribe request for the client, see Err().
+type Subscription struct {
+ ID ID
+ err chan error // closed on unsubscribe
+}
+
+// Err returns a channel that is closed when the client send an unsubscribe request.
+func (s *Subscription) Err() <-chan error {
+ return s.err
+}
+
+// notifierKey is used to store a notifier within the connection context.
+type notifierKey struct{}
+
+// Notifier is tight to a RPC connection that supports subscriptions.
+// Server callbacks use the notifier to send notifications.
+type Notifier struct {
+ codec ServerCodec
+ subMu sync.RWMutex // guards active and inactive maps
+ stopped bool
+ active map[ID]*Subscription
+ inactive map[ID]*Subscription
+}
+
+// newNotifier creates a new notifier that can be used to send subscription
+// notifications to the client.
+func newNotifier(codec ServerCodec) *Notifier {
+ return &Notifier{
+ codec: codec,
+ active: make(map[ID]*Subscription),
+ inactive: make(map[ID]*Subscription),
+ }
+}
+
+// NotifierFromContext returns the Notifier value stored in ctx, if any.
+func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
+ n, ok := ctx.Value(notifierKey{}).(*Notifier)
+ return n, ok
+}
+
+// CreateSubscription returns a new subscription that is coupled to the
+// RPC connection. By default subscriptions are inactive and notifications
+// are dropped until the subscription is marked as active. This is done
+// by the RPC server after the subscription ID is send to the client.
+func (n *Notifier) CreateSubscription() *Subscription {
+ s := &Subscription{NewID(), make(chan error)}
+ n.subMu.Lock()
+ n.inactive[s.ID] = s
+ n.subMu.Unlock()
+ return s
+}
+
+// Notify sends a notification to the client with the given data as payload.
+// If an error occurs the RPC connection is closed and the error is returned.
+func (n *Notifier) Notify(id ID, data interface{}) error {
+ n.subMu.RLock()
+ defer n.subMu.RUnlock()
+
+ _, active := n.active[id]
+ if active {
+ notification := n.codec.CreateNotification(string(id), data)
+ if err := n.codec.Write(notification); err != nil {
+ n.codec.Close()
+ return err
+ }
+ }
+ return nil
+}
+
+// Closed returns a channel that is closed when the RPC connection is closed.
+func (n *Notifier) Closed() <-chan interface{} {
+ return n.codec.Closed()
+}
+
+// unsubscribe a subscription.
+// If the subscription could not be found ErrSubscriptionNotFound is returned.
+func (n *Notifier) unsubscribe(id ID) error {
+ n.subMu.Lock()
+ defer n.subMu.Unlock()
+ if s, found := n.active[id]; found {
+ close(s.err)
+ delete(n.active, id)
+ return nil
+ }
+ return ErrSubscriptionNotFound
+}
+
+// activate enables a subscription. Until a subscription is enabled all
+// notifications are dropped. This method is called by the RPC server after
+// the subscription ID was sent to client. This prevents notifications being
+// send to the client before the subscription ID is send to the client.
+func (n *Notifier) activate(id ID) {
+ n.subMu.Lock()
+ defer n.subMu.Unlock()
+ if sub, found := n.inactive[id]; found {
+ n.active[id] = sub
+ delete(n.inactive, id)
+ }
+}
diff --git a/rpc/notification_test.go b/rpc/subscription_test.go
index 52352848c..8bb341694 100644
--- a/rpc/notification_test.go
+++ b/rpc/subscription_test.go
@@ -50,7 +50,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) {
s.mu.Unlock()
}
-func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
+func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
@@ -59,17 +59,29 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
// by explicitly creating an subscription we make sure that the subscription id is send back to the client
// before the first subscription.Notify is called. Otherwise the events might be send before the response
// for the eth_subscribe method.
- subscription, err := notifier.NewSubscription(s.Unsubscribe)
- if err != nil {
- return nil, err
- }
+ subscription := notifier.CreateSubscription()
go func() {
+ // test expects n events, if we begin sending event immediatly some events
+ // will probably be dropped since the subscription ID might not be send to
+ // the client.
+ time.Sleep(5 * time.Second)
for i := 0; i < n; i++ {
- if err := subscription.Notify(val + i); err != nil {
+ if err := notifier.Notify(subscription.ID, val+i); err != nil {
return
}
}
+
+ select {
+ case <-notifier.Closed():
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
+ case <-subscription.Err():
+ s.mu.Lock()
+ s.unsubscribed = true
+ s.mu.Unlock()
+ }
}()
return subscription, nil
@@ -77,7 +89,7 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
// HangSubscription blocks on s.unblockHangSubscription before
// sending anything.
-func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
@@ -85,12 +97,10 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int)
s.gotHangSubscriptionReq <- struct{}{}
<-s.unblockHangSubscription
- subscription, err := notifier.NewSubscription(s.Unsubscribe)
- if err != nil {
- return nil, err
- }
+ subscription := notifier.CreateSubscription()
+
go func() {
- subscription.Notify(val)
+ notifier.Notify(subscription.ID, val)
}()
return subscription, nil
}
diff --git a/rpc/types.go b/rpc/types.go
index 2a7268ad8..89c5b5bc9 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -269,6 +269,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber)
}
-func (bn *BlockNumber) Int64() int64 {
- return (int64)(*bn)
+func (bn BlockNumber) Int64() int64 {
+ return (int64)(bn)
}
diff --git a/rpc/utils.go b/rpc/utils.go
index 1ac6698f5..b590ba62f 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -17,17 +17,26 @@
package rpc
import (
- "crypto/rand"
+ "bufio"
+ crand "crypto/rand"
+ "encoding/binary"
"encoding/hex"
- "errors"
"math/big"
+ "math/rand"
"reflect"
+ "sync"
+ "time"
"unicode"
"unicode/utf8"
"golang.org/x/net/context"
)
+var (
+ subscriptionIDGenMu sync.Mutex
+ subscriptionIDGen = idGenerator()
+)
+
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
@@ -218,11 +227,28 @@ METHODS:
return callbacks, subscriptions
}
-func newSubscriptionID() (string, error) {
- var subid [16]byte
- n, _ := rand.Read(subid[:])
- if n != 16 {
- return "", errors.New("Unable to generate subscription id")
+// idGenerator helper utility that generates a (pseudo) random sequence of
+// bytes that are used to generate identifiers.
+func idGenerator() *rand.Rand {
+ if seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)); err == nil {
+ return rand.New(rand.NewSource(seed))
+ }
+ return rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
+}
+
+// NewID generates a identifier that can be used as an identifier in the RPC interface.
+// e.g. filter and subscription identifier.
+func NewID() ID {
+ subscriptionIDGenMu.Lock()
+ defer subscriptionIDGenMu.Unlock()
+
+ id := make([]byte, 16)
+ for i := 0; i < len(id); i += 7 {
+ val := subscriptionIDGen.Int63()
+ for j := 0; i+j < len(id) && j < 7; j++ {
+ id[i+j] = byte(val)
+ val >>= 8
+ }
}
- return "0x" + hex.EncodeToString(subid[:]), nil
+ return ID("0x" + hex.EncodeToString(id))
}