# -*- coding: utf-8 -*-
"""Main module."""
import logging
import os
try:
from urllib.request import urlretrieve
except ImportError:
from urllib import urlretrieve
import click
import patoolib
import shutil
from tqdm import tqdm
from frkl import get_full_url
from frutils import is_url_or_abbrev
from .exceptions import LupkgProcessingException
# Module-level logger, registered under the name "lucify"; the functions in
# this module emit their debug output through it.
log = logging.getLogger("lucify")
# Lower-case names consulted by find_pkg_file_in_archive_dir when deciding
# whether an entry of an extracted archive should be skipped.
IGNORE_FILE_NAMES = ["install", "readme"]
class TqdmUpTo(tqdm):
    """A ``tqdm`` progress bar that can be driven with absolute positions.

    Adds :meth:`update_to`, which takes the *total* progress so far and
    forwards the difference to ``tqdm.update``. Its ``(b, bsize, tsize)``
    signature makes it usable as the ``reporthook`` of ``urlretrieve``.

    Inspired by [twine#242](https://github.com/pypa/twine/pull/242),
    [here](https://github.com/pypa/twine/commit/42e55e06).
    """

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to the absolute position ``b * bsize``.

        b : int, optional
            Number of blocks transferred so far [default: 1].
        bsize : int, optional
            Size of each block (in tqdm units) [default: 1].
        tsize : int, optional
            Total size (in tqdm units). If [default: None] remains unchanged.
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # tqdm.update() expects an increment, so hand it the distance from the
        # current position; afterwards self.n equals ``transferred``.
        self.update(transferred - self.n)
def download_file(url, file_name, target_dir, output_callback=None):
    """Copy or download ``url`` to ``file_name`` inside ``target_dir``.

    When ``is_url_or_abbrev(url)`` is false, ``url`` is handed to
    ``shutil.copyfile`` as a local source path. Otherwise it is expanded with
    ``get_full_url`` and fetched with ``urlretrieve`` while a ``TqdmUpTo``
    progress bar reports the transfer.

    Args:
        url: source location; a local path or a url/abbreviation.
        file_name: name of the file to create inside ``target_dir``.
        target_dir: destination folder; created if it does not exist yet.
        output_callback: optional object providing ``is_silent()``. If it
            returns a true value the progress bar is disabled. When no
            callback is given the progress bar is shown.

    Returns:
        dict: the keys ``target_dir``, ``url``, ``file_name`` and
        ``target_file`` describing what was processed.

    Raises:
        LupkgProcessingException: if the target file already exists, or if
            ``target_dir`` exists but resolves to a regular file.
    """
    log.debug("Processing: {}".format(url))
    target_file = os.path.join(target_dir, file_name)

    item = {}
    item["target_dir"] = target_dir
    item["url"] = url
    item["file_name"] = file_name
    item["target_file"] = target_file

    # Never overwrite an existing file.
    if os.path.exists(target_file):
        raise LupkgProcessingException("Target file exists: {}".format(target_file))

    if not os.path.exists(target_dir):
        log.debug("Creating target dir: {}".format(target_dir))
        os.makedirs(target_dir)
    elif os.path.isfile(os.path.realpath(target_dir)):
        raise LupkgProcessingException(
            "Target directory exists, but is not directory: {}".format(target_dir)
        )

    if not is_url_or_abbrev(url):
        log.debug("copying '{}'".format(url))
        shutil.copyfile(url, target_file)
    else:
        log.debug("downloading '{}'".format(url))
        url_full = get_full_url(url).rstrip("\r\n")
        log.debug("expanded url: {}".format(url_full))

        # output_callback is optional (defaults to None); calling
        # is_silent() on it unconditionally raised AttributeError.
        if output_callback is not None:
            silent = output_callback.is_silent()
        else:
            silent = False

        with TqdmUpTo(
            disable=silent,
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
            miniters=1,
            desc=" => {}".format(file_name),
        ) as t:
            # urlretrieve calls the hook with (block count, block size,
            # total size), which is exactly update_to's signature.
            urlretrieve(
                url_full, filename=target_file, reporthook=t.update_to, data=None
            )

    return item
