Added extractar.py
Version 1.0 release. Initial commit of the library module. The module can open, read, and extract files from an archive in the common Unix `ar` format (for example, the contents of a Debian binary package can be extracted).
This commit is contained in:
parent
d6e7d79291
commit
50443715fc
290
extractar.py
Normal file
290
extractar.py
Normal file
|
|
@ -0,0 +1,290 @@
|
|||
# -* licence notice *-
|
||||
# Copyright (c) 2016, Remi Rampin
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice, this
|
||||
# list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
|
||||
#
|
||||
# 3. Neither the name of the copyright holder nor the names of its contributors
|
||||
# may be used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
import os
|
||||
import struct
|
||||
import textwrap
|
||||
import pathlib
|
||||
|
||||
# Version string of this module.
__version__ = '1.0'
|
||||
|
||||
|
||||
def _utf8(s: str | bytes):
|
||||
"""
|
||||
Keeps bytes, converts unicode into UTF-8.
|
||||
|
||||
This is used for filenames, which the user may supply as unicode, but is always stored as bytes in the archive.
|
||||
"""
|
||||
return s if isinstance(s, bytes) else s.encode()
|
||||
|
||||
|
||||
class ArchiveMagicBytesError(ValueError):
    """
    Raised when a problem with the magic byte sequence of an archive is detected.

    This exception is a subclass of ValueError.
    """
|
||||
|
||||
|
||||
class ArchiveBufferReadError(IndexError):
    """
    Raised when the buffer does not hold enough content to read the entry information of a Debian binary package.

    This exception is a subclass of IndexError.
    """
|
||||
|
||||
|
||||
class ArchiveInfo:
    """
    Information on a file in an archive.

    This has the filename and all the metadata for a single member of an archive: its name, size in bytes,
    modification timestamp, permission bits, owner id and group id.

    Instances are normally built from a raw 60-byte member header with :meth:`frombuffer`. The `offset` attribute
    starts at 0 and is meant to be set by the code reading the archive to the position of the member's header,
    which allows the correct member data to be located later on.

    The `name` is exposed as a read-only property; all other fields are plain attributes.
    """

    # Layout of a member header (offset, length, meaning, encoding):
    #    0  16  File name                     ASCII
    #   16  12  File modification timestamp   Decimal
    #   28   6  Owner ID                      Decimal
    #   34   6  Group ID                      Decimal
    #   40   8  File mode                     Octal
    #   48  10  File size in bytes            Decimal
    #   58   2  File magic                    0x60 0x0A
    _HEADER_FORMAT = '16s12s6s6s8s10s2s'
    _HEADER_MAGIC = b'\x60\n'

    def __init__(self, name: bytes, size: int, mtime: int, perms: int, uid: int, gid: int):
        """
        Store the metadata of one archive member.

        @Params:
        * name: bytes - the member name as stored in the archive
        * size: int - size of the member data in bytes
        * mtime: int - modification timestamp
        * perms: int - file mode (permission bits)
        * uid: int - owner id
        * gid: int - group id
        """
        self._name = name
        self.size = size
        self.mtime = mtime
        self.perms = perms
        self.uid = uid
        self.gid = gid
        self.offset: int = 0

    @property
    def name(self):
        """
        The member name, exactly as given to the constructor (read-only).
        """
        return self._name

    @staticmethod
    def _parse_number(field: bytes, base: int) -> int:
        """
        Convert a space-padded numeric header field into an int; a field that is entirely blank yields 0.
        """
        digits = field.strip()
        return int(digits, base) if digits else 0

    @classmethod
    def frombuffer(cls, buffer):
        """
        Decode the archive header.

        @Params:
        * buffer: bytes - the raw 60-byte header of an archive member

        @Returns: ArchiveInfo

        @Raises:
        * ValueError - if the trailing header magic is wrong, a numeric field is not a number, or the size is
          negative
        * struct.error - if `buffer` is not exactly as long as the header layout
        """
        # unpack our values
        name, mtime, uid, gid, perms, size, magic = struct.unpack(cls._HEADER_FORMAT, buffer)

        if magic != cls._HEADER_MAGIC:
            raise ValueError("Invalid file signature")

        # the size is mandatory: without it the member data cannot be located, so a blank field is an error here
        member_size = int(size, 10)
        if member_size < 0:
            # a negative size would make a reader walk backwards (or stand still) in the archive
            raise ValueError("Invalid file size")

        # the remaining numeric fields may legitimately be left blank by some archivers; treat blank as 0
        return cls(
            name.rstrip(b' '),
            member_size,
            cls._parse_number(mtime, 10),
            cls._parse_number(perms, 8),
            cls._parse_number(uid, 10),
            cls._parse_number(gid, 10),
        )

    def __repr__(self) -> str:
        """
        Return a multi-line, human-readable listing of every field (ends with a newline).
        """
        return (
            f"ArchiveInfo: ({self._name})\n"
            f"    _name: {self._name}\n"
            f"    size: {self.size}\n"
            f"    mtime: {self.mtime}\n"
            f"    perms: {self.perms}\n"
            f"    uid: {self.uid}\n"
            f"    gid: {self.gid}\n"
            f"    offset: {self.offset}\n"
        )
