[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PULL 09/32] python/aqmp: add AsyncProtocol.accept() method
From: John Snow
Subject: [PULL 09/32] python/aqmp: add AsyncProtocol.accept() method
Date: Mon, 27 Sep 2021 15:24:50 -0400
accept() is a little messier than connect(), because asyncio's server API
wasn't designed to accept *precisely one* connection. Such is life.
Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-id: 20210915162955.333025-10-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
---
python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
1 file changed, 85 insertions(+), 4 deletions(-)
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 1dfd12895dc..62c26ede5a4 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -243,6 +243,24 @@ async def runstate_changed(self) -> Runstate:
await self._runstate_event.wait()
return self.runstate
+ @upper_half
+ @require(Runstate.IDLE)
+ async def accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Accept a connection and begin processing message queues.
+
+ If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+ :param address:
+ Address to listen to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise StateError: When the `Runstate` is not `IDLE`.
+ :raise ConnectError: If a connection could not be accepted.
+ """
+ await self._new_session(address, ssl, accept=True)
+
@upper_half
@require(Runstate.IDLE)
async def connect(self, address: Union[str, Tuple[str, int]],
@@ -308,7 +326,8 @@ def _set_state(self, state: Runstate) -> None:
@upper_half
async def _new_session(self,
address: Union[str, Tuple[str, int]],
- ssl: Optional[SSLContext] = None) -> None:
+ ssl: Optional[SSLContext] = None,
+ accept: bool = False) -> None:
"""
Establish a new connection and initialize the session.
@@ -317,9 +336,10 @@ async def _new_session(self,
to be set back to `IDLE`.
:param address:
- Address to connect to;
+ Address to connect to/listen on;
UNIX socket path or TCP address/port.
:param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
:raise ConnectError:
When a connection or session cannot be established.
@@ -333,7 +353,7 @@ async def _new_session(self,
try:
phase = "connection"
- await self._establish_connection(address, ssl)
+ await self._establish_connection(address, ssl, accept)
phase = "session"
await self._establish_session()
@@ -367,6 +387,7 @@ async def _establish_connection(
self,
address: Union[str, Tuple[str, int]],
ssl: Optional[SSLContext] = None,
+ accept: bool = False
) -> None:
"""
Establish a new connection.
@@ -375,6 +396,7 @@ async def _establish_connection(
Address to connect to/listen on;
UNIX socket path or TCP address/port.
:param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
"""
assert self.runstate == Runstate.IDLE
self._set_state(Runstate.CONNECTING)
@@ -384,7 +406,66 @@ async def _establish_connection(
# otherwise yield.
await asyncio.sleep(0)
- await self._do_connect(address, ssl)
+ if accept:
+ await self._do_accept(address, ssl)
+ else:
+ await self._do_connect(address, ssl)
+
+ @upper_half
+ async def _do_accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Acting as the transport server, accept a single connection.
+
+ :param address:
+ Address to listen on; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise OSError: For stream-related errors.
+ """
+ self.logger.debug("Awaiting connection on %s ...", address)
+ connected = asyncio.Event()
+ server: Optional[asyncio.AbstractServer] = None
+
+ async def _client_connected_cb(reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter) -> None:
+ """Used to accept a single incoming connection, see below."""
+ nonlocal server
+ nonlocal connected
+
+ # A connection has been accepted; stop listening for new ones.
+ assert server is not None
+ server.close()
+ await server.wait_closed()
+ server = None
+
+ # Register this client as being connected
+ self._reader, self._writer = (reader, writer)
+
+ # Signal back: We've accepted a client!
+ connected.set()
+
+ if isinstance(address, tuple):
+ coro = asyncio.start_server(
+ _client_connected_cb,
+ host=address[0],
+ port=address[1],
+ ssl=ssl,
+ backlog=1,
+ )
+ else:
+ coro = asyncio.start_unix_server(
+ _client_connected_cb,
+ path=address,
+ ssl=ssl,
+ backlog=1,
+ )
+
+ server = await coro # Starts listening
+ await connected.wait() # Waits for the callback to fire (and finish)
+ assert server is None
+
+ self.logger.debug("Connection accepted.")
@upper_half
async def _do_connect(self, address: Union[str, Tuple[str, int]],
--
2.31.1
- [PULL 14/32] python/aqmp: add well-known QMP object models, (continued)
- [PULL 14/32] python/aqmp: add well-known QMP object models, John Snow, 2021/09/27
- [PULL 15/32] python/aqmp: add QMP event support, John Snow, 2021/09/27
- [PULL 16/32] python/pylint: disable too-many-function-args, John Snow, 2021/09/27
- [PULL 17/32] python/aqmp: add QMP protocol support, John Snow, 2021/09/27
- [PULL 18/32] python/pylint: disable no-member check, John Snow, 2021/09/27
- [PULL 19/32] python/aqmp: Add message routing to QMP protocol, John Snow, 2021/09/27
- [PULL 21/32] python/aqmp: add _raw() execution interface, John Snow, 2021/09/27
- [PULL 20/32] python/aqmp: add execute() interfaces, John Snow, 2021/09/27
- [PULL 22/32] python/aqmp: add asyncio_run compatibility wrapper, John Snow, 2021/09/27
- [PULL 23/32] python/aqmp: add scary message, John Snow, 2021/09/27
- [PULL 09/32] python/aqmp: add AsyncProtocol.accept() method,
John Snow <=
- [PULL 24/32] python: bump avocado to v90.0, John Snow, 2021/09/27
- [PULL 25/32] python/aqmp: add AsyncProtocol unit tests, John Snow, 2021/09/27
- [PULL 26/32] python/aqmp: add LineProtocol tests, John Snow, 2021/09/27
- [PULL 27/32] python/aqmp: Add Coverage.py support, John Snow, 2021/09/27
- [PULL 28/32] python: Add dependencies for AQMP TUI, John Snow, 2021/09/27
- [PULL 29/32] python/aqmp-tui: Add AQMP TUI, John Snow, 2021/09/27
- [PULL 30/32] python: Add entry point for aqmp-tui, John Snow, 2021/09/27
- [PULL 31/32] python: add optional pygments dependency, John Snow, 2021/09/27
- [PULL 32/32] python/aqmp-tui: Add syntax highlighting, John Snow, 2021/09/27
- Re: [PULL 00/32] Python patches, Peter Maydell, 2021/09/28