extractar/extractar.py
TheOnePath 939cf3c0da
Updated extractar
- removed `on_setattr` argument in attr.ib() call for `name` attribute
  in `class@ArchiveInfo`.
- update docstring for class@Archive.
- if `file=` provided with None, ValueError is raised instead of
  RuntimeError.
- changed attributes of class@Archive to be private.
- added setattr dunder method similar to class@ArchiveInfo, but all
  attributes are defined as constant. Any attempt to modify constants
  throws an AttributeError
- added property for attribute `__file` in class@Archive.
- refactored code to use os.path instead of pathlib. Pathlib was
  restrictive on not allowing byte-like strings, and is an inappropriate
  module for the task (os is just simpler).
- ensured that writing the byte contents from an archive to a new file
  occurs within a try-except statement, and raises an IOError if the
  write fails in some way due to IO.
- other minor amendments.
2023-06-30 22:51:36 +01:00

321 lines
13 KiB
Python

# -* licence notice *-
# Copyright (c) 2016, Remi Rampin
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or other materials provided with the
# distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import struct
import textwrap
from typing import Any
import attr
__version__ = '1.0'
def _utf8(s: str | bytes):
"""
Keeps bytes, converts unicode into UTF-8.
This is used for filenames, which the user may supply as unicode, but is always stored as bytes in the archive.
"""
return s if isinstance(s, bytes) else s.encode()
class ArchiveMagicBytesError(ValueError):
    """
    Raised when the magic byte sequence of an archive is missing or does not match the expected signature.

    Subclasses ValueError, so existing `except ValueError` handlers also catch it.
    """
class ArchiveBufferReadError(IndexError):
    """
    Raised when the buffer holds too little data to decode the next entry header of a Debian binary package
    archive (for example because the archive is truncated).

    Subclasses IndexError, so existing `except IndexError` handlers also catch it.
    """
def _int_field(value, base: int = 10) -> int:
    """
    Convert a header field to an integer.

    Textual input (`bytes`, `bytearray` or `str`) is stripped of surrounding whitespace and parsed in `base`; a field
    that is blank after stripping is read as 0 instead of raising ValueError. Any other input (e.g. an `int` that has
    already been decoded) is passed through `int()` unchanged, so `base` only applies to textual input.
    """
    if isinstance(value, (bytes, bytearray, str)):
        text = value.strip()
        return int(text, base) if text else 0
    return int(value)