|
||||
|
||||
|
||||
class Archive:
|
||||
"""
|
||||
An UNIX ar archive.
|
||||
|
||||
"""
|
||||
def __init__(self, file: str):
|
||||
"""
|
||||
Create an `Archive` from an opened file (in 'rb' mode only).
|
||||
|
||||
Initialisation is lazy. Please invoke the `open()` method to load the archive into memory.
|
||||
"""
|
||||
if file is None:
|
||||
raise RuntimeError("File must not be None.")
|
||||
|
||||
self._file = pathlib.Path(file)
|
||||
self._mapping: dict[bytes, ArchiveInfo] = {}
|
||||
self.__ar_contents: bytes
|
||||
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
Open the archive file and start reading its entries.
|
||||
|
||||
:@Note: that this function will load the contents of the enter program into memory during runtime. This prevents
|
||||
the need to constantly open the file, or have the file handle open (dangerous), reducing I/O latency.
|
||||
"""
|
||||
# here we can set a guard for other methods to ensure that this method has been invoked before starting to
|
||||
# operate on the archive. @Ethan: Python being Python, ensure the attribute is set to the class instance.
|
||||
if not hasattr(self, '__instance__'):
|
||||
setattr(self, '__instance__', self)
|
||||
|
||||
# firstly, ensure that the path exists on the system
|
||||
if not self._file.exists():
|
||||
raise FileNotFoundError(f"The specified file: {self._file}, could not be found.")
|
||||
|
||||
# secondly, ensure that the path is of type file, and accessible for reading
|
||||
if not pathlib.Path(self._file).is_file() and os.access(self._file, mode=os.R_OK):
|
||||
raise FileExistsError(f"The specified file: {self._file}, is either a directory or not readable.")
|
||||
|
||||
# @Ethan: just never trust I/O operations fully
|
||||
try:
|
||||
# open the file in binary read-mode
|
||||
with open(self._file, 'rb') as fp:
|
||||
# we want to read and dump everything into memory.
|
||||
self.__ar_contents = fp.read()
|
||||
except IOError as err:
|
||||
raise RuntimeError("There was an issue reading the archive file.") from err
|
||||
|
||||
self._read_entries() # start reading in the file entries
|
||||
|
||||
|
||||
def _read_entries(self):
|
||||
"""
|
||||
Read entries of a given archive file of the Debian binary package format.
|
||||
|
||||
Method is protected and invoked internally by the `open()` method, which ensures that the archive is loaded into
|
||||
memory. Note, attempts to call this function without calling loaded the archive into memory will result in a
|
||||
RuntimeError to be raised.
|
||||
"""
|
||||
if (
|
||||
not hasattr(self, '__instance__') and
|
||||
getattr(self, '__instance__') is not self
|
||||
):
|
||||
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
||||
|
||||
CONTENT_LENGTH = len(self.__ar_contents)
|
||||
if CONTENT_LENGTH < 8:
|
||||
raise ArchiveMagicBytesError("The loaded archive size is less than the magic bytes.")
|
||||
|
||||
# check the magic bytes
|
||||
if self.__ar_contents[:8] != b'!<arch>\n':
|
||||
raise ArchiveMagicBytesError("Invalid archive signature")
|
||||
|
||||
# define some variable constants
|
||||
HEADER_LENGTH = 60
|
||||
|
||||
# the first 8 bytes are the magic number, so content is 8 bytes afterwards, and start the position there.
|
||||
pos = 8
|
||||
while pos != CONTENT_LENGTH: # keep going until we reach the end of the content
|
||||
# read in 60 bytes of data (length of headers in Debian archive)
|
||||
buffer = self.__ar_contents[pos:pos+HEADER_LENGTH]
|
||||
# check to ensure that the archive isn't truncated
|
||||
if len(buffer) != HEADER_LENGTH:
|
||||
raise ArchiveBufferReadError(
|
||||
"Buffer does not contain enough content to read next entry. Truncated archive?"
|
||||
)
|
||||
|
||||
# translate byte buffer to readable information about the entry
|
||||
member = ArchiveInfo.frombuffer(buffer)
|
||||
member.offset = pos
|
||||
|
||||
# add our entry to the mappings dict (key: name of member, value: `:class:@ArchiveInfo`)
|
||||
self._mapping[member.name.removesuffix(b'/')] = member
|
||||
|
||||
# move the position in the content by the size of the member (with +1 if odd size)
|
||||
pos += HEADER_LENGTH + member.size + (1 if member.size % 2 != 0 else 0)
|
||||
|
||||
|
||||
def getinfo(self, member: str | bytes):
|
||||
"""
|
||||
Return an ArchiveInfo object of a specified `member=` of the archive file, if such member exists. If no member
|
||||
can be found in the archive, an empty ArchiveInfo is returned.
|
||||
|
||||
@Params:
|
||||
* member: str | bytes - a member file of the archive
|
||||
|
||||
@Returns: ArchiveInfo
|
||||
"""
|
||||
if _utf8(member) in self._mapping:
|
||||
return self._mapping[_utf8(member)]
|
||||
|
||||
return ArchiveInfo(b"", 0, 0, 0, 0, 0)
|
||||
|
||||
|
||||
def extract(self, file: str | bytes, path: str | bytes = ''):
|
||||
"""
|
||||
Extract a single file from the archive.
|
||||
|
||||
@Params:
|
||||
* file: str | bytes - the file to be extracted from the archive
|
||||
* path: str | bytes - output path for the archive (default: current working directory).
|
||||
"""
|
||||
# ensure that open() has been invoked before extracting
|
||||
if (
|
||||
not hasattr(self, '__instance__') and
|
||||
getattr(self, '__instance__') is not self
|
||||
):
|
||||
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
||||
|
||||
|
||||
member = self.getinfo(file)
|
||||
if member.name == b"":
|
||||
return
|
||||
|
||||
if not path or os.path.isdir(path):
|
||||
path = os.path.join(_utf8(path), member.name).removesuffix(b'/')
|
||||
|
||||
# write out the specific content of an archive member
|
||||
pos = member.offset + 60
|
||||
with open(path, 'wb') as fp:
|
||||
data = self.__ar_contents[pos:pos+member.size]
|
||||
fp.write(data)
|
||||
|
||||
|
||||
def extractall(self, path: str | bytes = ''):
|
||||
"""
|
||||
Extract all the files in the archive.
|
||||
|
||||
@Param:
|
||||
* path: str | bytes - output path for the archive (default: current working directory).
|
||||
"""
|
||||
# ensure that open() has been invoked before extracting
|
||||
if (
|
||||
not hasattr(self, '__instance__') and
|
||||
getattr(self, '__instance__') is not self
|
||||
):
|
||||
raise RuntimeError(self, "has been partially initialised. Please invoke Archive@open(...) first.")
|
||||
|
||||
# iterate over all members in the mapping
|
||||
for name in self._mapping.keys():
|
||||
self.extract(name, os.path.join(_utf8(path), name))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user