diff --git a/extractar.py b/extractar.py new file mode 100644 index 0000000..52e65d3 --- /dev/null +++ b/extractar.py @@ -0,0 +1,290 @@ +# -* licence notice *- +# Copyright (c) 2016, Remi Rampin +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os +import struct +import textwrap +import pathlib + +__version__ = '1.0' + + +def _utf8(s: str | bytes): + """ + Keeps bytes, converts unicode into UTF-8. + + This is used for filenames, which the user may supply as unicode, but is always stored as bytes in the archive. + """ + return s if isinstance(s, bytes) else s.encode() + + +class ArchiveMagicBytesError(ValueError): + """ + An exception to be raised when an error regarding the magic byte sequence has been detected. Exception is of type + ValueError. + """ + + +class ArchiveBufferReadError(IndexError): + """ + An exception to be raised when not enough content has been read into the buffer. This is specific to reading the + Debian binary package entry information. Exception is of type IndexError. + """ + + +class ArchiveInfo: + """ + Information on a file in an archive. + + This has the filename and all the metadata for a file in an archive. + + It is returned by :meth:`~unix_ar.Archive.infolist()` and + :meth:`~unix_ar.Archive.getinfo()`, and can be passed when adding or + extracting a file to or from the archive. + + Missing fields will be autocompleted when passed to `Archive`, but note that + things like `size` will be respected, allowing you to store or extract only + part of a file. + + `ArchiveInfo` objects returned by `Archive` have the offset to the file in the + archive, allowing to extract the correct one even if multiple files with + the same name are present; if you change the `name` attribute, the initial + file will be extracted with the new name (and new metadata). + """ + def __init__(self, name: bytes, size: int, mtime: int, perms: int, uid: int, gid: int): + self._name = name + self.size = size + self.mtime = mtime + self.perms = perms + self.uid = uid + self.gid = gid + self.offset: int = 0 + + + @property + def name(self): + return self._name + + + @classmethod + def frombuffer(cls, buffer): + """ + Decode the archive header. + """ + # 0 16 File name ASCII + # 16 12 File modification timestamp Decimal + # 28 6 Owner ID Decimal + # 34 6 Group ID Decimal + # 40 8 File mode Octal + # 48 10 File size in bytes Decimal + # 58 2 File magic 0x60 0x0A + + # unpack our values + name, mtime, uid, gid, perms, size, magic =\ + struct.unpack('16s12s6s6s8s10s2s', buffer) + + if magic != b'\x60\n': + raise ValueError("Invalid file signature") + + return cls(_utf8(name).rstrip(b' '), int(size, 10), int(mtime, 10), int(perms, 8), int(uid, 10), int(gid, 10)) + + + def __repr__(self) -> str: + return textwrap.dedent('''\ + ArchiveInfo: ({0}) + _name: {0} + size: {1} + mtime: {2} + perms: {3} + uid: {4} + gid: {5} + offset: {6}\n'''.format(self._name, self.size, self.mtime, self.perms, self.uid, self.gid, self.offset)) + + +class Archive: + """ + An UNIX ar archive. + + """ + def __init__(self, file: str): + """ + Create an `Archive` from an opened file (in 'rb' mode only). + + Initialisation is lazy. Please invoke the `open()` method to load the archive into memory. + """ + if file is None: + raise RuntimeError("File must not be None.") + + self._file = pathlib.Path(file) + self._mapping: dict[bytes, ArchiveInfo] = {} + self.__ar_contents: bytes + + + def open(self): + """ + Open the archive file and start reading its entries. + + :@Note: that this function will load the contents of the enter program into memory during runtime. This prevents + the need to constantly open the file, or have the file handle open (dangerous), reducing I/O latency. + """ + # here we can set a guard for other methods to ensure that this method has been invoked before starting to + # operate on the archive. @Ethan: Python being Python, ensure the attribute is set to the class instance. + if not hasattr(self, '__instance__'): + setattr(self, '__instance__', self) + + # firstly, ensure that the path exists on the system + if not self._file.exists(): + raise FileNotFoundError(f"The specified file: {self._file}, could not be found.") + + # secondly, ensure that the path is of type file, and accessible for reading + if not pathlib.Path(self._file).is_file() and os.access(self._file, mode=os.R_OK): + raise FileExistsError(f"The specified file: {self._file}, is either a directory or not readable.") + + # @Ethan: just never trust I/O operations fully + try: + # open the file in binary read-mode + with open(self._file, 'rb') as fp: + # we want to read and dump everything into memory. + self.__ar_contents = fp.read() + except IOError as err: + raise RuntimeError("There was an issue reading the archive file.") from err + + self._read_entries() # start reading in the file entries + + + def _read_entries(self): + """ + Read entries of a given archive file of the Debian binary package format. + + Method is protected and invoked internally by the `open()` method, which ensures that the archive is loaded into + memory. Note, attempts to call this function without calling loaded the archive into memory will result in a + RuntimeError to be raised. + """ + if ( + not hasattr(self, '__instance__') and + getattr(self, '__instance__') is not self + ): + raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.") + + CONTENT_LENGTH = len(self.__ar_contents) + if CONTENT_LENGTH < 8: + raise ArchiveMagicBytesError("The loaded archive size is less than the magic bytes.") + + # check the magic bytes + if self.__ar_contents[:8] != b'!\n': + raise ArchiveMagicBytesError("Invalid archive signature") + + # define some variable constants + HEADER_LENGTH = 60 + + # the first 8 bytes are the magic number, so content is 8 bytes afterwards, and start the position there. + pos = 8 + while pos != CONTENT_LENGTH: # keep going until we reach the end of the content + # read in 60 bytes of data (length of headers in Debian archive) + buffer = self.__ar_contents[pos:pos+HEADER_LENGTH] + # check to ensure that the archive isn't truncated + if len(buffer) != HEADER_LENGTH: + raise ArchiveBufferReadError( + "Buffer does not contain enough content to read next entry. Truncated archive?" + ) + + # translate byte buffer to readable information about the entry + member = ArchiveInfo.frombuffer(buffer) + member.offset = pos + + # add our entry to the mappings dict (key: name of member, value: `:class:@ArchiveInfo`) + self._mapping[member.name.removesuffix(b'/')] = member + + # move the position in the content by the size of the member (with +1 if odd size) + pos += HEADER_LENGTH + member.size + (1 if member.size % 2 != 0 else 0) + + + def getinfo(self, member: str | bytes): + """ + Return an ArchiveInfo object of a specified `member=` of the archive file, if such member exists. If no member + can be found in the archive, an empty ArchiveInfo is returned. + + @Params: + * member: str | bytes - a member file of the archive + + @Returns: ArchiveInfo + """ + if _utf8(member) in self._mapping: + return self._mapping[_utf8(member)] + + return ArchiveInfo(b"", 0, 0, 0, 0, 0) + + + def extract(self, file: str | bytes, path: str | bytes = ''): + """ + Extract a single file from the archive. + + @Params: + * file: str | bytes - the file to be extracted from the archive + * path: str | bytes - output path for the archive (default: current working directory). + """ + # ensure that open() has been invoked before extracting + if ( + not hasattr(self, '__instance__') and + getattr(self, '__instance__') is not self + ): + raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.") + + + member = self.getinfo(file) + if member.name == b"": + return + + if not path or os.path.isdir(path): + path = os.path.join(_utf8(path), member.name).removesuffix(b'/') + + # write out the specific content of an archive member + pos = member.offset + 60 + with open(path, 'wb') as fp: + data = self.__ar_contents[pos:pos+member.size] + fp.write(data) + + + def extractall(self, path: str | bytes = ''): + """ + Extract all the files in the archive. + + @Param: + * path: str | bytes - output path for the archive (default: current working directory). + """ + # ensure that open() has been invoked before extracting + if ( + not hasattr(self, '__instance__') and + getattr(self, '__instance__') is not self + ): + raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.") + + # iterate over all members in the mapping + for name in self._mapping.keys(): + self.extract(name, os.path.join(_utf8(path), name)) +