iotests: Allow supported and unsupported formats at the same time
[qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
35
36 from contextlib import contextmanager
37
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
42
# Logger for diagnostics emitted by the iotests module itself.  The
# NullHandler is attached to this logger directly.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Logger for everything that is meant to end up in a test's diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')

# Enable the faulthandler module for this process.
faulthandler.enable()
52
# Tool command lines and test parameters, all taken from the environment.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
#
# The option strings are split on runs of whitespace (str.split() without
# an argument), so an unset variable or doubled spaces never produce an
# empty-string argument in the resulting command line.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

# Test parameters.  Those without a default are None when the variable
# is not set.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults that are added automatically when a luks image is used without
# an explicit key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the message that is printed
                 when the process was terminated by a signal.
    :param args: The complete command line, program name first.
    :param connect_stderr: If True, the tool's stderr is merged into the
                           returned output.  If False, stderr is not
                           redirected.
    :return: Tuple of (captured output, return code).
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args,
                          stdout=subprocess.PIPE,
                          stderr=stderr,
                          universal_newlines=True) as subp:
        output = subp.communicate()[0]
        returncode = subp.returncode
    if returncode < 0:
        # @args already is the full command line, so it is reported as-is
        # (it must not be prefixed with the qemu-img command line).
        cmd = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-returncode}: {cmd}\n')
    return (output, returncode)
108
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img with the given arguments.

    The arguments are appended to the configured qemu-img command line.
    Returns the result of qemu_tool_pipe_and_status(), i.e. a tuple of
    (output, exit code).
    """
    return qemu_tool_pipe_and_status('qemu-img', qemu_img_args + list(args))
115
def qemu_img(*args: str) -> int:
    """Run qemu-img with the given arguments and return only its exit code"""
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
119
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with all dicts replaced by key-sorted
    OrderedDicts.

    With conv_keys set, underscores in the keys of a dict passed directly
    as @qmsg are replaced by dashes; the values of that dict are processed
    with conv_keys=False.  Elements of a list are processed with the
    default value of conv_keys.  Anything that is neither a list nor a
    dict is returned unchanged.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key, value in sorted(qmsg.items()):
        new_key = key.replace('_', '-') if conv_keys else key
        ordered[new_key] = ordered_qmp(value, conv_keys=False)
    return ordered
132
def qemu_img_create(*args):
    """
    Run "qemu-img create" with the given arguments and return its exit code.

    When the arguments select the luks format ('-f luks') and do not
    mention a key-secret, the default key-secret option and the matching
    secret object are added to the command line.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value of -o is a single comma-separated string, so
                # the default option is appended to that string.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
151
def qemu_img_measure(*args):
    """Run "qemu-img measure --output json" and return the parsed JSON"""
    json_output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(json_output)
154
def qemu_img_check(*args):
    """Run "qemu-img check --output json" and return the parsed JSON"""
    json_output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(json_output)
157
def qemu_img_verbose(*args):
    """
    Run qemu-img without suppressing its output and return the exit code.

    A message is written to stderr if qemu-img was terminated by a signal.
    """
    full_cmd = qemu_img_args + list(args)
    exitcode = subprocess.call(full_cmd)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_cmd)))
    return exitcode
