qemu.aqmp.protocol module¶
Generic Asynchronous Message-based Protocol Support
This module provides a generic framework for sending and receiving messages over an asyncio stream. AsyncProtocol is an abstract class that implements the core mechanisms of a simple send/receive protocol, and is designed to be extended.
In this package, it is used as the implementation for the QMPClient class.
- class qemu.aqmp.protocol.Runstate(value)[source]¶
Bases: enum.Enum
Protocol session runstate.
- IDLE = 0¶
Fully quiesced and disconnected.
- CONNECTING = 1¶
In the process of connecting or establishing a session.
- RUNNING = 2¶
Fully connected and active session.
- DISCONNECTING = 3¶
In the process of disconnecting. Runstate may be returned to IDLE by calling disconnect().
- exception qemu.aqmp.protocol.ConnectError(error_message: str, exc: Exception)[source]¶
Bases: qemu.aqmp.error.AQMPError
Raised when the initial connection process has failed.
This Exception always wraps a “root cause” exception that can be interrogated for additional information.
- Parameters
error_message – Human-readable string describing the error.
exc – The root-cause exception.
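As a rough illustration of interrogating the wrapped root cause, the sketch below uses a stand-in `ConnectError` class with the documented `(error_message, exc)` signature. The attribute names `error_message` and `exc`, and the `fake_connect()` helper, are assumptions made for this example, not confirmed details of the real class.

```python
class ConnectError(Exception):
    """Stand-in with the documented (error_message, exc) signature."""

    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        self.error_message = error_message
        self.exc = exc  # the root-cause exception (assumed attribute name)


def fake_connect() -> None:
    """Hypothetical connect step that always fails, for illustration only."""
    try:
        raise ConnectionRefusedError("connection refused")
    except OSError as err:
        # Wrap the low-level failure, keeping it reachable for callers.
        raise ConnectError("Failed to establish connection", err) from err


try:
    fake_connect()
except ConnectError as err:
    root_cause = err.exc
    summary = f"{err.error_message}: {root_cause}"
```

A caller can then branch on the type of `root_cause` (for example, `OSError` versus `EOFError`) to decide whether a retry makes sense.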
- exception qemu.aqmp.protocol.StateError(error_message: str, state: qemu.aqmp.protocol.Runstate, required: qemu.aqmp.protocol.Runstate)[source]¶
Bases: qemu.aqmp.error.AQMPError
An API command (connect, execute, etc.) was issued at an inappropriate time.
This error is raised when a command is issued while the protocol is in a Runstate that does not permit it; for example, calling connect() when the Runstate is not IDLE.
- qemu.aqmp.protocol.require(required_state: qemu.aqmp.protocol.Runstate) → Callable[[qemu.aqmp.protocol.F], qemu.aqmp.protocol.F][source]¶
Decorator: protect a method so it can only be run in a certain Runstate.
- Parameters
required_state – The Runstate required to invoke this method.
- Raises
StateError – When the required Runstate is not met.
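A decorator of this kind can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the `Runstate` and `StateError` classes are stand-ins mirroring the documented signatures, the `Client` class is hypothetical, and the sketch assumes the instance exposes its current state as `self.runstate`.

```python
import enum
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


class Runstate(enum.Enum):
    """Stand-in mirroring the documented enum values."""
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateError(Exception):
    """Stand-in with the documented (error_message, state, required) signature."""

    def __init__(self, error_message: str, state: Runstate, required: Runstate):
        super().__init__(error_message)
        self.state = state
        self.required = required


def require(required_state: Runstate) -> Callable[[F], F]:
    """Allow the decorated method to run only in `required_state`."""
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            # Assumption: the instance stores its state in `self.runstate`.
            if self.runstate != required_state:
                raise StateError(
                    f"{func.__name__}() requires {required_state.name}, "
                    f"but the runstate is {self.runstate.name}",
                    self.runstate,
                    required_state,
                )
            return func(self, *args, **kwargs)
        return cast(F, wrapper)
    return decorator


class Client:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self) -> None:
        self.runstate = Runstate.IDLE

    @require(Runstate.IDLE)
    def connect(self) -> str:
        self.runstate = Runstate.RUNNING
        return "connected"
```

Calling `Client().connect()` twice on the same instance would succeed the first time and raise `StateError` the second time, because the runstate is no longer `IDLE`.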
- class qemu.aqmp.protocol.AsyncProtocol(name: Optional[str] = None)[source]¶
Bases: Generic[qemu.aqmp.protocol.T]
AsyncProtocol implements a generic async message-based protocol.
This protocol assumes the basic unit of information transfer between client and server is a “message”, the details of which are left up to the implementation. It assumes the sending and receiving of these messages is full-duplex and not necessarily correlated; i.e. it supports asynchronous inbound messages.
It is designed to be extended by a specific protocol which provides the implementations for how to read and send messages. These must be defined in _do_recv() and _do_send(), respectively.
Other callbacks have a default implementation, but are intended to be either extended or overridden:
_establish_session: The base implementation starts the reader/writer tasks. A protocol implementation can override this call: actions that must happen before the reader/writer tasks start go before the super() call, and actions that must happen afterwards go after it.
_on_message: Actions to be performed when a message is received.
_cb_outbound: Logging/Filtering hook for all outbound messages.
_cb_inbound: Logging/Filtering hook for all inbound messages. This hook runs before _on_message().
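The layering of these hooks can be sketched with a self-contained stand-in. `MiniProtocol` below is hypothetical and models only the documented call order (`_recv()` applies `_cb_inbound()` to the result of `_do_recv()`, and `_send()` passes the result of `_cb_outbound()` to `_do_send()`); it has no connection handling or task loops and is not the real AsyncProtocol. `LineProtocol` and its in-memory message lists are likewise assumptions for illustration.

```python
import asyncio
from typing import Generic, List, TypeVar

T = TypeVar("T")


class MiniProtocol(Generic[T]):
    """Hypothetical stand-in modeling only the hook layering."""

    async def _do_recv(self) -> T:
        raise NotImplementedError  # abstract: read one message

    def _do_send(self, msg: T) -> None:
        raise NotImplementedError  # abstract: write one message

    def _cb_inbound(self, msg: T) -> T:
        return msg  # default filter: pass through unchanged

    def _cb_outbound(self, msg: T) -> T:
        return msg  # default filter: pass through unchanged

    async def _on_message(self, msg: T) -> None:
        pass  # default endpoint: ignore the message

    async def _recv(self) -> T:
        return self._cb_inbound(await self._do_recv())

    async def _send(self, msg: T) -> None:
        self._do_send(self._cb_outbound(msg))


class LineProtocol(MiniProtocol[str]):
    """Toy subclass that reads from and writes to in-memory lists."""

    def __init__(self, incoming: List[str]) -> None:
        self._incoming = list(incoming)
        self.sent: List[str] = []
        self.handled: List[str] = []

    async def _do_recv(self) -> str:
        if not self._incoming:
            raise EOFError("no more messages")
        return self._incoming.pop(0)

    def _do_send(self, msg: str) -> None:
        self.sent.append(msg)

    def _cb_inbound(self, msg: str) -> str:
        return msg.strip()  # filter runs before _on_message()

    async def _on_message(self, msg: str) -> None:
        self.handled.append(msg)


async def demo() -> LineProtocol:
    proto = LineProtocol(["hello\n", "world\n"])
    await proto._send("ping")
    for _ in range(2):
        await proto._on_message(await proto._recv())
    return proto


proto = asyncio.run(demo())
```

In the real class the reader and writer tasks drive these calls; the explicit loop in `demo()` only stands in for them.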
- Parameters
name – Name used for logging messages, if any. By default, messages will log to ‘qemu.aqmp.protocol’, but each individual connection can be given its own logger by giving it a name; messages will then log to ‘qemu.aqmp.protocol.${name}’.
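The naming scheme described for `name` corresponds to child loggers in the standard `logging` module. The helper `logger_for()` below is hypothetical and shows only how such names could be derived; it makes no claim about how the class builds its logger internally.

```python
import logging
from typing import Optional

base_logger = logging.getLogger("qemu.aqmp.protocol")


def logger_for(name: Optional[str] = None) -> logging.Logger:
    """Hypothetical helper: per-connection logger, or the shared one."""
    return base_logger.getChild(name) if name else base_logger


vm_logger = logger_for("vm1")      # logs to 'qemu.aqmp.protocol.vm1'
shared_logger = logger_for()       # logs to 'qemu.aqmp.protocol'
```

Because child loggers inherit from their parent, configuring a handler or level on 'qemu.aqmp.protocol' also affects every named connection unless it is overridden.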
- _limit = 65536¶
- logger = <Logger qemu.aqmp.protocol (WARNING)>¶
Logger object for debugging messages from this connection.
- _dc_task: Optional[_asyncio.Future]¶
Disconnect task. The disconnect implementation runs in a task so that asynchronous disconnects (initiated by the reader/writer) are allowed to wait for the reader/writers to exit.
- async runstate_changed() → qemu.aqmp.protocol.Runstate[source]¶
Wait for the
runstate
to change, then return that runstate.
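One plausible way to build such a wait-for-change primitive is an `asyncio.Event` that is set and immediately cleared on every state change. The sketch below assumes that design; `StateTracker` and its `set_state()` method are hypothetical stand-ins, and `Runstate` is redefined locally to keep the example self-contained.

```python
import asyncio
import enum


class Runstate(enum.Enum):
    """Stand-in mirroring the documented enum values."""
    IDLE = 0
    CONNECTING = 1
    RUNNING = 2
    DISCONNECTING = 3


class StateTracker:
    """Hypothetical stand-in for the runstate bookkeeping."""

    def __init__(self) -> None:
        self.runstate = Runstate.IDLE
        self._event = asyncio.Event()

    def set_state(self, state: Runstate) -> None:
        self.runstate = state
        self._event.set()    # wake every task currently waiting
        self._event.clear()  # re-arm so later waiters block again

    async def runstate_changed(self) -> Runstate:
        await self._event.wait()
        return self.runstate


async def demo() -> Runstate:
    tracker = StateTracker()
    waiter = asyncio.ensure_future(tracker.runstate_changed())
    await asyncio.sleep(0)  # let the waiter start waiting
    tracker.set_state(Runstate.CONNECTING)
    return await waiter


observed = asyncio.run(demo())
```

A waiter sees the state as of the moment it resumes, so if two changes happen back to back it may observe only the later one.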
- accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None[source]¶
Accept a connection and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters
address – Address to listen to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
StateError – When the Runstate is not IDLE.
ConnectError – If a connection could not be accepted.
- connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None[source]¶
Connect to the server and begin processing message queues.
If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters
address – Address to connect to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
StateError – When the Runstate is not IDLE.
ConnectError – If a connection cannot be made to the server.
- async disconnect() → None[source]¶
Disconnect and wait for all tasks to fully stop.
If there was an exception that caused the reader/writers to terminate prematurely, it will be raised here.
- Raises
Exception – When the reader or writer terminates unexpectedly.
- property _runstate_event: asyncio.locks.Event¶
- _set_state(state: qemu.aqmp.protocol.Runstate) → None[source]¶
Change the Runstate of the protocol connection.
Signals the runstate_changed event.
- async _new_session(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None, accept: bool = False) → None[source]¶
Establish a new connection and initialize the session.
Connect or accept a new connection, then begin the protocol session machinery. If this call fails, runstate is guaranteed to be set back to IDLE.
- Parameters
address – Address to connect to/listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
accept – Accept a connection instead of connecting when True.
- Raises
ConnectError – When a connection or session cannot be established.
This exception will wrap a more concrete one. In most cases, the wrapped exception will be OSError or EOFError. If a protocol-level failure occurs while establishing a new session, the wrapped error may also be an AQMPError.
- async _establish_connection(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None, accept: bool = False) → None[source]¶
Establish a new connection.
- Parameters
address – Address to connect to/listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
accept – Accept a connection instead of connecting when True.
- async _do_accept(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None[source]¶
Acting as the transport server, accept a single connection.
- Parameters
address – Address to listen on; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
OSError – For stream-related errors.
- async _do_connect(address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) → None[source]¶
Acting as the transport client, initiate a connection to a server.
- Parameters
address – Address to connect to; UNIX socket path or TCP address/port.
ssl – SSL context to use, if any.
- Raises
OSError – For stream-related errors.
- async _establish_session() → None[source]¶
Establish a new session.
Starts the reader/writer tasks; subclasses may perform their own negotiations here. The Runstate will be RUNNING upon successful conclusion.
- _schedule_disconnect() → None[source]¶
Initiate a disconnect; idempotent.
This method is used both in the upper-half as a direct consequence of disconnect(), and in the bottom-half in the case of unhandled exceptions in the reader/writer tasks.
It can be invoked no matter what the runstate is.
- async _wait_disconnect() → None[source]¶
Waits for a previously scheduled disconnect to finish.
This method will gather any bottom-half exceptions and re-raise the one that occurred first, presuming it to be the root cause of any subsequent exceptions. It is intended to be used in the upper half of the call chain.
- Raises
Exception – Arbitrary exception re-raised on behalf of the reader/writer.
- async _bh_disconnect() → None[source]¶
Disconnect and cancel all outstanding tasks.
It is designed to be called from its task context, _dc_task. By running in its own task, it is free to wait on any pending actions that may still need to occur in either the reader or writer tasks.
- async _bh_loop_forever(async_fn: Callable[[], Awaitable[None]], name: str) → None[source]¶
Run one of the bottom-half methods in a loop forever.
If the bottom half ever raises any exception, schedule a disconnect that will terminate the entire loop.
- Parameters
async_fn – The bottom-half method to run in a loop.
name – The name of this task, used for logging.
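The loop-until-failure pattern described here can be sketched as below. `Session`, `schedule_disconnect()`, and the first-error bookkeeping are hypothetical stand-ins rather than the class's actual internals. The sketch also assumes Python 3.8 or later, where `asyncio.CancelledError` is not a subclass of `Exception`, so task cancellation passes through the handler untouched.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List, Optional, Tuple


class Session:
    """Hypothetical stand-in; not the real AsyncProtocol."""

    def __init__(self) -> None:
        self.disconnect_scheduled = False
        self.first_error: Optional[Exception] = None

    def schedule_disconnect(self) -> None:
        # Idempotent: repeated calls leave the flag set and do nothing else.
        self.disconnect_scheduled = True

    async def loop_forever(
        self, async_fn: Callable[[], Awaitable[None]], name: str
    ) -> None:
        try:
            while True:
                await async_fn()
        except Exception as err:
            logging.getLogger(__name__).debug("Task %s failed: %r", name, err)
            # Remember only the earliest failure as the presumed root cause.
            if self.first_error is None:
                self.first_error = err
            self.schedule_disconnect()
            raise


async def demo() -> Tuple[Session, List[str]]:
    session = Session()
    pending = ["a", "b"]
    seen: List[str] = []

    async def recv_one() -> None:
        if not pending:
            raise EOFError("stream closed")
        seen.append(pending.pop(0))
        await asyncio.sleep(0)  # yield to the event loop between messages

    try:
        await session.loop_forever(recv_one, "reader")
    except EOFError:
        pass  # the loop re-raises after scheduling the disconnect
    return session, seen


session, seen = asyncio.run(demo())
```

Re-raising after scheduling the disconnect keeps the original exception attached to the task, so whoever later awaits that task can still retrieve it.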
- async _bh_send_message() → None[source]¶
Wait for an outgoing message, then send it.
Designed to be run in _bh_loop_forever().
- async _bh_recv_message() → None[source]¶
Wait for an incoming message and call _on_message to route it.
Designed to be run in _bh_loop_forever().
- _cb_outbound(msg: qemu.aqmp.protocol.T) → qemu.aqmp.protocol.T[source]¶
Callback: outbound message hook.
This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate outgoing messages. The base implementation does nothing but log the message without any manipulation of the message.
- Parameters
msg – raw outbound message
- Returns
final outbound message
- _cb_inbound(msg: qemu.aqmp.protocol.T) → qemu.aqmp.protocol.T[source]¶
Callback: inbound message hook.
This is intended for subclasses to be able to add arbitrary hooks to filter or manipulate incoming messages. The base implementation does nothing but log the message without any manipulation of the message.
This method does not “handle” incoming messages; it is a filter. The actual “endpoint” for incoming messages is _on_message().
- Parameters
msg – raw inbound message
- Returns
processed inbound message
- async _readline() → bytes[source]¶
Wait for a newline from the incoming reader.
This method is provided as a convenience for upper-layer protocols, as many are line-based.
This method may return a sequence of bytes without a trailing newline if EOF occurs, but some bytes were received. In this case, the next call will raise EOFError. It is assumed that the layer 5 protocol will decide if there is anything meaningful to be done with a partial message.
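The partial-line behavior matches what `asyncio.StreamReader.readline()` does at EOF: it returns whatever bytes remain, then an empty bytes object on later calls. The free function `readline()` below is a sketch of a wrapper with the documented semantics; that the method is built this way is an assumption, and the hand-fed reader stands in for a real connection.

```python
import asyncio
from typing import Tuple


async def readline(reader: asyncio.StreamReader) -> bytes:
    """Return one line; raise EOFError once the stream is exhausted."""
    data = await reader.readline()
    if not data:
        raise EOFError("stream ended")
    return data


async def demo() -> Tuple[bytes, bytes, str]:
    reader = asyncio.StreamReader()
    reader.feed_data(b"complete\npartial")  # second line lacks a newline
    reader.feed_eof()

    first = await readline(reader)   # a full line, newline included
    second = await readline(reader)  # leftover bytes, no trailing newline
    try:
        await readline(reader)
        third = "no error"
    except EOFError:
        third = "EOFError"
    return first, second, third


results = asyncio.run(demo())
```

A line-based protocol built on top can treat a result that does not end in a newline as a truncated message and decide whether to discard it.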
- async _do_recv() → qemu.aqmp.protocol.T[source]¶
Abstract: Read from the stream and return a message.
Very low-level; intended to only be called by _recv().
- async _recv() → qemu.aqmp.protocol.T[source]¶
Read an arbitrary protocol message.
Warning
This method is intended primarily for _bh_recv_message() to use in an asynchronous task loop. Using it outside of this loop will “steal” messages from the normal routing mechanism. It is safe to use prior to _establish_session(), but should not be used otherwise.
This method uses _do_recv() to retrieve the raw message, and then transforms it using _cb_inbound().
- Returns
A single (filtered, processed) protocol message.
- _do_send(msg: qemu.aqmp.protocol.T) → None[source]¶
Abstract: Write a message to the stream.
Very low-level; intended to only be called by _send().
- async _send(msg: qemu.aqmp.protocol.T) → None[source]¶
Send an arbitrary protocol message.
This method will transform any outgoing messages according to _cb_outbound().
Warning
Like _recv(), this method is intended to be called by the writer task loop that processes outgoing messages. Calling it directly may circumvent logic implemented by the caller meant to correlate outgoing and incoming messages.
- Raises
OSError – For problems with the underlying stream.
- async _on_message(msg: qemu.aqmp.protocol.T) → None[source]¶
Called to handle the receipt of a new message.
Caution
This is executed from within the reader loop, so be advised that waiting on either the reader or writer task will lead to deadlock. Additionally, any unhandled exceptions will directly cause the loop to halt, so logic here is best kept to a minimum if at all possible.
- Parameters
msg – The incoming message, already logged/filtered.