@attr.s(slots=True)
class ArchiveInfo:
    """
    Information on a file in an archive.

    Holds the filename and the metadata decoded from one 60-byte entry header. Instances are produced by
    :meth:`ArchiveInfo.frombuffer` and returned by :meth:`Archive.getinfo`.

    Field conversion:
        * `name` is stored as bytes (text is UTF-8 encoded) with trailing spaces removed.
        * `size`, `mtime`, `uid`, `gid` and `offset` are parsed as decimal integers.
        * `perms` is parsed as octal when given as text; an `int` is taken as already decoded.
        * blank textual numeric fields are read as 0.

    Mutability: once an attribute has a value, assigning to it again is silently ignored, with the exception of
    `offset`, which may be reassigned (the archive reader uses this to record where the entry header starts).
    """
    name: bytes = attr.ib(converter=lambda s: _utf8(s).rstrip(b' '))
    size: int = attr.ib(converter=_int_field)
    mtime: int = attr.ib(converter=_int_field)
    # Octal in the on-disk header; ints are accepted as-is so that `ArchiveInfo(b"", 0, 0, 0, 0, 0)` is constructible.
    perms: int = attr.ib(converter=lambda x: _int_field(x, 8))
    uid: int = attr.ib(converter=_int_field)
    gid: int = attr.ib(converter=_int_field)
    offset: int = attr.ib(converter=_int_field, default=0)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Allow the first assignment of every attribute, and later re-assignment of `offset` only.

        Any other re-assignment is ignored without raising, which keeps the decoded header values constant.
        """
        #   |------ immutable ------|    |---- mutable ----|
        if not hasattr(self, name) or name == "offset":
            object.__setattr__(self, name, value)

    def __repr__(self) -> str:
        return textwrap.dedent('''\
            {7}: ({0})
                name: {0}
                size: {1}
                mtime: {2}
                perms: {3}
                uid: {4}
                gid: {5}
                offset: {6}\n'''.format(self.name, self.size, self.mtime, self.perms, self.uid, self.gid, self.offset,
                                        self.__class__))

    @classmethod
    def frombuffer(cls, buffer):
        """
        Decode one archive entry header.

        @Params:
        * buffer: bytes - exactly 60 bytes holding a single entry header.

        @Returns: ArchiveInfo (with `offset` left at its default of 0)

        @Raises:
        * struct.error - if `buffer` is not exactly 60 bytes long.
        * ValueError - if the two trailing header bytes are not 0x60 0x0A.
        """
        # Offset  Length  Field                        Format
        # 0       16      File name                    ASCII
        # 16      12      File modification timestamp  Decimal
        # 28      6       Owner ID                     Decimal
        # 34      6       Group ID                     Decimal
        # 40      8       File mode                    Octal
        # 48      10      File size in bytes           Decimal
        # 58      2       File magic                   0x60 0x0A
        name, mtime, uid, gid, perms, size, magic = struct.unpack('16s12s6s6s8s10s2s', buffer)
        if magic != b'\x60\n':
            raise ValueError("Invalid file signature")
        return cls(name=name, size=size, mtime=mtime, perms=perms, uid=uid, gid=gid)
class Archive:
"""
An archive manager for extracting files which have been stored in Debian package binaries using the common format.
Replaces the extraction method of the `ar` Unix command, to be used on read-only archives.
Initialise the class with a string path to a file, and invoke the `open()` method to load the archive into memory.
After this, the other methods of the class may be used to act upon the archived information appropriately, such as
extracting entries from the archive.
"""
def __init__(self, file: str):
"""
Create an `Archive` from an opened file (in 'rb' mode only).
Initialisation is lazy. Please invoke the `open()` method to load the archive into memory.
"""
if file is None:
raise ValueError("File must not be None.")
self.__file = os.path.realpath(file)
self.__mapping: dict[bytes, ArchiveInfo] = {}
self.__ArByteStream: bytes
def __setattr__(self, __name: str, __value: Any) -> None:
"""
Custom setattr dunder method to create strict immutability over certain attributes of a class. Any attributes
which may be mutable should be handled appropriately.
"""
if not hasattr(self, __name):
object.__setattr__(self, __name, __value)
else:
raise AttributeError(f"The attribute {__name} is private and not intended to be modified.")
@property
def file(self):
return str(self.__file)
def open(self):
"""
Open the archive file and start reading its entries.
:@Note: that this function will load the contents of the enter program into memory during runtime. This prevents
the need to constantly open the file, or have the file handle open (dangerous), reducing I/O latency.
"""
# here we can set a guard for other methods to ensure that this method has been invoked before starting to
# operate on the archive. @Ethan: Python being Python, ensure the attribute is set to the class instance.
if not hasattr(self, '__instance__'):
setattr(self, '__instance__', self)
# firstly, ensure that the path exists on the system
if not os.path.exists(self.__file) :#self.__file.exists():
raise FileNotFoundError(f"The specified file: {self.__file}, could not be found.")
# secondly, ensure that the path is of type file, and accessible for reading
if not os.path.isfile(self.__file) and os.access(self.__file, mode=os.R_OK):
raise FileExistsError(f"The specified file: {self.__file}, is either a directory or not readable.")
# @Ethan: just never trust I/O operations fully
try:
# open the file in binary read-mode
with open(self.__file, 'rb') as fp:
# we want to read and dump everything into memory.
self.__ArByteStream = fp.read()
except IOError as err:
raise RuntimeError("There was an issue reading the archive file.") from err
self._read_entries() # start reading in the file entries
def _read_entries(self):
"""
Read entries of a given archive file of the Debian binary package format.
Method is protected and invoked internally by the `open()` method, which ensures that the archive is loaded into
memory. Note, attempts to call this function without calling loaded the archive into memory will result in a
RuntimeError to be raised.
"""
if (
not hasattr(self, '__instance__') and
getattr(self, '__instance__') is not self
):
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
CONTENT_LENGTH = len(self.__ArByteStream)
if CONTENT_LENGTH < 8:
raise ArchiveMagicBytesError("The loaded archive size is less than the magic bytes.")
# check the magic bytes
if self.__ArByteStream[:8] != b'!<arch>\n':
raise ArchiveMagicBytesError("Invalid archive signature")
# define some variable constants
HEADER_LENGTH = 60
# the first 8 bytes are the magic number, so content is 8 bytes afterwards, and start the position there.
pos = 8
while pos != CONTENT_LENGTH: # keep going until we reach the end of the content
# read in 60 bytes of data (length of headers in Debian archive)
buffer = self.__ArByteStream[pos:pos+HEADER_LENGTH]
# check to ensure that the archive isn't truncated
if len(buffer) != HEADER_LENGTH:
raise ArchiveBufferReadError(
"Buffer does not contain enough content to read next entry. Truncated archive?"
)
# translate byte buffer to readable information about the entry
member = ArchiveInfo.frombuffer(buffer)
member.offset = pos
# add our entry to the mappings dict (key: name of member, value: `:class:@ArchiveInfo`)
self.__mapping[member.name.removesuffix(b'/')] = member
# move the position in the content by the size of the member (with +1 if odd size)
pos += HEADER_LENGTH + member.size + (1 if member.size % 2 != 0 else 0)
def getinfo(self, member: str | bytes):
"""
Return an ArchiveInfo object of a specified `member=` of the archive file, if such member exists. If no member
can be found in the archive, an empty ArchiveInfo is returned.
@Params:
* member: str | bytes - a member file of the archive
@Returns: ArchiveInfo
"""
if _utf8(member) in self.__mapping:
return self.__mapping[_utf8(member)]
return ArchiveInfo(b"", 0, 0, 0, 0, 0)
def extract(self, file: str | bytes, path: str | bytes = ''):
"""
Extract a single file from the archive.
@Params:
* file: str | bytes - the file to be extracted from the archive
* path: str | bytes - output path for the archive (default: current working directory).
"""
# ensure that open() has been invoked before extracting
if (
not hasattr(self, '__instance__') and
getattr(self, '__instance__') is not self
):
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
member = self.getinfo(file)
if member.name == b"":
return
if not path or os.path.isdir(path):
path = os.path.join(_utf8(path), member.name).removesuffix(b'/')
# write out the specific content of an archive member
pos = member.offset + 60
try:
with open(self.__file, 'wb') as fp:
fp.write(self.__ArByteStream[pos:pos+member.size])
except IOError:
raise IOError(f"There was an issue when writing the bytes to file: {path}.\n",
"Please be wary of any mangled or corrupt files.")
def extractall(self, path: str | bytes = ''):
"""
Extract all the files in the archive.
@Param:
* path: str | bytes - output path for the archive (default: current working directory).
"""
# ensure that open() has been invoked before extracting
if (
not hasattr(self, '__instance__') and
getattr(self, '__instance__') is not self
):
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
# iterate over all members in the mapping
for name in self.__mapping.keys():
self.extract(name, os.path.join(_utf8(path), name))