- Removed the `on_setattr` argument from the attr.ib() call for the `name` attribute in class@ArchiveInfo. - Updated the docstring for class@Archive. - If `file=` is given as None, a ValueError is raised instead of a RuntimeError. - Changed the attributes of class@Archive to be private. - Added a setattr dunder method similar to the one in class@ArchiveInfo, except that all attributes are treated as constants; any attempt to modify a constant raises an AttributeError. - Added a property for the attribute `__file` in class@Archive. - Refactored the code to use os.path instead of pathlib. Pathlib was restrictive in not allowing byte-like strings, and is an inappropriate module for the task (os is just simpler). - Ensured that writing the byte contents from an archive to a new file occurs within a try-except statement, and that an IOError is raised if the write fails due to I/O. - Other minor amendments.
321 lines
13 KiB
Python
321 lines
13 KiB
Python
# -* licence notice *-
|
|
# Copyright (c) 2016, Remi Rampin
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice, this
|
|
# list of conditions and the following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation and/or other materials provided with the
|
|
# distribution.
|
|
#
|
|
# 3. Neither the name of the copyright holder nor the names of its contributors
|
|
# may be used to endorse or promote products derived from this software
|
|
# without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
|
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
|
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import os
|
|
import struct
|
|
import textwrap
|
|
from typing import Any
|
|
import attr
|
|
|
|
__version__ = '1.0'
|
|
|
|
|
|
def _utf8(s: str | bytes):
|
|
"""
|
|
Keeps bytes, converts unicode into UTF-8.
|
|
|
|
This is used for filenames, which the user may supply as unicode, but is always stored as bytes in the archive.
|
|
"""
|
|
return s if isinstance(s, bytes) else s.encode()
|
|
|
|
|
|
class ArchiveMagicBytesError(ValueError):
    """
    Raised when a magic byte sequence is missing or does not match the expected value.

    Derives from ValueError, so handlers written for ValueError also catch this exception.
    """
|
|
|
|
|
|
class ArchiveBufferReadError(IndexError):
    """
    Raised when the buffer holds too little content to read the entry information of a Debian binary package.

    Derives from IndexError, so handlers written for IndexError also catch this exception.
    """
|
|
|
|
|
|
def _parse_perms(value: int | str | bytes) -> int:
    """
    Convert a file mode into an integer.

    Text or bytes (as found in an archive header) are parsed as octal digits. An integer is taken to already be the
    numeric mode and is returned unchanged, since `int(x, 8)` rejects non-string input with a TypeError.
    """
    if isinstance(value, int):
        return value
    return int(value, 8)


# `repr=False` is required: `attr.s` otherwise generates its own `__repr__` and replaces the one defined below.
@attr.s(slots=True, repr=False)
class ArchiveInfo:
    """
    Information on a file in an archive.

    This has the filename and all the metadata for a file in an archive: `name`, `size`, `mtime`, `perms`, `uid`,
    `gid`, and the `offset` of the entry header within the archive.

    Every field is passed through a converter on construction, so the raw, space-padded byte fields of an archive
    header can be given directly (`name` has trailing spaces stripped, `perms` is parsed as octal, the remaining
    fields as decimal).

    Once a field holds a value it is constant: later assignments are silently ignored. The only exception is
    `offset`, which may be reassigned (the archive reader does so after decoding a header).
    """
    name: bytes = attr.ib(converter=lambda s: _utf8(s).rstrip(b' '))
    size: int = attr.ib(converter=int)
    mtime: int = attr.ib(converter=int)
    perms: int = attr.ib(converter=_parse_perms)
    uid: int = attr.ib(converter=int)
    gid: int = attr.ib(converter=int)
    offset: int = attr.ib(converter=int, default=0)

    def __setattr__(self, __name: str, __value: Any) -> None:
        """
        Custom setattr dunder method to create strict immutability over certain attributes of a class.

        An attribute which has no value yet (i.e. during initialisation) is set normally. Afterwards only `offset`
        can be changed; assignments to any other attribute are dropped without raising.
        """
        #  |------ immutable ------|    |---- mutable ----|
        if not hasattr(self, __name) or __name == "offset":
            object.__setattr__(self, __name, __value)

    def __repr__(self) -> str:
        """
        Return a multi-line, human-readable listing of every field, one `key: value` pair per line.
        """
        return (
            f"{self.__class__}: ({self.name})\n"
            f"name: {self.name}\n"
            f"size: {self.size}\n"
            f"mtime: {self.mtime}\n"
            f"perms: {self.perms}\n"
            f"uid: {self.uid}\n"
            f"gid: {self.gid}\n"
            f"offset: {self.offset}\n"
        )

    @classmethod
    def frombuffer(cls, buffer):
        """
        Decode the archive header.

        @Params:
        * buffer: bytes - exactly 60 bytes holding one entry header (`struct.unpack` rejects any other length).

        @Returns: ArchiveInfo - the decoded entry, with `offset` left at its default of 0.

        @Raises:
        * ArchiveMagicBytesError (a ValueError) - if the header does not end with the bytes 0x60 0x0A.
        """
        # Offset  Length  Field                         Format
        # 0       16      File name                     ASCII
        # 16      12      File modification timestamp   Decimal
        # 28      6       Owner ID                      Decimal
        # 34      6       Group ID                      Decimal
        # 40      8       File mode                     Octal
        # 48      10      File size in bytes            Decimal
        # 58      2       File magic                    0x60 0x0A

        # unpack our values
        name, mtime, uid, gid, perms, size, magic = struct.unpack('16s12s6s6s8s10s2s', buffer)

        if magic != b'\x60\n':
            raise ArchiveMagicBytesError("Invalid file signature")

        return cls(name=name, size=size, mtime=mtime, perms=perms, uid=uid, gid=gid)
