iotests: Make qemu_nbd_popen() a contextmanager
[qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import atexit
20 from collections import OrderedDict
21 import faulthandler
22 import io
23 import json
24 import logging
25 import os
26 import re
27 import signal
28 import struct
29 import subprocess
30 import sys
31 import time
32 from typing import (Any, Callable, Dict, Iterable,
33 List, Optional, Sequence, Tuple, TypeVar)
34 import unittest
35
36 from contextlib import contextmanager
37
38 # pylint: disable=import-error, wrong-import-position
39 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40 from qemu import qtest
41 from qemu.qmp import QMPMessage
42
43 assert sys.version_info >= (3, 6)
44
45 # Use this logger for logging messages directly from the iotests module
46 logger = logging.getLogger('qemu.iotests')
47 logger.addHandler(logging.NullHandler())
48
49 # Use this logger for messages that ought to be used for diff output.
50 test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53 faulthandler.enable()
54
55 # This will not work if arguments contain spaces but is necessary if we
56 # want to support the override options that ./check supports.
57 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58 if os.environ.get('QEMU_IMG_OPTIONS'):
59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62 if os.environ.get('QEMU_IO_OPTIONS'):
63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67 qemu_io_args_no_fmt += \
68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71 if os.environ.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77 imgfmt = os.environ.get('IMGFMT', 'raw')
78 imgproto = os.environ.get('IMGPROTO', 'file')
79 test_dir = os.environ.get('TEST_DIR')
80 sock_dir = os.environ.get('SOCK_DIR')
81 output_dir = os.environ.get('OUTPUT_DIR', '.')
82 cachemode = os.environ.get('CACHEMODE')
83 aiomode = os.environ.get('AIOMODE')
84 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88 luks_default_secret_object = 'secret,id=keysec0,data=' + \
89 os.environ.get('IMGKEYSECRET', '')
90 luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94 """
95 Run qemu-img and return both its output and its exit code
96 """
97 subp = subprocess.Popen(qemu_img_args + list(args),
98 stdout=subprocess.PIPE,
99 stderr=subprocess.STDOUT,
100 universal_newlines=True)
101 output = subp.communicate()[0]
102 if subp.returncode < 0:
103 sys.stderr.write('qemu-img received signal %i: %s\n'
104 % (-subp.returncode,
105 ' '.join(qemu_img_args + list(args))))
106 return (output, subp.returncode)
107
108 def qemu_img(*args: str) -> int:
109 '''Run qemu-img and return the exit code'''
110 return qemu_img_pipe_and_status(*args)[1]
111
112 def ordered_qmp(qmsg, conv_keys=True):
113 # Dictionaries are not ordered prior to 3.6, therefore:
114 if isinstance(qmsg, list):
115 return [ordered_qmp(atom) for atom in qmsg]
116 if isinstance(qmsg, dict):
117 od = OrderedDict()
118 for k, v in sorted(qmsg.items()):
119 if conv_keys:
120 k = k.replace('_', '-')
121 od[k] = ordered_qmp(v, conv_keys=False)
122 return od
123 return qmsg
124
125 def qemu_img_create(*args):
126 args = list(args)
127
128 # default luks support
129 if '-f' in args and args[args.index('-f') + 1] == 'luks':
130 if '-o' in args:
131 i = args.index('-o')
132 if 'key-secret' not in args[i + 1]:
133 args[i + 1].append(luks_default_key_secret_opt)
134 args.insert(i + 2, '--object')
135 args.insert(i + 3, luks_default_secret_object)
136 else:
137 args = ['-o', luks_default_key_secret_opt,
138 '--object', luks_default_secret_object] + args
139
140 args.insert(0, 'create')
141
142 return qemu_img(*args)
143
144 def qemu_img_verbose(*args):
145 '''Run qemu-img without suppressing its output and return the exit code'''
146 exitcode = subprocess.call(qemu_img_args + list(args))
147 if exitcode < 0:
148 sys.stderr.write('qemu-img received signal %i: %s\n'
149 % (-exitcode, ' '.join(qemu_img_args + list(args))))
150 return exitcode
151
152 def qemu_img_pipe(*args: str) -> str:
153 '''Run qemu-img and return its output'''
154 return qemu_img_pipe_and_status(*args)[0]
155
156 def qemu_img_log(*args):
157 result = qemu_img_pipe(*args)
158 log(result, filters=[filter_testfiles])
159 return result
160
161 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
162 args = ['info']
163 if imgopts:
164 args.append('--image-opts')
165 else:
166 args += ['-f', imgfmt]
167 args += extra_args
168 args.append(filename)
169
170 output = qemu_img_pipe(*args)
171 if not filter_path:
172 filter_path = filename
173 log(filter_img_info(output, filter_path))
174
175 def qemu_io(*args):
176 '''Run qemu-io and return the stdout data'''
177 args = qemu_io_args + list(args)
178 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
179 stderr=subprocess.STDOUT,
180 universal_newlines=True)
181 output = subp.communicate()[0]
182 if subp.returncode < 0:
183 sys.stderr.write('qemu-io received signal %i: %s\n'
184 % (-subp.returncode, ' '.join(args)))
185 return output
186
187 def qemu_io_log(*args):
188 result = qemu_io(*args)
189 log(result, filters=[filter_testfiles, filter_qemu_io])
190 return result
191
192 def qemu_io_silent(*args):
193 '''Run qemu-io and return the exit code, suppressing stdout'''
194 args = qemu_io_args + list(args)
195 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
196 if exitcode < 0:
197 sys.stderr.write('qemu-io received signal %i: %s\n' %
198 (-exitcode, ' '.join(args)))
199 return exitcode
200
201 def qemu_io_silent_check(*args):
202 '''Run qemu-io and return the true if subprocess returned 0'''
203 args = qemu_io_args + list(args)
204 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
205 stderr=subprocess.STDOUT)
206 return exitcode == 0
207
208 def get_virtio_scsi_device():
209 if qemu_default_machine == 's390-ccw-virtio':
210 return 'virtio-scsi-ccw'
211 return 'virtio-scsi-pci'
212
213 class QemuIoInteractive:
214 def __init__(self, *args):
215 self.args = qemu_io_args_no_fmt + list(args)
216 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
217 stdout=subprocess.PIPE,
218 stderr=subprocess.STDOUT,
219 universal_newlines=True)
220 out = self._p.stdout.read(9)
221 if out != 'qemu-io> ':
222 # Most probably qemu-io just failed to start.
223 # Let's collect the whole output and exit.
224 out += self._p.stdout.read()
225 self._p.wait(timeout=1)
226 raise ValueError(out)
227
228 def close(self):
229 self._p.communicate('q\n')
230
231 def _read_output(self):
232 pattern = 'qemu-io> '
233 n = len(pattern)
234 pos = 0
235 s = []
236 while pos != n:
237 c = self._p.stdout.read(1)
238 # check unexpected EOF
239 assert c != ''
240 s.append(c)
241 if c == pattern[pos]:
242 pos += 1
243 else:
244 pos = 0
245
246 return ''.join(s[:-n])
247
248 def cmd(self, cmd):
249 # quit command is in close(), '\n' is added automatically
250 assert '\n' not in cmd
251 cmd = cmd.strip()
252 assert cmd not in ('q', 'quit')
253 self._p.stdin.write(cmd + '\n')
254 self._p.stdin.flush()
255 return self._read_output()
256
257
258 def qemu_nbd(*args):
259 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
260 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
261
262 def qemu_nbd_early_pipe(*args):
263 '''Run qemu-nbd in daemon mode and return both the parent's exit code
264 and its output in case of an error'''
265 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
266 stdout=subprocess.PIPE,
267 universal_newlines=True)
268 output = subp.communicate()[0]
269 if subp.returncode < 0:
270 sys.stderr.write('qemu-nbd received signal %i: %s\n' %
271 (-subp.returncode,
272 ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
273
274 return subp.returncode, output if subp.returncode else ''
275
276 @contextmanager
277 def qemu_nbd_popen(*args):
278 '''Context manager running qemu-nbd within the context'''
279 pid_file = file_path("pid")
280
281 cmd = list(qemu_nbd_args)
282 cmd.extend(('--persistent', '--pid-file', pid_file))
283 cmd.extend(args)
284
285 log('Start NBD server')
286 p = subprocess.Popen(cmd)
287 try:
288 while not os.path.exists(pid_file):
289 if p.poll() is not None:
290 raise RuntimeError(
291 "qemu-nbd terminated with exit code {}: {}"
292 .format(p.returncode, ' '.join(cmd)))
293
294 time.sleep(0.01)
295 yield
296 finally:
297 log('Kill NBD server')
298 p.kill()
299 p.wait()
300
301 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
302 '''Return True if two image files are identical'''
303 return qemu_img('compare', '-f', fmt1,
304 '-F', fmt2, img1, img2) == 0
305
306 def create_image(name, size):
307 '''Create a fully-allocated raw image with sector markers'''
308 file = open(name, 'wb')
309 i = 0
310 while i < size:
311 sector = struct.pack('>l504xl', i // 512, i // 512)
312 file.write(sector)
313 i = i + 512
314 file.close()
315
316 def image_size(img):
317 '''Return image's virtual size'''
318 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
319 return json.loads(r)['virtual-size']
320
321 def is_str(val):
322 return isinstance(val, str)
323
324 test_dir_re = re.compile(r"%s" % test_dir)
325 def filter_test_dir(msg):
326 return test_dir_re.sub("TEST_DIR", msg)
327
328 win32_re = re.compile(r"\r")
329 def filter_win32(msg):
330 return win32_re.sub("", msg)
331
332 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
333 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
334 r"and [0-9\/.inf]* ops\/sec\)")
335 def filter_qemu_io(msg):
336 msg = filter_win32(msg)
337 return qemu_io_re.sub("X ops; XX:XX:XX.X "
338 "(XXX YYY/sec and XXX ops/sec)", msg)
339
340 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
341 def filter_chown(msg):
342 return chown_re.sub("chown UID:GID", msg)
343
344 def filter_qmp_event(event):
345 '''Filter a QMP event dict'''
346 event = dict(event)
347 if 'timestamp' in event:
348 event['timestamp']['seconds'] = 'SECS'
349 event['timestamp']['microseconds'] = 'USECS'
350 return event
351
352 def filter_qmp(qmsg, filter_fn):
353 '''Given a string filter, filter a QMP object's values.
354 filter_fn takes a (key, value) pair.'''
355 # Iterate through either lists or dicts;
356 if isinstance(qmsg, list):
357 items = enumerate(qmsg)
358 else:
359 items = qmsg.items()
360
361 for k, v in items:
362 if isinstance(v, (dict, list)):
363 qmsg[k] = filter_qmp(v, filter_fn)
364 else:
365 qmsg[k] = filter_fn(k, v)
366 return qmsg
367
368 def filter_testfiles(msg):
369 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
370 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
371 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
372
373 def filter_qmp_testfiles(qmsg):
374 def _filter(_key, value):
375 if is_str(value):
376 return filter_testfiles(value)
377 return value
378 return filter_qmp(qmsg, _filter)
379
380 def filter_generated_node_ids(msg):
381 return re.sub("#block[0-9]+", "NODE_NAME", msg)
382
383 def filter_img_info(output, filename):
384 lines = []
385 for line in output.split('\n'):
386 if 'disk size' in line or 'actual-size' in line:
387 continue
388 line = line.replace(filename, 'TEST_IMG')
389 line = filter_testfiles(line)
390 line = line.replace(imgfmt, 'IMGFMT')
391 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
392 line = re.sub('uuid: [-a-f0-9]+',
393 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
394 line)
395 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
396 lines.append(line)
397 return '\n'.join(lines)
398
399 def filter_imgfmt(msg):
400 return msg.replace(imgfmt, 'IMGFMT')
401
402 def filter_qmp_imgfmt(qmsg):
403 def _filter(_key, value):
404 if is_str(value):
405 return filter_imgfmt(value)
406 return value
407 return filter_qmp(qmsg, _filter)
408
409
410 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
411
412 def log(msg: Msg,
413 filters: Iterable[Callable[[Msg], Msg]] = (),
414 indent: Optional[int] = None) -> None:
415 """
416 Logs either a string message or a JSON serializable message (like QMP).
417 If indent is provided, JSON serializable messages are pretty-printed.
418 """
419 for flt in filters:
420 msg = flt(msg)
421 if isinstance(msg, (dict, list)):
422 # Don't sort if it's already sorted
423 do_sort = not isinstance(msg, OrderedDict)
424 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
425 else:
426 test_logger.info(msg)
427
428 class Timeout:
429 def __init__(self, seconds, errmsg="Timeout"):
430 self.seconds = seconds
431 self.errmsg = errmsg
432 def __enter__(self):
433 signal.signal(signal.SIGALRM, self.timeout)
434 signal.setitimer(signal.ITIMER_REAL, self.seconds)
435 return self
436 def __exit__(self, exc_type, value, traceback):
437 signal.setitimer(signal.ITIMER_REAL, 0)
438 return False
439 def timeout(self, signum, frame):
440 raise Exception(self.errmsg)
441
442 def file_pattern(name):
443 return "{0}-{1}".format(os.getpid(), name)
444
445 class FilePaths:
446 """
447 FilePaths is an auto-generated filename that cleans itself up.
448
449 Use this context manager to generate filenames and ensure that the file
450 gets deleted::
451
452 with FilePaths(['test.img']) as img_path:
453 qemu_img('create', img_path, '1G')
454 # migration_sock_path is automatically deleted
455 """
456 def __init__(self, names, base_dir=test_dir):
457 self.paths = []
458 for name in names:
459 self.paths.append(os.path.join(base_dir, file_pattern(name)))
460
461 def __enter__(self):
462 return self.paths
463
464 def __exit__(self, exc_type, exc_val, exc_tb):
465 try:
466 for path in self.paths:
467 os.remove(path)
468 except OSError:
469 pass
470 return False
471
472 class FilePath(FilePaths):
473 """
474 FilePath is a specialization of FilePaths that takes a single filename.
475 """
476 def __init__(self, name, base_dir=test_dir):
477 super(FilePath, self).__init__([name], base_dir)
478
479 def __enter__(self):
480 return self.paths[0]
481
482 def file_path_remover():
483 for path in reversed(file_path_remover.paths):
484 try:
485 os.remove(path)
486 except OSError:
487 pass
488
489
490 def file_path(*names, base_dir=test_dir):
491 ''' Another way to get auto-generated filename that cleans itself up.
492
493 Use is as simple as:
494
495 img_a, img_b = file_path('a.img', 'b.img')
496 sock = file_path('socket')
497 '''
498
499 if not hasattr(file_path_remover, 'paths'):
500 file_path_remover.paths = []
501 atexit.register(file_path_remover)
502
503 paths = []
504 for name in names:
505 filename = file_pattern(name)
506 path = os.path.join(base_dir, filename)
507 file_path_remover.paths.append(path)
508 paths.append(path)
509
510 return paths[0] if len(paths) == 1 else paths
511
512 def remote_filename(path):
513 if imgproto == 'file':
514 return path
515 elif imgproto == 'ssh':
516 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
517 else:
518 raise Exception("Protocol %s not supported" % (imgproto))
519
520 class VM(qtest.QEMUQtestMachine):
521 '''A QEMU VM'''
522
523 def __init__(self, path_suffix=''):
524 name = "qemu%s-%d" % (path_suffix, os.getpid())
525 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
526 test_dir=test_dir,
527 socket_scm_helper=socket_scm_helper,
528 sock_dir=sock_dir)
529 self._num_drives = 0
530
531 def add_object(self, opts):
532 self._args.append('-object')
533 self._args.append(opts)
534 return self
535
536 def add_device(self, opts):
537 self._args.append('-device')
538 self._args.append(opts)
539 return self
540
541 def add_drive_raw(self, opts):
542 self._args.append('-drive')
543 self._args.append(opts)
544 return self
545
546 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
547 '''Add a virtio-blk drive to the VM'''
548 options = ['if=%s' % interface,
549 'id=drive%d' % self._num_drives]
550
551 if path is not None:
552 options.append('file=%s' % path)
553 options.append('format=%s' % img_format)
554 options.append('cache=%s' % cachemode)
555 options.append('aio=%s' % aiomode)
556
557 if opts:
558 options.append(opts)
559
560 if img_format == 'luks' and 'key-secret' not in opts:
561 # default luks support
562 if luks_default_secret_object not in self._args:
563 self.add_object(luks_default_secret_object)
564
565 options.append(luks_default_key_secret_opt)
566
567 self._args.append('-drive')
568 self._args.append(','.join(options))
569 self._num_drives += 1
570 return self
571
572 def add_blockdev(self, opts):
573 self._args.append('-blockdev')
574 if isinstance(opts, str):
575 self._args.append(opts)
576 else:
577 self._args.append(','.join(opts))
578 return self
579
580 def add_incoming(self, addr):
581 self._args.append('-incoming')
582 self._args.append(addr)
583 return self
584
585 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
586 cmd = 'human-monitor-command'
587 kwargs = {'command-line': command_line}
588 if use_log:
589 return self.qmp_log(cmd, **kwargs)
590 else:
591 return self.qmp(cmd, **kwargs)
592
593 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
594 """Pause drive r/w operations"""
595 if not event:
596 self.pause_drive(drive, "read_aio")
597 self.pause_drive(drive, "write_aio")
598 return
599 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
600
601 def resume_drive(self, drive: str) -> None:
602 """Resume drive r/w operations"""
603 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
604
605 def hmp_qemu_io(self, drive: str, cmd: str,
606 use_log: bool = False) -> QMPMessage:
607 """Write to a given drive using an HMP command"""
608 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
609
610 def flatten_qmp_object(self, obj, output=None, basestr=''):
611 if output is None:
612 output = dict()
613 if isinstance(obj, list):
614 for i, item in enumerate(obj):
615 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
616 elif isinstance(obj, dict):
617 for key in obj:
618 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
619 else:
620 output[basestr[:-1]] = obj # Strip trailing '.'
621 return output
622
623 def qmp_to_opts(self, obj):
624 obj = self.flatten_qmp_object(obj)
625 output_list = list()
626 for key in obj:
627 output_list += [key + '=' + obj[key]]
628 return ','.join(output_list)
629
630 def get_qmp_events_filtered(self, wait=60.0):
631 result = []
632 for ev in self.get_qmp_events(wait=wait):
633 result.append(filter_qmp_event(ev))
634 return result
635
636 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
637 full_cmd = OrderedDict((
638 ("execute", cmd),
639 ("arguments", ordered_qmp(kwargs))
640 ))
641 log(full_cmd, filters, indent=indent)
642 result = self.qmp(cmd, **kwargs)
643 log(result, filters, indent=indent)
644 return result
645
646 # Returns None on success, and an error string on failure
647 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
648 pre_finalize=None, cancel=False, wait=60.0):
649 """
650 run_job moves a job from creation through to dismissal.
651
652 :param job: String. ID of recently-launched job
653 :param auto_finalize: Bool. True if the job was launched with
654 auto_finalize. Defaults to True.
655 :param auto_dismiss: Bool. True if the job was launched with
656 auto_dismiss=True. Defaults to False.
657 :param pre_finalize: Callback. A callable that takes no arguments to be
658 invoked prior to issuing job-finalize, if any.
659 :param cancel: Bool. When true, cancels the job after the pre_finalize
660 callback.
661 :param wait: Float. Timeout value specifying how long to wait for any
662 event, in seconds. Defaults to 60.0.
663 """
664 match_device = {'data': {'device': job}}
665 match_id = {'data': {'id': job}}
666 events = [
667 ('BLOCK_JOB_COMPLETED', match_device),
668 ('BLOCK_JOB_CANCELLED', match_device),
669 ('BLOCK_JOB_ERROR', match_device),
670 ('BLOCK_JOB_READY', match_device),
671 ('BLOCK_JOB_PENDING', match_id),
672 ('JOB_STATUS_CHANGE', match_id)
673 ]
674 error = None
675 while True:
676 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
677 if ev['event'] != 'JOB_STATUS_CHANGE':
678 log(ev)
679 continue
680 status = ev['data']['status']
681 if status == 'aborting':
682 result = self.qmp('query-jobs')
683 for j in result['return']:
684 if j['id'] == job:
685 error = j['error']
686 log('Job failed: %s' % (j['error']))
687 elif status == 'ready':
688 self.qmp_log('job-complete', id=job)
689 elif status == 'pending' and not auto_finalize:
690 if pre_finalize:
691 pre_finalize()
692 if cancel:
693 self.qmp_log('job-cancel', id=job)
694 else:
695 self.qmp_log('job-finalize', id=job)
696 elif status == 'concluded' and not auto_dismiss:
697 self.qmp_log('job-dismiss', id=job)
698 elif status == 'null':
699 return error
700
701 # Returns None on success, and an error string on failure
702 def blockdev_create(self, options, job_id='job0', filters=None):
703 if filters is None:
704 filters = [filter_qmp_testfiles]
705 result = self.qmp_log('blockdev-create', filters=filters,
706 job_id=job_id, options=options)
707
708 if 'return' in result:
709 assert result['return'] == {}
710 job_result = self.run_job(job_id)
711 else:
712 job_result = result['error']
713
714 log("")
715 return job_result
716
717 def enable_migration_events(self, name):
718 log('Enabling migration QMP events on %s...' % name)
719 log(self.qmp('migrate-set-capabilities', capabilities=[
720 {
721 'capability': 'events',
722 'state': True
723 }
724 ]))
725
726 def wait_migration(self, expect_runstate):
727 while True:
728 event = self.event_wait('MIGRATION')
729 log(event, filters=[filter_qmp_event])
730 if event['data']['status'] == 'completed':
731 break
732 # The event may occur in finish-migrate, so wait for the expected
733 # post-migration runstate
734 while self.qmp('query-status')['return']['status'] != expect_runstate:
735 pass
736
737 def node_info(self, node_name):
738 nodes = self.qmp('query-named-block-nodes')
739 for x in nodes['return']:
740 if x['node-name'] == node_name:
741 return x
742 return None
743
744 def query_bitmaps(self):
745 res = self.qmp("query-named-block-nodes")
746 return {device['node-name']: device['dirty-bitmaps']
747 for device in res['return'] if 'dirty-bitmaps' in device}
748
749 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
750 """
751 get a specific bitmap from the object returned by query_bitmaps.
752 :param recording: If specified, filter results by the specified value.
753 :param bitmaps: If specified, use it instead of call query_bitmaps()
754 """
755 if bitmaps is None:
756 bitmaps = self.query_bitmaps()
757
758 for bitmap in bitmaps[node_name]:
759 if bitmap.get('name', '') == bitmap_name:
760 if recording is None or bitmap.get('recording') == recording:
761 return bitmap
762 return None
763
764 def check_bitmap_status(self, node_name, bitmap_name, fields):
765 ret = self.get_bitmap(node_name, bitmap_name)
766
767 return fields.items() <= ret.items()
768
769 def assert_block_path(self, root, path, expected_node, graph=None):
770 """
771 Check whether the node under the given path in the block graph
772 is @expected_node.
773
774 @root is the node name of the node where the @path is rooted.
775
776 @path is a string that consists of child names separated by
777 slashes. It must begin with a slash.
778
779 Examples for @root + @path:
780 - root="qcow2-node", path="/backing/file"
781 - root="quorum-node", path="/children.2/file"
782
783 Hypothetically, @path could be empty, in which case it would
784 point to @root. However, in practice this case is not useful
785 and hence not allowed.
786
787 @expected_node may be None. (All elements of the path but the
788 leaf must still exist.)
789
790 @graph may be None or the result of an x-debug-query-block-graph
791 call that has already been performed.
792 """
793 if graph is None:
794 graph = self.qmp('x-debug-query-block-graph')['return']
795
796 iter_path = iter(path.split('/'))
797
798 # Must start with a /
799 assert next(iter_path) == ''
800
801 node = next((node for node in graph['nodes'] if node['name'] == root),
802 None)
803
804 # An empty @path is not allowed, so the root node must be present
805 assert node is not None, 'Root node %s not found' % root
806
807 for child_name in iter_path:
808 assert node is not None, 'Cannot follow path %s%s' % (root, path)
809
810 try:
811 node_id = next(edge['child'] for edge in graph['edges']
812 if (edge['parent'] == node['id'] and
813 edge['name'] == child_name))
814
815 node = next(node for node in graph['nodes']
816 if node['id'] == node_id)
817
818 except StopIteration:
819 node = None
820
821 if node is None:
822 assert expected_node is None, \
823 'No node found under %s (but expected %s)' % \
824 (path, expected_node)
825 else:
826 assert node['name'] == expected_node, \
827 'Found node %s under %s (but expected %s)' % \
828 (node['name'], path, expected_node)
829
830 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
831
832 class QMPTestCase(unittest.TestCase):
833 '''Abstract base class for QMP test cases'''
834
835 def __init__(self, *args, **kwargs):
836 super().__init__(*args, **kwargs)
837 # Many users of this class set a VM property we rely on heavily
838 # in the methods below.
839 self.vm = None
840
841 def dictpath(self, d, path):
842 '''Traverse a path in a nested dict'''
843 for component in path.split('/'):
844 m = index_re.match(component)
845 if m:
846 component, idx = m.groups()
847 idx = int(idx)
848
849 if not isinstance(d, dict) or component not in d:
850 self.fail(f'failed path traversal for "{path}" in "{d}"')
851 d = d[component]
852
853 if m:
854 if not isinstance(d, list):
855 self.fail(f'path component "{component}" in "{path}" '
856 f'is not a list in "{d}"')
857 try:
858 d = d[idx]
859 except IndexError:
860 self.fail(f'invalid index "{idx}" in path "{path}" '
861 f'in "{d}"')
862 return d
863
864 def assert_qmp_absent(self, d, path):
865 try:
866 result = self.dictpath(d, path)
867 except AssertionError:
868 return
869 self.fail('path "%s" has value "%s"' % (path, str(result)))
870
871 def assert_qmp(self, d, path, value):
872 '''Assert that the value for a specific path in a QMP dict
873 matches. When given a list of values, assert that any of
874 them matches.'''
875
876 result = self.dictpath(d, path)
877
878 # [] makes no sense as a list of valid values, so treat it as
879 # an actual single value.
880 if isinstance(value, list) and value != []:
881 for v in value:
882 if result == v:
883 return
884 self.fail('no match for "%s" in %s' % (str(result), str(value)))
885 else:
886 self.assertEqual(result, value,
887 '"%s" is "%s", expected "%s"'
888 % (path, str(result), str(value)))
889
890 def assert_no_active_block_jobs(self):
891 result = self.vm.qmp('query-block-jobs')
892 self.assert_qmp(result, 'return', [])
893
894 def assert_has_block_node(self, node_name=None, file_name=None):
895 """Issue a query-named-block-nodes and assert node_name and/or
896 file_name is present in the result"""
897 def check_equal_or_none(a, b):
898 return a is None or b is None or a == b
899 assert node_name or file_name
900 result = self.vm.qmp('query-named-block-nodes')
901 for x in result["return"]:
902 if check_equal_or_none(x.get("node-name"), node_name) and \
903 check_equal_or_none(x.get("file"), file_name):
904 return
905 self.fail("Cannot find %s %s in result:\n%s" %
906 (node_name, file_name, result))
907
908 def assert_json_filename_equal(self, json_filename, reference):
909 '''Asserts that the given filename is a json: filename and that its
910 content is equal to the given reference object'''
911 self.assertEqual(json_filename[:5], 'json:')
912 self.assertEqual(
913 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
914 self.vm.flatten_qmp_object(reference)
915 )
916
917 def cancel_and_wait(self, drive='drive0', force=False,
918 resume=False, wait=60.0):
919 '''Cancel a block job and wait for it to finish, returning the event'''
920 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
921 self.assert_qmp(result, 'return', {})
922
923 if resume:
924 self.vm.resume_drive(drive)
925
926 cancelled = False
927 result = None
928 while not cancelled:
929 for event in self.vm.get_qmp_events(wait=wait):
930 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
931 event['event'] == 'BLOCK_JOB_CANCELLED':
932 self.assert_qmp(event, 'data/device', drive)
933 result = event
934 cancelled = True
935 elif event['event'] == 'JOB_STATUS_CHANGE':
936 self.assert_qmp(event, 'data/id', drive)
937
938
939 self.assert_no_active_block_jobs()
940 return result
941
942 def wait_until_completed(self, drive='drive0', check_offset=True,
943 wait=60.0, error=None):
944 '''Wait for a block job to finish, returning the event'''
945 while True:
946 for event in self.vm.get_qmp_events(wait=wait):
947 if event['event'] == 'BLOCK_JOB_COMPLETED':
948 self.assert_qmp(event, 'data/device', drive)
949 if error is None:
950 self.assert_qmp_absent(event, 'data/error')
951 if check_offset:
952 self.assert_qmp(event, 'data/offset',
953 event['data']['len'])
954 else:
955 self.assert_qmp(event, 'data/error', error)
956 self.assert_no_active_block_jobs()
957 return event
958 if event['event'] == 'JOB_STATUS_CHANGE':
959 self.assert_qmp(event, 'data/id', drive)
960
961 def wait_ready(self, drive='drive0'):
962 """Wait until a BLOCK_JOB_READY event, and return the event."""
963 f = {'data': {'type': 'mirror', 'device': drive}}
964 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
965
966 def wait_ready_and_cancel(self, drive='drive0'):
967 self.wait_ready(drive=drive)
968 event = self.cancel_and_wait(drive=drive)
969 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
970 self.assert_qmp(event, 'data/type', 'mirror')
971 self.assert_qmp(event, 'data/offset', event['data']['len'])
972
973 def complete_and_wait(self, drive='drive0', wait_ready=True,
974 completion_error=None):
975 '''Complete a block job and wait for it to finish'''
976 if wait_ready:
977 self.wait_ready(drive=drive)
978
979 result = self.vm.qmp('block-job-complete', device=drive)
980 self.assert_qmp(result, 'return', {})
981
982 event = self.wait_until_completed(drive=drive, error=completion_error)
983 self.assert_qmp(event, 'data/type', 'mirror')
984
985 def pause_wait(self, job_id='job0'):
986 with Timeout(3, "Timeout waiting for job to pause"):
987 while True:
988 result = self.vm.qmp('query-block-jobs')
989 found = False
990 for job in result['return']:
991 if job['device'] == job_id:
992 found = True
993 if job['paused'] and not job['busy']:
994 return job
995 break
996 assert found
997
998 def pause_job(self, job_id='job0', wait=True):
999 result = self.vm.qmp('block-job-pause', device=job_id)
1000 self.assert_qmp(result, 'return', {})
1001 if wait:
1002 return self.pause_wait(job_id)
1003 return result
1004
1005 def case_skip(self, reason):
1006 '''Skip this test case'''
1007 case_notrun(reason)
1008 self.skipTest(reason)
1009
1010
1011 def notrun(reason):
1012 '''Skip this test suite'''
1013 # Each test in qemu-iotests has a number ("seq")
1014 seq = os.path.basename(sys.argv[0])
1015
1016 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1017 logger.warning("%s not run: %s", seq, reason)
1018 sys.exit(0)
1019
1020 def case_notrun(reason):
1021 '''Mark this test case as not having been run (without actually
1022 skipping it, that is left to the caller). See
1023 QMPTestCase.case_skip() for a variant that actually skips the
1024 current test case.'''
1025
1026 # Each test in qemu-iotests has a number ("seq")
1027 seq = os.path.basename(sys.argv[0])
1028
1029 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1030 ' [case not run] ' + reason + '\n')
1031
1032 def _verify_image_format(supported_fmts: Sequence[str] = (),
1033 unsupported_fmts: Sequence[str] = ()) -> None:
1034 assert not (supported_fmts and unsupported_fmts)
1035
1036 if 'generic' in supported_fmts and \
1037 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1038 # similar to
1039 # _supported_fmt generic
1040 # for bash tests
1041 if imgfmt == 'luks':
1042 verify_working_luks()
1043 return
1044
1045 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1046 if not_sup or (imgfmt in unsupported_fmts):
1047 notrun('not suitable for this image format: %s' % imgfmt)
1048
1049 if imgfmt == 'luks':
1050 verify_working_luks()
1051
1052 def _verify_protocol(supported: Sequence[str] = (),
1053 unsupported: Sequence[str] = ()) -> None:
1054 assert not (supported and unsupported)
1055
1056 if 'generic' in supported:
1057 return
1058
1059 not_sup = supported and (imgproto not in supported)
1060 if not_sup or (imgproto in unsupported):
1061 notrun('not suitable for this protocol: %s' % imgproto)
1062
1063 def _verify_platform(supported: Sequence[str] = (),
1064 unsupported: Sequence[str] = ()) -> None:
1065 if any((sys.platform.startswith(x) for x in unsupported)):
1066 notrun('not suitable for this OS: %s' % sys.platform)
1067
1068 if supported:
1069 if not any((sys.platform.startswith(x) for x in supported)):
1070 notrun('not suitable for this OS: %s' % sys.platform)
1071
1072 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1073 if supported_cache_modes and (cachemode not in supported_cache_modes):
1074 notrun('not suitable for this cache mode: %s' % cachemode)
1075
1076 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1077 if supported_aio_modes and (aiomode not in supported_aio_modes):
1078 notrun('not suitable for this aio mode: %s' % aiomode)
1079
1080 def supports_quorum():
1081 return 'quorum' in qemu_img_pipe('--help')
1082
1083 def verify_quorum():
1084 '''Skip test suite if quorum support is not available'''
1085 if not supports_quorum():
1086 notrun('quorum support missing')
1087
1088 def has_working_luks() -> Tuple[bool, str]:
1089 """
1090 Check whether our LUKS driver can actually create images
1091 (this extends to LUKS encryption for qcow2).
1092
1093 If not, return the reason why.
1094 """
1095
1096 img_file = f'{test_dir}/luks-test.luks'
1097 (output, status) = \
1098 qemu_img_pipe_and_status('create', '-f', 'luks',
1099 '--object', luks_default_secret_object,
1100 '-o', luks_default_key_secret_opt,
1101 '-o', 'iter-time=10',
1102 img_file, '1G')
1103 try:
1104 os.remove(img_file)
1105 except OSError:
1106 pass
1107
1108 if status != 0:
1109 reason = output
1110 for line in output.splitlines():
1111 if img_file + ':' in line:
1112 reason = line.split(img_file + ':', 1)[1].strip()
1113 break
1114
1115 return (False, reason)
1116 else:
1117 return (True, '')
1118
1119 def verify_working_luks():
1120 """
1121 Skip test suite if LUKS does not work
1122 """
1123 (working, reason) = has_working_luks()
1124 if not working:
1125 notrun(reason)
1126
1127 def qemu_pipe(*args):
1128 """
1129 Run qemu with an option to print something and exit (e.g. a help option).
1130
1131 :return: QEMU's stdout output.
1132 """
1133 args = [qemu_prog] + qemu_opts + list(args)
1134 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1135 stderr=subprocess.STDOUT,
1136 universal_newlines=True)
1137 output = subp.communicate()[0]
1138 if subp.returncode < 0:
1139 sys.stderr.write('qemu received signal %i: %s\n' %
1140 (-subp.returncode, ' '.join(args)))
1141 return output
1142
1143 def supported_formats(read_only=False):
1144 '''Set 'read_only' to True to check ro-whitelist
1145 Otherwise, rw-whitelist is checked'''
1146
1147 if not hasattr(supported_formats, "formats"):
1148 supported_formats.formats = {}
1149
1150 if read_only not in supported_formats.formats:
1151 format_message = qemu_pipe("-drive", "format=help")
1152 line = 1 if read_only else 0
1153 supported_formats.formats[read_only] = \
1154 format_message.splitlines()[line].split(":")[1].split()
1155
1156 return supported_formats.formats[read_only]
1157
1158 def skip_if_unsupported(required_formats=(), read_only=False):
1159 '''Skip Test Decorator
1160 Runs the test if all the required formats are whitelisted'''
1161 def skip_test_decorator(func):
1162 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1163 **kwargs: Dict[str, Any]) -> None:
1164 if callable(required_formats):
1165 fmts = required_formats(test_case)
1166 else:
1167 fmts = required_formats
1168
1169 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1170 if usf_list:
1171 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1172 test_case.case_skip(msg)
1173 else:
1174 func(test_case, *args, **kwargs)
1175 return func_wrapper
1176 return skip_test_decorator
1177
1178 def skip_for_formats(formats: Sequence[str] = ()) \
1179 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1180 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1181 '''Skip Test Decorator
1182 Skips the test for the given formats'''
1183 def skip_test_decorator(func):
1184 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1185 **kwargs: Dict[str, Any]) -> None:
1186 if imgfmt in formats:
1187 msg = f'{test_case}: Skipped for format {imgfmt}'
1188 test_case.case_skip(msg)
1189 else:
1190 func(test_case, *args, **kwargs)
1191 return func_wrapper
1192 return skip_test_decorator
1193
1194 def skip_if_user_is_root(func):
1195 '''Skip Test Decorator
1196 Runs the test only without root permissions'''
1197 def func_wrapper(*args, **kwargs):
1198 if os.getuid() == 0:
1199 case_notrun('{}: cannot be run as root'.format(args[0]))
1200 return None
1201 else:
1202 return func(*args, **kwargs)
1203 return func_wrapper
1204
1205 def execute_unittest(debug=False):
1206 """Executes unittests within the calling module."""
1207
1208 verbosity = 2 if debug else 1
1209
1210 if debug:
1211 output = sys.stdout
1212 else:
1213 # We need to filter out the time taken from the output so that
1214 # qemu-iotest can reliably diff the results against master output.
1215 output = io.StringIO()
1216
1217 runner = unittest.TextTestRunner(stream=output, descriptions=True,
1218 verbosity=verbosity)
1219 try:
1220 # unittest.main() will use sys.exit(); so expect a SystemExit
1221 # exception
1222 unittest.main(testRunner=runner)
1223 finally:
1224 # We need to filter out the time taken from the output so that
1225 # qemu-iotest can reliably diff the results against master output.
1226 if not debug:
1227 out = output.getvalue()
1228 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1229
1230 # Hide skipped tests from the reference output
1231 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1232 out_first_line, out_rest = out.split('\n', 1)
1233 out = out_first_line.replace('s', '.') + '\n' + out_rest
1234
1235 sys.stderr.write(out)
1236
1237 def execute_setup_common(supported_fmts: Sequence[str] = (),
1238 supported_platforms: Sequence[str] = (),
1239 supported_cache_modes: Sequence[str] = (),
1240 supported_aio_modes: Sequence[str] = (),
1241 unsupported_fmts: Sequence[str] = (),
1242 supported_protocols: Sequence[str] = (),
1243 unsupported_protocols: Sequence[str] = ()) -> bool:
1244 """
1245 Perform necessary setup for either script-style or unittest-style tests.
1246
1247 :return: Bool; Whether or not debug mode has been requested via the CLI.
1248 """
1249 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1250
1251 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1252 # indicate that we're not being run via "check". There may be
1253 # other things set up by "check" that individual test cases rely
1254 # on.
1255 if test_dir is None or qemu_default_machine is None:
1256 sys.stderr.write('Please run this test via the "check" script\n')
1257 sys.exit(os.EX_USAGE)
1258
1259 debug = '-d' in sys.argv
1260 if debug:
1261 sys.argv.remove('-d')
1262 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1263
1264 _verify_image_format(supported_fmts, unsupported_fmts)
1265 _verify_protocol(supported_protocols, unsupported_protocols)
1266 _verify_platform(supported=supported_platforms)
1267 _verify_cache_mode(supported_cache_modes)
1268 _verify_aio_mode(supported_aio_modes)
1269
1270 return debug
1271
1272 def execute_test(*args, test_function=None, **kwargs):
1273 """Run either unittest or script-style tests."""
1274
1275 debug = execute_setup_common(*args, **kwargs)
1276 if not test_function:
1277 execute_unittest(debug)
1278 else:
1279 test_function()
1280
1281 def activate_logging():
1282 """Activate iotests.log() output to stdout for script-style tests."""
1283 handler = logging.StreamHandler(stream=sys.stdout)
1284 formatter = logging.Formatter('%(message)s')
1285 handler.setFormatter(formatter)
1286 test_logger.addHandler(handler)
1287 test_logger.setLevel(logging.INFO)
1288 test_logger.propagate = False
1289
1290 # This is called from script-style iotests without a single point of entry
1291 def script_initialize(*args, **kwargs):
1292 """Initialize script-style tests without running any tests."""
1293 activate_logging()
1294 execute_setup_common(*args, **kwargs)
1295
1296 # This is called from script-style iotests with a single point of entry
1297 def script_main(test_function, *args, **kwargs):
1298 """Run script-style tests outside of the unittest framework"""
1299 activate_logging()
1300 execute_test(*args, test_function=test_function, **kwargs)
1301
1302 # This is called from unittest style iotests
1303 def main(*args, **kwargs):
1304 """Run tests using the unittest framework"""
1305 execute_test(*args, **kwargs)