aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/server.go')
-rw-r--r--rpc/server.go60
1 files changed, 46 insertions, 14 deletions
diff --git a/rpc/server.go b/rpc/server.go
index f42ee2d37..22448f8e3 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
-// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
-// response back using the given codec. It will block until the codec is closed.
-//
-// This server will:
-// 1. allow for asynchronous and parallel request execution
-// 2. supports notifications (pub/sub)
-// 3. supports request batches
-func (s *Server) ServeCodec(codec ServerCodec) {
+// serveRequest will reads requests from the codec, calls the RPC callback and
+// writes the response to the given codec.
+// If singleShot is true it will process a single request, otherwise it will handle
+// requests until the codec returns an error when reading a request (in most cases
+// an EOF). It executes requests in parallel when singleShot is false.
+func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
@@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) {
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}
- codec.Close()
+
+ s.codecsMu.Lock()
+ s.codecs.Remove(codec)
+ s.codecsMu.Unlock()
+
+ return
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) {
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
- return
+ return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()
+ // test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
-
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
- break
+ return nil
}
+ // check if server is ordered to shutdown and return an error
+ // telling the client that his request failed.
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
@@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) {
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
- break
+ return nil
}
- if batch {
+ if singleShot && batch {
+ s.execBatch(ctx, codec, reqs)
+ return nil
+ } else if singleShot && !batch {
+ s.exec(ctx, codec, reqs[0])
+ return nil
+ } else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else {
go s.exec(ctx, codec, reqs[0])
}
}
+
+ return nil
+}
+
+// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
+// response back using the given codec. It will block until the codec is closed or the server is
+// stopped. In either case the codec is closed.
+//
+// This server will:
+// 1. allow for asynchronous and parallel request execution
+// 2. supports notifications (pub/sub)
+// 3. supports request batches
+func (s *Server) ServeCodec(codec ServerCodec) {
+ defer codec.Close()
+ s.serveRequest(codec, false)
+}
+
+// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
+// close the codec unless a non-recoverable error has occurred.
+func (s *Server) ServeSingleRequest(codec ServerCodec) {
+ s.serveRequest(codec, true)
}
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,