qemu-img: add support for rate limit in qemu-img commit
[qemu.git] / python / qemu / console_socket.py
1 """
2 QEMU Console Socket Module:
3
4 This python module implements a ConsoleSocket object,
5 which can drain a socket and optionally dump the bytes to file.
6 """
7 # Copyright 2020 Linaro
8 #
9 # Authors:
10 # Robert Foley <robert.foley@linaro.org>
11 #
12 # This code is licensed under the GPL version 2 or later. See
13 # the COPYING file in the top-level directory.
14 #
15
16 from collections import deque
17 import socket
18 import threading
19 import time
20 from typing import Deque, Optional
21
22
23 class ConsoleSocket(socket.socket):
24 """
25 ConsoleSocket represents a socket attached to a char device.
26
27 Optionally (if drain==True), drains the socket and places the bytes
28 into an in memory buffer for later processing.
29
30 Optionally a file path can be passed in and we will also
31 dump the characters to this file for debugging purposes.
32 """
33 def __init__(self, address: str, file: Optional[str] = None,
34 drain: bool = False):
35 self._recv_timeout_sec = 300.0
36 self._sleep_time = 0.5
37 self._buffer: Deque[int] = deque()
38 socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
39 self.connect(address)
40 self._logfile = None
41 if file:
42 self._logfile = open(file, "bw")
43 self._open = True
44 self._drain_thread = None
45 if drain:
46 self._drain_thread = self._thread_start()
47
48 def _drain_fn(self) -> None:
49 """Drains the socket and runs while the socket is open."""
50 while self._open:
51 try:
52 self._drain_socket()
53 except socket.timeout:
54 # The socket is expected to timeout since we set a
55 # short timeout to allow the thread to exit when
56 # self._open is set to False.
57 time.sleep(self._sleep_time)
58
59 def _thread_start(self) -> threading.Thread:
60 """Kick off a thread to drain the socket."""
61 # Configure socket to not block and timeout.
62 # This allows our drain thread to not block
63 # on recieve and exit smoothly.
64 socket.socket.setblocking(self, False)
65 socket.socket.settimeout(self, 1)
66 drain_thread = threading.Thread(target=self._drain_fn)
67 drain_thread.daemon = True
68 drain_thread.start()
69 return drain_thread
70
71 def close(self) -> None:
72 """Close the base object and wait for the thread to terminate"""
73 if self._open:
74 self._open = False
75 if self._drain_thread is not None:
76 thread, self._drain_thread = self._drain_thread, None
77 thread.join()
78 socket.socket.close(self)
79 if self._logfile:
80 self._logfile.close()
81 self._logfile = None
82
83 def _drain_socket(self) -> None:
84 """process arriving characters into in memory _buffer"""
85 data = socket.socket.recv(self, 1)
86 if self._logfile:
87 self._logfile.write(data)
88 self._logfile.flush()
89 self._buffer.extend(data)
90
91 def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
92 """Return chars from in memory buffer.
93 Maintains the same API as socket.socket.recv.
94 """
95 if self._drain_thread is None:
96 # Not buffering the socket, pass thru to socket.
97 return socket.socket.recv(self, bufsize, flags)
98 assert not flags, "Cannot pass flags to recv() in drained mode"
99 start_time = time.time()
100 while len(self._buffer) < bufsize:
101 time.sleep(self._sleep_time)
102 elapsed_sec = time.time() - start_time
103 if elapsed_sec > self._recv_timeout_sec:
104 raise socket.timeout
105 return bytes((self._buffer.popleft() for i in range(bufsize)))
106
107 def setblocking(self, value: bool) -> None:
108 """When not draining we pass thru to the socket,
109 since when draining we control socket blocking.
110 """
111 if self._drain_thread is None:
112 socket.socket.setblocking(self, value)
113
114 def settimeout(self, value: Optional[float]) -> None:
115 """When not draining we pass thru to the socket,
116 since when draining we control the timeout.
117 """
118 if value is not None:
119 self._recv_timeout_sec = value
120 if self._drain_thread is None:
121 socket.socket.settimeout(self, value)