def unpack_file(url, file_name, archive, archive_format=None):
    """Extract ``archive`` into an ``__extracted__`` folder next to it.

    If ``archive_format`` is not given it is derived from the suffix of
    ``url``. The format is only validated and logged here; it is not passed
    on to ``patoolib.extract_archive``. ``file_name`` is accepted but not
    used by this function.

    Args:
        url: location the archive came from; only its suffix is inspected.
        file_name: unused.
        archive: path of the archive file to extract.
        archive_format: optional format name; skips the suffix lookup.

    Returns:
        str: path of the ``__extracted__`` folder.

    Raises:
        LupkgProcessingException: if no format is given and the suffix of
            ``url`` is not recognised, or if ``__extracted__`` exists but is
            not a directory.
    """
    extract_dir = os.path.join(os.path.dirname(archive), "__extracted__")

    if archive_format is None:
        # Checked in order; ".tar" comes last among the tar variants.
        known_suffixes = (
            ((".tar.gz", ".tgz"), "gztar"),
            ((".tar.bz2",), "bztar"),
            ((".zip",), "zip"),
            ((".tar",), "tar"),
        )
        for suffixes, format_name in known_suffixes:
            if url.endswith(suffixes):
                archive_format = format_name
                break
        else:
            raise LupkgProcessingException(
                "Can't calculate format of archive. Please provide the 'format' key under 'unpack' in your package description."
            )

    log.debug("Using unpack format: {}".format(archive_format))

    if not os.path.exists(extract_dir):
        os.makedirs(extract_dir)
    elif not os.path.isdir(os.path.realpath(extract_dir)):
        raise LupkgProcessingException(
            "Can't use extract folder, as it's not a directory: {}".format(
                extract_dir
            )
        )

    log.debug("Extracting: {}".format(archive))
    patoolib.extract_archive(archive, verbosity=-1, outdir=extract_dir)
    log.debug("Unpacking finished.")

    return extract_dir
def find_pkg_file_in_archive_dir(file_name, archive_dir, archive_file):
    """Locate the file to install inside an extracted archive folder.

    Search order:

    1. ``archive_dir/file_name`` itself, if it is a regular file.
    2. Otherwise candidates are collected: every file below a sub-folder
       whose name equals or starts with ``file_name``, plus every top-level
       file that starts with ``file_name`` or is not listed in
       ``IGNORE_FILE_NAMES``.
    3. A single candidate is used as is. With several candidates, one whose
       base name equals ``file_name`` is preferred (the last such candidate
       wins); failing that, the first one whose base name starts with
       ``file_name``.

    Args:
        file_name: name of the file that is being looked for.
        archive_dir: folder the archive was extracted into.
        archive_file: the archive itself; only used in error messages.

    Returns:
        str: path of the selected file (prefixed with ``archive_dir``).

    Raises:
        LupkgProcessingException: if there is no candidate at all, or several
            candidates of which none matches ``file_name``.
    """
    log.debug(("Trying to find file '{}' in folder: {}".format(file_name, archive_dir)))

    file_in_root = os.path.join(archive_dir, file_name)
    if os.path.exists(file_in_root) and os.path.isfile(os.path.realpath(file_in_root)):
        return file_in_root

    # Candidates are stored as complete paths (already prefixed with
    # archive_dir) so they can be returned without joining again; re-joining
    # doubled the prefix whenever archive_dir was a relative path.
    potential_matches = []
    for entry in os.listdir(archive_dir):
        entry_path = os.path.join(archive_dir, entry)
        if os.path.isdir(entry_path):
            for root, _dirs, files in os.walk(entry_path):
                for name in files:
                    if name == file_name:
                        # we assume this is definitely the file we want
                        potential_matches.append(os.path.join(root, name))
                        break
                    elif name.startswith(file_name):
                        potential_matches.append(os.path.join(root, name))
        elif entry.startswith(file_name) or entry.lower() not in IGNORE_FILE_NAMES:
            # Top-level file. The previous code joined it with ``root`` (only
            # bound inside the os.walk loop above), compared the full path
            # instead of the bare name against IGNORE_FILE_NAMES, and added
            # prefix matches twice.
            potential_matches.append(entry_path)

    if not potential_matches:
        raise LupkgProcessingException(
            "Can't determine which file to unpack from file '{}'. Can't continue.".format(
                archive_file
            )
        )

    if len(potential_matches) == 1:
        log.debug(
            "Can't find file '{}' in archive, using only potential match instead: {}".format(
                file_name, potential_matches[0]
            )
        )
        return potential_matches[0]

    exact_matches = [
        m for m in potential_matches if os.path.basename(m) == file_name
    ]
    if exact_matches:
        # Keep the established behaviour: the last exact match wins.
        return exact_matches[-1]

    for m in potential_matches:
        if os.path.basename(m).startswith(file_name):
            log.debug(
                "- can't find file '{}' in archive, using first potential match instead: {}".format(
                    file_name, m
                )
            )
            # Return the candidate that was just logged, not blindly the
            # first entry of the list.
            return m

    raise LupkgProcessingException(
        "Can't determine which file to unpack from archive '{}', too many potential matches. Can't continue".format(
            archive_file
        )
    )
def ensure_base_path(path, create_folder=False):
    """Resolve ``path`` and check that it can be used as a directory.

    ``~`` is expanded and symlinks are resolved before any check is made.

    Args:
        path: the path to resolve.
        create_folder: if true, create the resolved directory (including
            parents) when it does not exist yet.

    Returns:
        str: the expanded, resolved path.

    Raises:
        Exception: if the resolved path exists but is not a directory.
    """
    resolved = os.path.realpath(os.path.expanduser(path))

    if not os.path.exists(resolved):
        if create_folder:
            os.makedirs(resolved)
    elif not os.path.isdir(resolved):
        raise Exception("Target is file, not dir: {}".format(resolved))

    return resolved