165
def qemu_img_pipe(*args: str) -> str:
    """Run qemu-img with the given arguments and return only its output"""
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
169
def qemu_img_log(*args):
    """
    Run qemu-img, log its output through filter_testfiles and return the
    unfiltered output.
    """
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
174
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Run "qemu-img info" on @filename and log the filtered output.

    :param filename: Image passed to qemu-img info.
    :param filter_path: Path handed to filter_img_info(); @filename is
                        used when this is not given.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments inserted before @filename.
    """
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info'] + format_args
    cmd.extend(extra_args)
    cmd.append(filename)

    info = qemu_img_pipe(*cmd)
    log(filter_img_info(info, filter_path if filter_path else filename))
188
def qemu_io(*args):
    """
    Run qemu-io and return its output (stderr is merged into stdout).

    A message is written to stderr if qemu-io was terminated by a signal.
    """
    cmd = qemu_io_args + list(args)
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    stdout_data, _ = proc.communicate()
    if proc.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-proc.returncode, ' '.join(cmd)))
    return stdout_data
200
def qemu_io_log(*args):
    """
    Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output.
    """
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
205
def qemu_io_silent(*args):
    """
    Run qemu-io and return the exit code, suppressing stdout.

    A message is written to stderr if qemu-io was terminated by a signal.
    """
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards the output without opening (and leaking)
    # a file object for /dev/null in this process.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
214
def qemu_io_silent_check(*args):
    """
    Run qemu-io with stdout and stderr suppressed and return True if the
    subprocess returned 0.
    """
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards the output without opening (and leaking)
    # a file object for /dev/null in this process.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
221
def get_virtio_scsi_device():
    """
    Return the virtio-scsi device name for the default machine:
    'virtio-scsi-ccw' for s390-ccw-virtio, 'virtio-scsi-pci' otherwise.
    """
    is_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_ccw else 'virtio-scsi-pci'
226
class QemuIoInteractive:
    """
    A qemu-io subprocess that is driven interactively through its stdin
    and stdout, one command at a time.
    """

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with the given arguments and wait for its first
        prompt.

        :raise ValueError: If the first output is not the prompt; the
                           exception carries everything qemu-io printed.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to exit"""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read qemu-io's output up to the next prompt and return it without
        the prompt.

        :raise AssertionError: If the output ends before a prompt is seen.
        """
        pattern = self._PROMPT
        n = len(pattern)
        chars = []
        # The last (up to) n characters read.  Comparing this sliding
        # window against the prompt also finds a prompt that directly
        # follows a partial match (for example the output "qqemu-io> ").
        tail = ''
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF (raised explicitly so that the check is
            # not removed when Python runs with -O)
            if c == '':
                raise AssertionError('unexpected EOF from qemu-io')
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Run a single qemu-io command and return its output.

        @cmd must not contain a newline and must not be the quit command.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
270
271
def qemu_nbd(*args):
    """Run qemu-nbd in daemon mode and return the parent's exit code"""
    cmd = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(cmd)
275
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    """
    Run qemu-nbd in daemon mode (--fork).

    Returns a tuple of the parent's exit code and its output; the output
    is replaced by an empty string when the exit code is 0.
    """
    cmd = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmd,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
283
def qemu_nbd_list_log(*args: str) -> str:
    """
    Run "qemu-nbd -L" to list remote exports.

    The output is logged through filter_testfiles and filter_nbd_exports;
    the unfiltered output is returned.
    """
    cmd = [qemu_nbd_prog, '-L'] + list(args)
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmd)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
290
@contextmanager
def qemu_nbd_popen(*args):
    """
    Context manager running qemu-nbd within the context.

    qemu-nbd is started with --persistent and a pid file.  The context is
    entered once the pid file exists; a RuntimeError is raised if qemu-nbd
    exits before that.  The server is killed when the context is left.
    """
    pid_file = file_path("pid")
    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll for the pid file, bailing out if the server dies first
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
315
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    """
    Return True if "qemu-img compare" exits with 0 for the two images.

    @fmt1 and @fmt2 are the formats of @img1 and @img2, respectively.
    """
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
320
def create_image(name, size):
    """
    Create a fully-allocated raw image with sector markers.

    The file @name is written in whole 512-byte sectors until at least
    @size bytes have been written.  Each sector starts and ends with its
    own sector number as a big-endian 32-bit integer; the 504 bytes in
    between are zero.
    """
    # The with-block makes sure the file is closed even if a write fails
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
330
def image_size(img):
    """
    Return the 'virtual-size' field reported by "qemu-img info" for @img,
    which is opened with the current image format.
    """
    info = json.loads(qemu_img_pipe('info', '--output=json',
                                    '-f', imgfmt, img))
    return info['virtual-size']
335
def is_str(val):
    """Return True if @val is a str"""
    result = isinstance(val, str)
    return result
338
# The test directory is matched literally: the path is escaped so that
# characters such as '.' or '+' in it are not interpreted as regular
# expression syntax.
test_dir_re = re.compile(re.escape(str(test_dir)))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in @msg by TEST_DIR"""
    return test_dir_re.sub("TEST_DIR", msg)
342
# Matches the carriage returns that filter_win32() removes
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return @msg with all carriage return characters removed"""
    without_cr = win32_re.sub("", msg)
    return without_cr
346
# Matches the operation count, duration and throughput figures that
# filter_qemu_io() replaces by placeholders
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Filter qemu-io output: apply filter_win32() and replace the
    statistics matched by qemu_io_re with fixed placeholders.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
354
# Matches "chown" followed by a numeric uid:gid pair
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace the numeric ids of chown commands in @msg by UID:GID"""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
358
def filter_qmp_event(event):
    """
    Filter a QMP event dict

    Returns a copy of @event in which the 'seconds' and 'microseconds'
    fields of the timestamp (if there is one) are replaced by the
    placeholders 'SECS' and 'USECS'.  @event itself is not modified.
    """
    event = dict(event)
    if 'timestamp' in event:
        # dict() above is only a shallow copy; copy the nested timestamp
        # as well so that the caller's event keeps its real timestamp.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
366
def filter_qmp(qmsg, filter_fn):
    """
    Filter the values of a QMP object in place and return the object.

    @qmsg is a list or a dict.  Nested lists and dicts are filtered
    recursively; every other value is replaced by the result of
    filter_fn(key, value), where the key is the dict key or the list
    index of the value.
    """
    # Iterate through either lists or dicts
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
382
def filter_testfiles(msg):
    """
    Replace the "<pid>-" file name prefixes in the test and socket
    directories by TEST_DIR/PID- and SOCK_DIR/PID-, respectively.
    """
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
387
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to all string values of a QMP object"""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
394
def filter_generated_node_ids(msg):
    """Replace every "#block<number>" in @msg by NODE_NAME"""
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
397
def filter_img_info(output, filename):
    """
    Filter the output of "qemu-img info".

    Lines containing 'disk size' or 'actual-size' are dropped.  In the
    remaining lines @filename becomes TEST_IMG, test file prefixes are
    filtered, the image format becomes IMGFMT, and the values following
    'iters: ', 'uuid: ' and 'cid: ' are replaced by placeholders.
    """
    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
        filtered.append(line)
    return '\n'.join(filtered)
413
def filter_imgfmt(msg):
    """Replace every occurrence of the image format in @msg by IMGFMT"""
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
416
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to all string values of a QMP object"""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
423
def filter_nbd_exports(output: str) -> str:
    """Replace the values of "min/opt/max block: <number>" by XXX"""
    block_size_re = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size_re, r'\1: XXX', output)
426
427
# The kinds of message log() accepts: a QMP dict, a list, or a string
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The message is passed through all @filters, in order, before it is
    written to the diff output logger.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=needs_sorting, indent=indent))
445
class Timeout:
    """
    Context manager that arms ITIMER_REAL for @seconds on entry and
    disarms it on exit.  The SIGALRM handler installed on entry raises an
    Exception carrying @errmsg.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the body are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """SIGALRM handler: report the timeout by raising an Exception"""
        raise Exception(self.errmsg)
459
def file_pattern(name):
    """Return @name prefixed with the current process ID and a dash"""
    pid = os.getpid()
    return f"{pid}-{name}"
462
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        # A single name yields the path itself rather than a list
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                # Best effort: the file may never have been created
                pass
        return False
501
502
def file_path_remover():
    """
    Remove all files recorded in file_path_remover.paths, newest entry
    first.  Files that cannot be removed are skipped silently.
    """
    for recorded in reversed(file_path_remover.paths):
        try:
            os.remove(recorded)
        except OSError:
            # Best effort: the file may never have been created
            pass