|
|
|
|
|
|
class Archive:
|
|
"""
|
|
An archive manager for extracting files which have been stored in Debian package binaries using the common format.
|
|
Replaces the extraction method of the `ar` Unix command, to be used on read-only archives.
|
|
|
|
Initialise the class with a string path to a file, and invoke the `open()` method to load the archive into memory.
|
|
After this, the other methods of the class may be used to act upon the archived information appropriately, such as
|
|
extracting entries from the archive.
|
|
"""
|
|
def __init__(self, file: str):
|
|
"""
|
|
Create an `Archive` from an opened file (in 'rb' mode only).
|
|
|
|
Initialisation is lazy. Please invoke the `open()` method to load the archive into memory.
|
|
"""
|
|
if file is None:
|
|
raise ValueError("File must not be None.")
|
|
|
|
self.__file = os.path.realpath(file)
|
|
self.__mapping: dict[bytes, ArchiveInfo] = {}
|
|
self.__ArByteStream: bytes
|
|
|
|
|
|
def __setattr__(self, __name: str, __value: Any) -> None:
|
|
"""
|
|
Custom setattr dunder method to create strict immutability over certain attributes of a class. Any attributes
|
|
which may be mutable should be handled appropriately.
|
|
"""
|
|
if not hasattr(self, __name):
|
|
object.__setattr__(self, __name, __value)
|
|
else:
|
|
raise AttributeError(f"The attribute {__name} is private and not intended to be modified.")
|
|
|
|
|
|
@property
|
|
def file(self):
|
|
return str(self.__file)
|
|
|
|
|
|
def open(self):
|
|
"""
|
|
Open the archive file and start reading its entries.
|
|
|
|
:@Note: that this function will load the contents of the enter program into memory during runtime. This prevents
|
|
the need to constantly open the file, or have the file handle open (dangerous), reducing I/O latency.
|
|
"""
|
|
# here we can set a guard for other methods to ensure that this method has been invoked before starting to
|
|
# operate on the archive. @Ethan: Python being Python, ensure the attribute is set to the class instance.
|
|
if not hasattr(self, '__instance__'):
|
|
setattr(self, '__instance__', self)
|
|
|
|
# firstly, ensure that the path exists on the system
|
|
if not os.path.exists(self.__file) :#self.__file.exists():
|
|
raise FileNotFoundError(f"The specified file: {self.__file}, could not be found.")
|
|
|
|
# secondly, ensure that the path is of type file, and accessible for reading
|
|
if not os.path.isfile(self.__file) and os.access(self.__file, mode=os.R_OK):
|
|
raise FileExistsError(f"The specified file: {self.__file}, is either a directory or not readable.")
|
|
|
|
# @Ethan: just never trust I/O operations fully
|
|
try:
|
|
# open the file in binary read-mode
|
|
with open(self.__file, 'rb') as fp:
|
|
# we want to read and dump everything into memory.
|
|
self.__ArByteStream = fp.read()
|
|
except IOError as err:
|
|
raise RuntimeError("There was an issue reading the archive file.") from err
|
|
|
|
self._read_entries() # start reading in the file entries
|
|
|
|
|
|
def _read_entries(self):
|
|
"""
|
|
Read entries of a given archive file of the Debian binary package format.
|
|
|
|
Method is protected and invoked internally by the `open()` method, which ensures that the archive is loaded into
|
|
memory. Note, attempts to call this function without calling loaded the archive into memory will result in a
|
|
RuntimeError to be raised.
|
|
"""
|
|
if (
|
|
not hasattr(self, '__instance__') and
|
|
getattr(self, '__instance__') is not self
|
|
):
|
|
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
|
|
|
CONTENT_LENGTH = len(self.__ArByteStream)
|
|
if CONTENT_LENGTH < 8:
|
|
raise ArchiveMagicBytesError("The loaded archive size is less than the magic bytes.")
|
|
|
|
# check the magic bytes
|
|
if self.__ArByteStream[:8] != b'!<arch>\n':
|
|
raise ArchiveMagicBytesError("Invalid archive signature")
|
|
|
|
# define some variable constants
|
|
HEADER_LENGTH = 60
|
|
|
|
# the first 8 bytes are the magic number, so content is 8 bytes afterwards, and start the position there.
|
|
pos = 8
|
|
while pos != CONTENT_LENGTH: # keep going until we reach the end of the content
|
|
# read in 60 bytes of data (length of headers in Debian archive)
|
|
buffer = self.__ArByteStream[pos:pos+HEADER_LENGTH]
|
|
# check to ensure that the archive isn't truncated
|
|
if len(buffer) != HEADER_LENGTH:
|
|
raise ArchiveBufferReadError(
|
|
"Buffer does not contain enough content to read next entry. Truncated archive?"
|
|
)
|
|
|
|
# translate byte buffer to readable information about the entry
|
|
member = ArchiveInfo.frombuffer(buffer)
|
|
member.offset = pos
|
|
|
|
# add our entry to the mappings dict (key: name of member, value: `:class:@ArchiveInfo`)
|
|
self.__mapping[member.name.removesuffix(b'/')] = member
|
|
|
|
# move the position in the content by the size of the member (with +1 if odd size)
|
|
pos += HEADER_LENGTH + member.size + (1 if member.size % 2 != 0 else 0)
|
|
|
|
|
|
def getinfo(self, member: str | bytes):
|
|
"""
|
|
Return an ArchiveInfo object of a specified `member=` of the archive file, if such member exists. If no member
|
|
can be found in the archive, an empty ArchiveInfo is returned.
|
|
|
|
@Params:
|
|
* member: str | bytes - a member file of the archive
|
|
|
|
@Returns: ArchiveInfo
|
|
"""
|
|
if _utf8(member) in self.__mapping:
|
|
return self.__mapping[_utf8(member)]
|
|
|
|
return ArchiveInfo(b"", 0, 0, 0, 0, 0)
|
|
|
|
|
|
def extract(self, file: str | bytes, path: str | bytes = ''):
|
|
"""
|
|
Extract a single file from the archive.
|
|
|
|
@Params:
|
|
* file: str | bytes - the file to be extracted from the archive
|
|
* path: str | bytes - output path for the archive (default: current working directory).
|
|
"""
|
|
# ensure that open() has been invoked before extracting
|
|
if (
|
|
not hasattr(self, '__instance__') and
|
|
getattr(self, '__instance__') is not self
|
|
):
|
|
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
|
|
|
|
|
member = self.getinfo(file)
|
|
if member.name == b"":
|
|
return
|
|
|
|
if not path or os.path.isdir(path):
|
|
path = os.path.join(_utf8(path), member.name).removesuffix(b'/')
|
|
|
|
# write out the specific content of an archive member
|
|
pos = member.offset + 60
|
|
try:
|
|
with open(self.__file, 'wb') as fp:
|
|
fp.write(self.__ArByteStream[pos:pos+member.size])
|
|
except IOError:
|
|
raise IOError(f"There was an issue when writing the bytes to file: {path}.\n",
|
|
"Please be wary of any mangled or corrupt files.")
|
|
|
|
|
|
def extractall(self, path: str | bytes = ''):
|
|
"""
|
|
Extract all the files in the archive.
|
|
|
|
@Param:
|
|
* path: str | bytes - output path for the archive (default: current working directory).
|
|
"""
|
|
# ensure that open() has been invoked before extracting
|
|
if (
|
|
not hasattr(self, '__instance__') and
|
|
getattr(self, '__instance__') is not self
|
|
):
|
|
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
|
|
|
# iterate over all members in the mapping
|
|
for name in self.__mapping.keys():
|
|
self.extract(name, os.path.join(_utf8(path), name))
|