From 939cf3c0da7e9ddc3151399255a4ab0df16614c2 Mon Sep 17 00:00:00 2001 From: TheOnePath Date: Fri, 30 Jun 2023 22:51:36 +0100 Subject: [PATCH] Updated extractar - removed the `on_setattr` argument from the attr.ib() call for the `name` attribute in `class@ArchiveInfo`. - updated the docstring for class@Archive. - if `file=` is None, a ValueError is raised instead of a RuntimeError. - changed the attributes of class@Archive to be private. - added a `__setattr__` dunder method similar to the one in class@ArchiveInfo, but with all attributes treated as constants: any attempt to modify an attribute that is already set raises an AttributeError. - added a `file` property for the `__file` attribute in class@Archive. - refactored the code to use os.path instead of pathlib. pathlib was too restrictive because it does not accept bytes-like strings, and it is not an appropriate module for this task (os.path is just simpler). - ensured that writing the byte contents of an archive member to a new file happens inside a try-except statement, which raises an IOError if the write fails for an I/O-related reason. - other minor amendments. --- extractar.py | 117 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 70 insertions(+), 47 deletions(-) diff --git a/extractar.py b/extractar.py index 1621f65..be437a6 100644 --- a/extractar.py +++ b/extractar.py @@ -30,7 +30,6 @@ import os import struct import textwrap -import pathlib from typing import Any import attr @@ -59,8 +58,9 @@ class ArchiveBufferReadError(IndexError): Debian binary package entry information. Exception is of type IndexError. """ + @attr.s(slots=True) -class ArchiveInfo(object): +class ArchiveInfo: """ Information on a file in an archive. @@ -79,7 +79,7 @@ class ArchiveInfo(object): the same name are present; if you change the `name` attribute, the initial file will be extracted with the new name (and new metadata). 
""" - name: bytes = attr.ib(converter=lambda s: _utf8(s).rstrip(b' '), on_setattr=attr.setters.NO_OP) + name: bytes = attr.ib(converter=lambda s: _utf8(s).rstrip(b' ')) size: int = attr.ib(converter=int) mtime: int = attr.ib(converter=int) perms: int = attr.ib(converter=lambda x: int(x, 8)) @@ -88,6 +88,29 @@ class ArchiveInfo(object): offset: int = attr.ib(converter=int, default=0) + def __setattr__(self, __name: str, __value: Any) -> None: + """ + Custom setattr dunder method to create strict immutability over certain attributes of a class. Any attributes + which may be mutable should be handled appropriately. + """ + # |------ immutable ------| |---- mutable ----| + if not hasattr(self, __name) or __name == "offset": + object.__setattr__(self, __name, __value) + + + def __repr__(self) -> str: + return textwrap.dedent('''\ + {7}: ({0}) + name: {0} + size: {1} + mtime: {2} + perms: {3} + uid: {4} + gid: {5} + offset: {6}\n'''.format(self.name, self.size, self.mtime, self.perms, self.uid, self.gid, self.offset, + self.__class__)) + + @classmethod def frombuffer(cls, buffer): """ @@ -111,33 +134,14 @@ class ArchiveInfo(object): return cls(name=name, size=size, mtime=mtime, perms=perms, uid=uid, gid=gid) - def __setattr__(self, __name: str, __value: Any) -> None: - """ - Custom setattr dunder method to create strict immutability over certain attributes of a class. Any attributes - which may be mutable should be handled appropriately. - """ - # |------ immutable ------| |---- mutable ----| - if not hasattr(self, __name) or __name == "offset": - object.__setattr__(self, __name, __value) - - - def __repr__(self) -> str: - return textwrap.dedent('''\ - {7}: ({0}) - _name: {0} - size: {1} - mtime: {2} - perms: {3} - uid: {4} - gid: {5} - offset: {6}\n'''.format(self.name, self.size, self.mtime, self.perms, self.uid, self.gid, self.offset, - self.__class__)) - - class Archive: """ - An UNIX ar archive. 
+ An archive manager for extracting files which have been stored in Debian package binaries using the common format. + Replaces the extraction method of the `ar` Unix command, to be used on read-only archives. + Initialise the class with a string path to a file, and invoke the `open()` method to load the archive into memory. + After this, the other methods of the class may be used to act upon the archived information appropriately, such as + extracting entries from the archive. """ def __init__(self, file: str): """ @@ -146,11 +150,27 @@ class Archive: Initialisation is lazy. Please invoke the `open()` method to load the archive into memory. """ if file is None: - raise RuntimeError("File must not be None.") + raise ValueError("File must not be None.") - self._file = pathlib.Path(file) - self._mapping: dict[bytes, ArchiveInfo] = {} - self.__ar_contents: bytes + self.__file = os.path.realpath(file) + self.__mapping: dict[bytes, ArchiveInfo] = {} + self.__ArByteStream: bytes + + + def __setattr__(self, __name: str, __value: Any) -> None: + """ + Custom setattr dunder method to create strict immutability over certain attributes of a class. Any attributes + which may be mutable should be handled appropriately. 
+ """ + if not hasattr(self, __name): + object.__setattr__(self, __name, __value) + else: + raise AttributeError(f"The attribute {__name} is private and not intended to be modified.") + + + @property + def file(self): + return str(self.__file) def open(self): @@ -166,19 +186,19 @@ class Archive: setattr(self, '__instance__', self) # firstly, ensure that the path exists on the system - if not self._file.exists(): - raise FileNotFoundError(f"The specified file: {self._file}, could not be found.") + if not os.path.exists(self.__file) :#self.__file.exists(): + raise FileNotFoundError(f"The specified file: {self.__file}, could not be found.") # secondly, ensure that the path is of type file, and accessible for reading - if not pathlib.Path(self._file).is_file() and os.access(self._file, mode=os.R_OK): - raise FileExistsError(f"The specified file: {self._file}, is either a directory or not readable.") + if not os.path.isfile(self.__file) and os.access(self.__file, mode=os.R_OK): + raise FileExistsError(f"The specified file: {self.__file}, is either a directory or not readable.") # @Ethan: just never trust I/O operations fully try: # open the file in binary read-mode - with open(self._file, 'rb') as fp: + with open(self.__file, 'rb') as fp: # we want to read and dump everything into memory. - self.__ar_contents = fp.read() + self.__ArByteStream = fp.read() except IOError as err: raise RuntimeError("There was an issue reading the archive file.") from err @@ -199,12 +219,12 @@ class Archive: ): raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) 
first.") - CONTENT_LENGTH = len(self.__ar_contents) + CONTENT_LENGTH = len(self.__ArByteStream) if CONTENT_LENGTH < 8: raise ArchiveMagicBytesError("The loaded archive size is less than the magic bytes.") # check the magic bytes - if self.__ar_contents[:8] != b'!<arch>\n': + if self.__ArByteStream[:8] != b'!<arch>\n': raise ArchiveMagicBytesError("Invalid archive signature") # define some variable constants @@ -214,7 +234,7 @@ class Archive: pos = 8 while pos != CONTENT_LENGTH: # keep going until we reach the end of the content # read in 60 bytes of data (length of headers in Debian archive) - buffer = self.__ar_contents[pos:pos+HEADER_LENGTH] + buffer = self.__ArByteStream[pos:pos+HEADER_LENGTH] # check to ensure that the archive isn't truncated if len(buffer) != HEADER_LENGTH: raise ArchiveBufferReadError( @@ -226,7 +246,7 @@ class Archive: member.offset = pos # add our entry to the mappings dict (key: name of member, value: `:class:@ArchiveInfo`) - self._mapping[member.name.removesuffix(b'/')] = member + self.__mapping[member.name.removesuffix(b'/')] = member # move the position in the content by the size of the member (with +1 if odd size) pos += HEADER_LENGTH + member.size + (1 if member.size % 2 != 0 else 0) @@ -242,8 +262,8 @@ class Archive: @Returns: ArchiveInfo """ - if _utf8(member) in self._mapping: - return self._mapping[_utf8(member)] + if _utf8(member) in self.__mapping: + return self.__mapping[_utf8(member)] return ArchiveInfo(b"", 0, 0, 0, 0, 0) @@ -273,9 +293,12 @@ class Archive: # write out the specific content of an archive member pos = member.offset + 60 - # @TODO: put in try cus IO be IO lmao - with open(path, 'wb') as fp: - fp.write(self.__ar_contents[pos:pos+member.size]) + try: + with open(self.__file, 'wb') as fp: + fp.write(self.__ArByteStream[pos:pos+member.size]) + except IOError: + raise IOError(f"There was an issue when writing the bytes to file: {path}.\n", + "Please be wary of any mangled or corrupt files.") def extractall(self, path: str | 
bytes = ''): @@ -293,5 +316,5 @@ class Archive: raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.") # iterate over all members in the mapping - for name in self._mapping.keys(): + for name in self.__mapping.keys(): self.extract(name, os.path.join(_utf8(path), name))