509
510
def file_path(*names, base_dir=test_dir):
    """
    Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are recorded for file_path_remover(), which is
    registered with atexit on first use.  A single name yields a single
    path, several names yield a list of paths.
    """
    if not hasattr(file_path_remover, 'paths'):
        # First call: set up the list and the cleanup at interpreter exit
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
532
def remote_filename(path):
    """
    Return the name under which @path is accessed with the current image
    protocol: the path itself for 'file', an ssh:// URL on 127.0.0.1 port
    22 for 'ssh'.  Any other protocol raises an Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
540
class VM(qtest.QEMUQtestMachine):
    """
    A QEMU VM

    Adds helpers for building the QEMU command line (drives, devices,
    objects, ...) and for driving block jobs, migration and dirty bitmap
    queries over QMP.
    """

    def __init__(self, path_suffix=''):
        """
        :param path_suffix: Inserted into the VM name, which has the form
                            "qemu<path_suffix>-<pid>".
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added with add_drive(); used for the drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        """Add "-object <opts>" to the command line; returns self"""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Add "-device <opts>" to the command line; returns self"""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Add "-drive <opts>" to the command line as is; returns self"""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        """
        Add a virtio-blk drive to the VM

        :param path: Image file; if None, no file, format, cache or aio
                     options are generated.
        :param opts: Additional -drive options, appended verbatim.
        :param interface: Value of the if= option.
        :param img_format: Value of the format= option.
        :return: self

        The drive gets the ID "drive<N>", N counting the drives added
        with this method.  For the luks format the default secret object
        and key-secret option are added unless @opts mentions key-secret.
        """
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Add "-blockdev <opts>" to the command line; returns self.

        @opts is either the option string or an iterable of strings that
        are joined with commas.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Add "-incoming <addr>" to the command line; returns self"""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through human-monitor-command and return the
        QMP response.  With @use_log, command and response are logged.
        """
        cmd = 'human-monitor-command'
        kwargs = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """
        Pause drive r/w operations

        Sets the breakpoint bp_<drive> on @event; without an event, on
        both read_aio and write_aio.
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations (removes the bp_<drive> breakpoint)"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested dicts and lists into a single dict.

        The keys of the result are the paths to the leaf values, with the
        dict keys and list indices on the way joined by dots.

        :param obj: Object to flatten.
        :param output: Dict to add the results to; a new one is created
                       if None.
        :param basestr: Key prefix; used by the recursion.
        :return: @output
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    @staticmethod
    def _opt_value(value):
        """Return the option string representation of a flattened leaf"""
        # bool has to be checked explicitly: str(True) is 'True', which
        # is not how booleans are written in option strings.
        if isinstance(value, bool):
            return 'on' if value else 'off'
        return str(value)

    def qmp_to_opts(self, obj):
        """
        Convert a QMP object into a comma-separated "key=value" option
        string, using the dotted keys of flatten_qmp_object().

        Leaf values that are not strings are converted: booleans become
        on/off, everything else goes through str().  Values are not
        escaped.
        """
        flat = self.flatten_qmp_object(obj)
        return ','.join(f'{key}={self._opt_value(value)}'
                        for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, run through filter_qmp_event()"""
        return [filter_qmp_event(ev)
                for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command and log both the command and its response
        (each passed through @filters).  Returns the unfiltered response.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None on success, and an error string on failure.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # The block job events are only logged; the job is driven
                # by the JOB_STATUS_CHANGE events alone.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Run blockdev-create with @options and see the job through with
        run_job().

        :param filters: Log filters for the command and its response;
                        defaults to [filter_qmp_testfiles].
        :return: The result of run_job() if the command succeeded,
                 otherwise the 'error' member of the response.
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """
        Enable the 'events' migration capability and log the response.
        @name is only used in the log message.
        """
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait for (and log) MIGRATION events until the status is
        'completed' or 'failed'.

        :return: True if the migration completed, in which case the
                 method only returns once query-status reports
                 @expect_runstate; False if the migration failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        return False

    def node_info(self, node_name):
        """
        Return the query-named-block-nodes entry for @node_name, or None
        if there is no such node.
        """
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """
        Return a dict mapping node names to their 'dirty-bitmaps' lists,
        for all nodes that have a 'dirty-bitmaps' entry.
        """
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap, or None if no bitmap of @node_name matches.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Return True if the bitmap exists and contains all items of the
        dict @fields; False otherwise.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # A bitmap that does not exist cannot have the expected fields
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge (or child node): the path ends here
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
856
857 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
858
859 class QMPTestCase(unittest.TestCase):
860 '''Abstract base class for QMP test cases'''
861
862 def __init__(self, *args, **kwargs):
863 super().__init__(*args, **kwargs)
864 # Many users of this class set a VM property we rely on heavily
865 # in the methods below.
866 self.vm = None
867
868 def dictpath(self, d, path):
869 '''Traverse a path in a nested dict'''
870 for component in path.split('/'):
871 m = index_re.match(component)
872 if m:
873 component, idx = m.groups()
874 idx = int(idx)
875
876 if not isinstance(d, dict) or component not in d:
877 self.fail(f'failed path traversal for "{path}" in "{d}"')
878 d = d[component]
879
880 if m:
881 if not isinstance(d, list):
882 self.fail(f'path component "{component}" in "{path}" '
883 f'is not a list in "{d}"')
884 try:
885 d = d[idx]
886 except IndexError:
887 self.fail(f'invalid index "{idx}" in path "{path}" '
888 f'in "{d}"')
889 return d
890
891 def assert_qmp_absent(self, d, path):
892 try:
893 result = self.dictpath(d, path)
894 except AssertionError:
895 return
896 self.fail('path "%s" has value "%s"' % (path, str(result)))
897
898 def assert_qmp(self, d, path, value):
899 '''Assert that the value for a specific path in a QMP dict
900 matches. When given a list of values, assert that any of
901 them matches.'''
902
903 result = self.dictpath(d, path)
904
905 # [] makes no sense as a list of valid values, so treat it as
906 # an actual single value.
907 if isinstance(value, list) and value != []:
908 for v in value:
909 if result == v:
910 return
911 self.fail('no match for "%s" in %s' % (str(result), str(value)))
912 else:
913 self.assertEqual(result, value,
914 '"%s" is "%s", expected "%s"'
915 % (path, str(result), str(value)))
916
917 def assert_no_active_block_jobs(self):
918 result = self.vm.qmp('query-block-jobs')
919 self.assert_qmp(result, 'return', [])
920
921 def assert_has_block_node(self, node_name=None, file_name=None):
922 """Issue a query-named-block-nodes and assert node_name and/or
923 file_name is present in the result"""
924 def check_equal_or_none(a, b):
925 return a is None or b is None or a == b
926 assert node_name or file_name
927 result = self.vm.qmp('query-named-block-nodes')
928 for x in result["return"]:
929 if check_equal_or_none(x.get("node-name"), node_name) and \
930 check_equal_or_none(x.get("file"), file_name):
931 return
932 self.fail("Cannot find %s %s in result:\n%s" %
933 (node_name, file_name, result))
934
935 def assert_json_filename_equal(self, json_filename, reference):
936 '''Asserts that the given filename is a json: filename and that its
937 content is equal to the given reference object'''
938 self.assertEqual(json_filename[:5], 'json:')
939 self.assertEqual(
940 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
941 self.vm.flatten_qmp_object(reference)
942 )
943
944 def cancel_and_wait(self, drive='drive0', force=False,
945 resume=False, wait=60.0):
946 '''Cancel a block job and wait for it to finish, returning the event'''
947 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
948 self.assert_qmp(result, 'return', {})
949
950 if resume:
951 self.vm.resume_drive(drive)
952
953 cancelled = False
954 result = None
955 while not cancelled:
956 for event in self.vm.get_qmp_events(wait=wait):
957 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
958 event['event'] == 'BLOCK_JOB_CANCELLED':
959 self.assert_qmp(event, 'data/device', drive)
960 result = event
961 cancelled = True
962 elif event['event'] == 'JOB_STATUS_CHANGE':
963 self.assert_qmp(event, 'data/id', drive)
964
965
966 self.assert_no_active_block_jobs()
967 return result
968
969 def wait_until_completed(self, drive='drive0', check_offset=True,
970 wait=60.0, error=None):
971 '''Wait for a block job to finish, returning the event'''
972 while True:
973 for event in self.vm.get_qmp_events(wait=wait):
974 if event['event'] == 'BLOCK_JOB_COMPLETED':
975 self.assert_qmp(event, 'data/device', drive)
976 if error is None:
977 self.assert_qmp_absent(event, 'data/error')
978 if check_offset:
979 self.assert_qmp(event, 'data/offset',
980 event['data']['len'])
981 else:
982 self.assert_qmp(event, 'data/error', error)
983 self.assert_no_active_block_jobs()
984 return event
985 if event['event'] == 'JOB_STATUS_CHANGE':
986 self.assert_qmp(event, 'data/id', drive)
987
988 def wait_ready(self, drive='drive0'):
989 """Wait until a BLOCK_JOB_READY event, and return the event."""
990 return self.vm.events_wait([
991 ('BLOCK_JOB_READY',
992 {'data': {'type': 'mirror', 'device': drive}}),
993 ('BLOCK_JOB_READY',
994 {'data': {'type': 'commit', 'device': drive}})
995 ])
996
def wait_ready_and_cancel(self, drive='drive0'):
    """Wait for the job on @drive to become ready, then cancel it.

    The event returned by cancel_and_wait() is expected to be a
    BLOCK_JOB_COMPLETED event of type 'mirror' whose offset equals its
    length.
    """
    self.wait_ready(drive=drive)
    done = self.cancel_and_wait(drive=drive)
    self.assertEqual(done['event'], 'BLOCK_JOB_COMPLETED')
    self.assert_qmp(done, 'data/type', 'mirror')
    expected_offset = done['data']['len']
    self.assert_qmp(done, 'data/offset', expected_offset)
1003
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''Complete a block job and wait for it to finish

    If @wait_ready is set, first wait for the job on @drive to become
    ready.  Then issue block-job-complete, wait for the completion
    event (expecting @completion_error as its error, if given) and
    check that the finished job was a 'mirror' or 'commit' job.
    '''
    if wait_ready:
        self.wait_ready(drive=drive)

    reply = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(reply, 'return', {})

    done = self.wait_until_completed(drive=drive, error=completion_error)
    job_type = done['data']['type']
    self.assertTrue(job_type in ('mirror', 'commit'))
1015
def pause_wait(self, job_id='job0'):
    """Poll query-block-jobs until job @job_id is paused and not busy.

    Returns the matching job entry.  The job must be present in every
    query result (asserted); the polling runs inside a Timeout(3, ...)
    context.
    """
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            reply = self.vm.qmp('query-block-jobs')
            # Only the first entry whose device matches is considered.
            match = next((job for job in reply['return']
                          if job['device'] == job_id), None)
            assert match is not None
            if match['paused'] and not match['busy']:
                return match
1028
def pause_job(self, job_id='job0', wait=True):
    """Issue block-job-pause for @job_id and assert that it succeeded.

    With @wait set, return the result of pause_wait() for the job;
    otherwise return the QMP reply to block-job-pause.
    """
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    return self.pause_wait(job_id) if wait else reply
1035
def case_skip(self, reason):
    '''Skip this test case

    @reason is first recorded via case_notrun() and then handed to
    skipTest().
    '''
    case_notrun(reason)
    self.skipTest(reason)
1040
1041
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    terminates the process with exit status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on garbage collection
    # to flush it before the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1050
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    @reason is appended as one line to "<output_dir>/<seq>.casenotrun".
    '''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the handle is flushed and closed right
    # away; this function may be called many times within one process.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')
1062
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    The format is unsuitable if @supported_fmts is non-empty and does not
    contain it, or if @unsupported_fmts contains it.  For the 'luks'
    format, additionally verify that LUKS actually works.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # similar to
            #   _supported_fmt generic
            # for bash tests
            supported_fmts = ()

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts
    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1078
1079 def _verify_protocol(supported: Sequence[str] = (),
1080 unsupported: Sequence[str] = ()) -> None:
1081 assert not (supported and unsupported)
1082
1083 if 'generic' in supported:
1084 return
1085
1086 not_sup = supported and (imgproto not in supported)
1087 if not_sup or (imgproto in unsupported):
1088 notrun('not suitable for this protocol: %s' % imgproto)
1089
1090 def _verify_platform(supported: Sequence[str] = (),
1091 unsupported: Sequence[str] = ()) -> None:
1092 if any((sys.platform.startswith(x) for x in unsupported)):
1093 notrun('not suitable for this OS: %s' % sys.platform)
1094
1095 if supported:
1096 if not any((sys.platform.startswith(x) for x in supported)):
1097 notrun('not suitable for this OS: %s' % sys.platform)
1098
1099 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1100 if supported_cache_modes and (cachemode not in supported_cache_modes):
1101 notrun('not suitable for this cache mode: %s' % cachemode)
1102
1103 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1104 if supported_aio_modes and (aiomode not in supported_aio_modes):
1105 notrun('not suitable for this aio mode: %s' % aiomode)
1106
def supports_quorum():
    '''Return whether 'quorum' appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1109
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1114
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if creating a LUKS image succeeded, otherwise
             (False, reason).  The reason is the text following
             "<image file>:" on the first output line containing it,
             or the whole output if no line does.
    """
    probe_image = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   probe_image, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image does not exist if creation failed
    try:
        os.remove(probe_image)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = probe_image + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines() if marker in line),
                  output)
    return (False, reason)
1145
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, using the failure reason
    reported by has_working_luks() as the skip reason
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
1153
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    # The exit status is deliberately ignored
    return qemu_tool_pipe_and_status('qemu', command)[0]
1163
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

    Results are cached per value of 'read_only' in the function
    attribute supported_formats.formats, so "-drive format=help" is
    queried at most once for each of the two lists.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # First line of the help output: rw-whitelist, second: ro-whitelist
        wanted = help_lines[1 if read_only else 0]
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
1178
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

    'required_formats' is either an iterable of format names or a
    callable that takes the test case and returns such an iterable.
    'read_only' selects which whitelist supported_formats() consults.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            # Sort the missing formats: set iteration order is not stable
            # across runs, and the list ends up in the skip message.
            usf_list = sorted(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1198
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

    The decorated test runs normally unless the current image format
    is listed in 'formats'.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1214
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

    When running as root (uid 0), the case is recorded via case_notrun()
    and the wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1225
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    In debug mode the runner writes to stdout with verbosity 2.
    Otherwise its output is captured, filtered so that it can be
    diffed against the reference output, and written to stderr.
    """

    verbosity = 2 if debug else 1

    # We need to filter out the time taken from the output so that
    # qemu-iotest can reliably diff the results against master output,
    # hence the intermediate buffer outside of debug mode.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() instead of split(): if nothing (or no complete
            # line) was captured, unpacking the result of split() would
            # raise ValueError here and mask the exception that is
            # currently propagating through this finally clause.
            first_line, newline, rest = out.partition('\n')
            out = first_line.replace('s', '.') + newline + rest

            sys.stderr.write(out)
1257
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Consumes a '-d' switch from sys.argv, configures logging accordingly
    and skips the test suite if the current image format, protocol,
    platform, cache mode or aio mode is not suitable.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # list.remove() drops the first '-d' and raises ValueError if absent
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
1292
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are forwarded to execute_setup_common().  If
    @test_function is given it is called without arguments; otherwise
    the unittest runner is started.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1301
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a bare-message stdout handler to test_logger, sets its
    level to INFO and stops it from propagating to parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))
    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1310
1311 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates test logging, then forwards all arguments to
    execute_setup_common(); its return value is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1316
1317 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates test logging, then hands @test_function and all remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1322
1323 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are forwarded to execute_test() without a test
    function.
    """
    execute_test(*args, **kwargs)