Server

An RPC server exposes a number of endpoints, each of which contains a set of methods that clients may invoke remotely over a given transport.

To create an RPC server, you supply a transport, a target and a list of endpoints.

A transport can be obtained by calling get_transport():

transport = oslo_messaging.get_transport(conf)

which will load the appropriate transport driver according to the user’s messaging configuration. See get_transport() for more details.

The target supplied when creating an RPC server expresses the topic, server name and - optionally - the exchange to listen on. See Target for more details on these attributes.

Each endpoint object may have a target attribute which may have namespace and version fields set. By default, we use the ‘null namespace’ and version 1.0. Incoming method calls will be dispatched to the first endpoint with the requested method, a matching namespace and a compatible version number.

RPC servers have start(), stop() and wait() methods to begin handling requests, stop handling requests and wait for all in-process requests to complete.

A simple example of an RPC server with multiple endpoints might be:

from oslo_config import cfg
import oslo_messaging
import time

class ServerControlEndpoint(object):

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        if self.server:
            self.server.stop()

class TestEndpoint(object):

    def test(self, ctx, arg):
        return arg

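# Load the transport driver selected by the messaging configuration.
transport = oslo_messaging.get_transport(cfg.CONF)
# Listen on topic 'test' as the server named 'server1'.
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
    # The server does not exist yet, so None is passed here; as written,
    # ServerControlEndpoint.stop() is therefore a no-op.
    ServerControlEndpoint(None),
    TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
                                       executor='blocking')
try:
    server.start()
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

# Stop accepting new requests, then wait for in-flight requests to finish.
server.stop()
server.wait()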

Clients can invoke methods on the server in three ways: by sending the request to a topic, in which case it is delivered to one of the servers listening on that topic; by sending the request to a specific server listening on the topic; or by sending the request to all servers listening on the topic (known as fanout). The mode is chosen via the server and fanout attributes on Target, but the mode used is transparent to the server.
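The three delivery modes can be illustrated with a small in-memory model. This is only a sketch and not oslo.messaging code: ToyBroker, listen() and route() are hypothetical names, and the round-robin choice for plain topic delivery is an assumption made for the example (the text above only says the request goes to one of the listening servers).

```python
import itertools


class ToyBroker:
    """Hypothetical in-memory model of topic routing (not oslo.messaging)."""

    def __init__(self):
        self._servers = {}      # topic -> list of server names
        self._round_robin = {}  # topic -> iterator cycling over those names

    def listen(self, topic, server):
        """Register a server name as a listener on a topic."""
        self._servers.setdefault(topic, []).append(server)
        # Rebuild the cycle so it covers every listener registered so far.
        self._round_robin[topic] = itertools.cycle(list(self._servers[topic]))

    def route(self, topic, server=None, fanout=False):
        """Return the names of the servers that would receive a request."""
        listeners = self._servers.get(topic, [])
        if fanout:
            # Fanout: every server listening on the topic gets a copy.
            return list(listeners)
        if server is not None:
            # Directed: only the named server, if it listens on the topic.
            return [server] if server in listeners else []
        if not listeners:
            return []
        # Topic only: exactly one listener (round-robin is an assumption).
        return [next(self._round_robin[topic])]


broker = ToyBroker()
broker.listen('test', 'server1')
broker.listen('test', 'server2')

first = broker.route('test')
second = broker.route('test')
directed = broker.route('test', server='server2')
everyone = broker.route('test', fanout=True)
```

In each case the receiving server handles the request the same way, which is what "transparent to the server" means here.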

The first parameter to method invocations is always the request context supplied by the client.

Parameters to the method invocation are primitive types, and so are the return values from the methods. By supplying a serializer object, a server can deserialize a request context and arguments from primitive types, and serialize return values to primitive types.
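As an illustration, a serializer that converts datetime values to and from primitives might look like the sketch below. The class is deliberately standalone so that it runs without the library installed; in real use it would subclass oslo_messaging.Serializer. The '__datetime__' marker key and the dict-shaped context are assumptions made for this example.

```python
import datetime


class DateTimeSerializer:
    """Sketch of a serializer; real code would subclass oslo_messaging.Serializer."""

    def serialize_entity(self, ctxt, entity):
        # Turn a datetime into a primitive dict; pass anything else through.
        if isinstance(entity, datetime.datetime):
            return {'__datetime__': entity.isoformat()}
        return entity

    def deserialize_entity(self, ctxt, entity):
        # Rebuild a datetime from the marker dict produced above.
        if isinstance(entity, dict) and '__datetime__' in entity:
            return datetime.datetime.fromisoformat(entity['__datetime__'])
        return entity

    def serialize_context(self, ctxt):
        # The request context is assumed to be a plain dict in this sketch.
        return dict(ctxt)

    def deserialize_context(self, ctxt):
        return dict(ctxt)


serializer = DateTimeSerializer()
when = datetime.datetime(2020, 1, 2, 3, 4, 5)
wire_value = serializer.serialize_entity({}, when)
round_trip = serializer.deserialize_entity({}, wire_value)
```

An instance of such a class would be passed as the serializer argument when the server is constructed.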

oslo_messaging.get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None)

Construct an RPC server.

The executor parameter controls how incoming messages will be received and dispatched. By default, the simplest executor is used: the blocking executor.

If the eventlet executor is used, the threading and time libraries need to be monkeypatched.

Parameters:
  • transport (Transport) – the messaging transport
  • target (Target) – the exchange, topic and server to listen on
  • endpoints (list) – a list of endpoint objects
  • executor (str) – name of a message executor - for example ‘eventlet’, ‘blocking’
  • serializer (Serializer) – an optional entity serializer

class oslo_messaging.RPCDispatcher(target, endpoints, serializer)

A message dispatcher which understands RPC messages.

A MessageHandlingServer is constructed by passing a callable dispatcher which is invoked with context and message dictionaries each time a message is received.

RPCDispatcher is one such dispatcher which understands the format of RPC messages. The dispatcher looks at the namespace, version and method values in the message and matches those against a list of available endpoints.

Endpoints may have a target attribute describing the namespace and version of the methods exposed by that object. All public methods on an endpoint object are remotely invokable by clients.

class oslo_messaging.MessageHandlingServer(transport, dispatcher, executor='blocking')

Server for handling messages.

Connect a transport to a dispatcher that knows how to process the message using an executor that knows how the app wants to create new tasks.

reset()

Reset service.

Called when a service running in daemon mode receives SIGHUP.

start(*args, **kwargs)

Start handling incoming messages.

This method causes the server to begin polling the transport for incoming messages and passing them to the dispatcher. Message processing will continue until the stop() method is called.

The executor controls how the server integrates with the application's I/O handling strategy: it may choose to poll for messages in a new process, thread or co-operatively scheduled coroutine, or simply by registering a callback with an event loop. Similarly, the executor may choose to dispatch messages in a new thread, a coroutine or simply the current thread.

stop(*args, **kwargs)

Stop handling incoming messages.

Once this method returns, no new incoming messages will be handled by the server. However, the server may still be in the process of handling some messages, and underlying driver resources associated with this server are still in use. See wait() for more details.

wait(*args, **kwargs)

Wait for message processing to complete.

After calling stop(), there may still be some existing messages which have not been completely processed. The wait() method blocks until all message processing has completed.

Once it has finished, the underlying driver resources associated with this server are released (for example, network connections that are no longer needed are closed).
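The relationship between start(), stop() and wait() can be seen in a toy model built on the standard library. This is a sketch under stated assumptions, not how MessageHandlingServer is implemented: ToyServer is a hypothetical class, a queue.Queue stands in for the transport, and a single thread plays the role of the executor.

```python
import queue
import threading
import time


class ToyServer:
    """Hypothetical single-threaded model of the start/stop/wait lifecycle."""

    def __init__(self, incoming, dispatcher):
        self._incoming = incoming      # queue.Queue standing in for a transport
        self._dispatcher = dispatcher  # callable taking (ctxt, message)
        self._running = threading.Event()
        self._thread = None

    def start(self):
        # Begin polling for messages in a background thread.
        self._running.set()
        self._thread = threading.Thread(target=self._poll)
        self._thread.start()

    def _poll(self):
        while self._running.is_set():
            try:
                ctxt, message = self._incoming.get(timeout=0.05)
            except queue.Empty:
                continue
            # A message picked up before stop() is processed to completion.
            self._dispatcher(ctxt, message)

    def stop(self):
        # Returns immediately; a message may still be in progress.
        self._running.clear()

    def wait(self):
        # Block until the polling thread, and any in-flight message, is done.
        if self._thread is not None:
            self._thread.join()
            self._thread = None


handled = []
in_progress = threading.Event()


def slow_dispatcher(ctxt, message):
    in_progress.set()
    time.sleep(0.1)  # simulate work that outlasts the call to stop()
    handled.append(message['method'])


incoming = queue.Queue()
incoming.put(({}, {'method': 'ping'}))
server = ToyServer(incoming, slow_dispatcher)
server.start()
in_progress.wait(timeout=5)  # make sure 'ping' is being handled
server.stop()                # 'ping' may still be running at this point
server.wait()                # returns only after 'ping' has completed
```

Calling stop() without wait() would leave the in-flight message unaccounted for, which is why the two calls are normally made as a pair.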

oslo_messaging.expected_exceptions(*exceptions)

Decorator for RPC endpoint methods that raise expected exceptions.

Marking an endpoint method with this decorator allows the declaration of expected exceptions that the RPC server should not consider fatal, and not log as if they were generated in a real error scenario.

Note that this will cause listed exceptions to be wrapped in an ExpectedException, which is used internally by the RPC server. The RPC client will see the original exception type.

exception oslo_messaging.ExpectedException

Encapsulates an expected exception raised by an RPC endpoint.

Merely instantiating this exception records the current exception information, which will be passed back to the RPC client without exceptional logging.
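The interplay between the decorator and the wrapper exception can be modelled as follows. This is a simplified sketch rather than the library's source: toy_expected_exceptions and ToyExpectedException are hypothetical stand-ins for expected_exceptions and ExpectedException, and the use of isinstance() to match exceptions is an assumption of the sketch.

```python
import functools
import sys


class ToyExpectedException(Exception):
    """Stand-in wrapper that records the exception currently being handled."""

    def __init__(self):
        super().__init__()
        # Instantiating inside an except block captures the active exception.
        self.exc_info = sys.exc_info()


def toy_expected_exceptions(*exceptions):
    """Stand-in decorator that wraps the listed exception types."""

    def outer(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                if isinstance(exc, exceptions):
                    # Expected: wrap it so the server can skip error logging.
                    raise ToyExpectedException()
                # Anything else propagates unchanged.
                raise
        return inner
    return outer


class InstanceEndpoint:
    """Hypothetical endpoint for which a missing instance is a normal outcome."""

    @toy_expected_exceptions(KeyError)
    def get_instance(self, ctxt, instance_id):
        raise KeyError(instance_id)


try:
    InstanceEndpoint().get_instance({}, 'abc')
except ToyExpectedException as wrapped:
    original_type, original_value, _traceback = wrapped.exc_info
```

The recorded exc_info is what allows the original exception type to be reported back to the client.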

oslo_messaging.get_local_context(*args, **kwargs)

Retrieve the RPC endpoint request context for the current thread.

This method allows any code running in the context of a dispatched RPC endpoint method to retrieve the context for this request.

This is commonly used for logging so that, for example, you can include the request ID, user and tenant in every message logged from an RPC endpoint method.

Returns: the context for the request dispatched in the current thread
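A per-thread context of this kind can be modelled with threading.local. The sketch below is not the library's implementation: the toy_* helpers, format_log() and the dict-shaped context with a 'request_id' key are all assumptions made for illustration.

```python
import threading

_store = threading.local()


def toy_set_local_context(ctxt):
    """Remember the context of the request handled by the current thread."""
    _store.ctxt = ctxt


def toy_get_local_context():
    """Return the current thread's request context, or None if unset."""
    return getattr(_store, 'ctxt', None)


def toy_clear_local_context():
    if hasattr(_store, 'ctxt'):
        del _store.ctxt


def format_log(message):
    # Code deep inside an endpoint method can look up the context itself
    # rather than having it passed down through every call.
    ctxt = toy_get_local_context() or {}
    return '[req %s] %s' % (ctxt.get('request_id', '-'), message)


def handle(ctxt, results):
    # Model of a dispatcher: set the context, run the method, then clear it.
    toy_set_local_context(ctxt)
    try:
        results[ctxt['request_id']] = format_log('handling request')
    finally:
        toy_clear_local_context()


results = {}
threads = [
    threading.Thread(target=handle, args=({'request_id': rid}, results))
    for rid in ('req-1', 'req-2')
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Each thread sees only the context of the request it is handling, so concurrent requests do not leak identifiers into each other's log lines.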
