Commit 0903e3b3 authored by Vladimir Sementsov-Ogievskiy's avatar Vladimir Sementsov-Ogievskiy Committed by Eric Blake
Browse files

qcow2_format.py: separate generic functionality of structure classes



We are going to introduce more Qcow2 structure types, defined like
QcowHeader. Move generic functionality into base class to be reused for
further structure classes.

Signed-off-by: default avatarVladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: default avatarAndrey Shinkevich <andrey.shinkevich@virtuozzo.com>
Message-Id: <20200606081806.23897-9-vsementsov@virtuozzo.com>
Signed-off-by: default avatarEric Blake <eblake@redhat.com>
parent 5432a0db
Loading
Loading
Loading
Loading
+66 −35
Original line number Diff line number Diff line
# Library for manipulations with qcow2 image
#
# Copyright (c) 2020 Virtuozzo International GmbH.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
@@ -18,6 +20,68 @@ import struct
import string


class Qcow2StructMeta(type):

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(self, name, bases, attrs):
        if 'fields' in attrs:
            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)


class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Successors should define fields class variable, which is: list of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format (format_spec to use with .format() when dump or 'mask' to dump
                  bitmasks)
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be omitted
               in this case, than current position of fd is used.
        """
        if data is None:
            assert fd is not None
            buf_size = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(buf_size)
        else:
            assert fd is None and offset is None

        values = struct.unpack(self.fmt, data)
        self.__dict__ = dict((field[2], values[i])
                             for i, field in enumerate(self.fields))

    def dump(self):
        for f in self.fields:
            value = self.__dict__[f[2]]
            if f[1] == 'mask':
                bits = []
                for bit in range(64):
                    if value & (1 << bit):
                        bits.append(bit)
                value_str = str(bits)
            else:
                value_str = f[1].format(value)

            print('{:<25} {}'.format(f[2], value_str))


class QcowHeaderExtension:

    def __init__(self, magic, length, data):
@@ -34,16 +98,7 @@ class QcowHeaderExtension:
        return QcowHeaderExtension(magic, len(data), data)


# Mapping from c types to python struct format
ctypes = {
    'u8': 'B',
    'u16': 'H',
    'u32': 'I',
    'u64': 'Q'
}


class QcowHeader:
class QcowHeader(Qcow2Struct):

    fields = (
        # Version 2 header fields
@@ -69,18 +124,8 @@ class QcowHeader:
        ('u32', '{}', 'header_length'),
    )

    fmt = '>' + ''.join(ctypes[f[0]] for f in fields)

    def __init__(self, fd):

        buf_size = struct.calcsize(QcowHeader.fmt)

        fd.seek(0)
        buf = fd.read(buf_size)

        header = struct.unpack(QcowHeader.fmt, buf)
        self.__dict__ = dict((field[2], header[i])
                             for i, field in enumerate(QcowHeader.fields))
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits
@@ -148,20 +193,6 @@ class QcowHeader:
        buf = buf[0:header_bytes-1]
        fd.write(buf)

    def dump(self):
        for f in QcowHeader.fields:
            value = self.__dict__[f[2]]
            if f[1] == 'mask':
                bits = []
                for bit in range(64):
                    if value & (1 << bit):
                        bits.append(bit)
                value_str = str(bits)
            else:
                value_str = f[1].format(value)

            print(f'{f[2]:<25} {value_str}')

    def dump_extensions(self):
        for ex in self.extensions: