Source code for lupkg.utils

# -*- coding: utf-8 -*-
"""Main module."""
import logging
import os

try:
    from urllib.request import urlretrieve
except ImportError:
    from urllib import urlretrieve

import click
import patoolib
import shutil
from tqdm import tqdm

from frkl import get_full_url
from frutils import is_url_or_abbrev

from .exceptions import LupkgProcessingException

# Module-level logger; note that it is registered under the name "lucify".
log = logging.getLogger("lucify")

# Lower-case names consulted by find_pkg_file_in_archive_dir when deciding
# whether a top-level archive entry should be skipped as a candidate.
IGNORE_FILE_NAMES = ["install", "readme"]


class TqdmUpTo(tqdm):
    """Progress bar that is driven by absolute transfer counts.

    Adds `update_to(n)`, which converts an absolute position into the relative
    `tqdm.update(delta_n)` call. `download_file` passes `update_to` as the
    ``reporthook`` of ``urlretrieve``.

    Inspired by [twine#242](https://github.com/pypa/twine/pull/242),
    [here](https://github.com/pypa/twine/commit/42e55e06).
    """

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to the absolute position ``b * bsize``.

        b : int, optional
            Number of blocks transferred so far [default: 1].
        bsize : int, optional
            Size of each block (in tqdm units) [default: 1].
        tsize : int, optional
            Total size (in tqdm units). If [default: None] remains unchanged.
        """
        if tsize is not None:
            self.total = tsize

        # tqdm only accepts increments, so hand it the distance between the
        # absolute position and the current counter; afterwards
        # self.n == b * bsize.
        transferred = b * bsize
        self.update(transferred - self.n)
def download_file(url, file_name, target_dir, output_callback=None):
    """Download or copy ``url`` to ``file_name`` inside ``target_dir``.

    If ``url`` is not recognised by ``is_url_or_abbrev`` it is treated as a
    local path and copied with ``shutil.copyfile``. Otherwise it is expanded
    with ``get_full_url`` and fetched with ``urlretrieve`` while a progress bar
    is displayed.

    Args:
        url: remote url (or abbreviation), or a local file path.
        file_name: name of the file to create inside ``target_dir``.
        target_dir: directory to place the file in; created if missing.
        output_callback: optional object providing ``is_silent()``; when it
            returns a true value the progress bar is disabled. When omitted,
            the progress bar is shown.

    Returns:
        dict: the keys ``target_dir``, ``url``, ``file_name`` and
        ``target_file`` describing the processed item.

    Raises:
        LupkgProcessingException: if the target file already exists, or if
            ``target_dir`` exists but is a file.
    """
    log.debug("Processing: {}".format(url))

    target_file = os.path.join(target_dir, file_name)
    item = {
        "target_dir": target_dir,
        "url": url,
        "file_name": file_name,
        "target_file": target_file,
    }

    if os.path.exists(target_file):
        raise LupkgProcessingException("Target file exists: {}".format(target_file))

    if not os.path.exists(target_dir):
        log.debug("Creating target dir: {}".format(target_dir))
        os.makedirs(target_dir)
    elif os.path.isfile(os.path.realpath(target_dir)):
        raise LupkgProcessingException(
            "Target directory exists, but is not directory: {}".format(target_dir)
        )

    if not is_url_or_abbrev(url):
        log.debug("copying '{}'".format(url))
        shutil.copyfile(url, target_file)
        return item

    log.debug("downloading '{}'".format(url))
    url_full = get_full_url(url).rstrip("\r\n")
    log.debug("expanded url: {}".format(url_full))

    # 'output_callback' is optional: without this guard the default of None
    # raised AttributeError on '.is_silent()' for every download.
    silent = output_callback.is_silent() if output_callback is not None else False

    with TqdmUpTo(
        disable=silent,
        unit="B",
        unit_scale=True,
        unit_divisor=1024,
        miniters=1,
        desc=" => {}".format(file_name),
    ) as t:
        urlretrieve(url_full, filename=target_file, reporthook=t.update_to, data=None)

    return item
def unpack_file(url, file_name, archive, archive_format=None):
    """Extract ``archive`` into an ``__extracted__`` folder next to it.

    When ``archive_format`` is not given it is derived from the suffix of
    ``url``. The resulting format is only logged: ``patoolib.extract_archive``
    is called without it. ``file_name`` is accepted but not used here.

    Args:
        url: the url the archive came from; its suffix selects the format.
        file_name: unused.
        archive: path to the archive file to extract.
        archive_format: optional explicit format name.

    Returns:
        str: path of the directory the archive was extracted into.

    Raises:
        LupkgProcessingException: if no format can be derived from ``url``, or
            if the extraction path exists but is not a directory.
    """
    archive_dir = os.path.join(os.path.dirname(archive), "__extracted__")

    if archive_format is None:
        # The suffix groups are mutually exclusive, so the order is cosmetic.
        known_suffixes = (
            ((".tar.gz", ".tgz"), "gztar"),
            ((".tar.bz2",), "bztar"),
            ((".zip",), "zip"),
            ((".tar",), "tar"),
        )
        for suffixes, format_name in known_suffixes:
            if url.endswith(suffixes):
                archive_format = format_name
                break
        else:
            raise LupkgProcessingException(
                "Can't calculate format of archive. Please provide the 'format' key under 'unpack' in your package description."
            )

    log.debug("Using unpack format: {}".format(archive_format))

    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    elif not os.path.isdir(os.path.realpath(archive_dir)):
        raise LupkgProcessingException(
            "Can't use extract folder, as it's not a directory: {}".format(
                archive_dir
            )
        )

    log.debug("Extracting: {}".format(archive))
    patoolib.extract_archive(archive, verbosity=-1, outdir=archive_dir)
    log.debug("Unpacking finished.")

    return archive_dir
def find_pkg_file_in_archive_dir(file_name, archive_dir, archive_file):
    """Locate the package file inside an extracted archive directory.

    Lookup order:

    1. ``archive_dir/file_name`` if it is a regular file.
    2. Otherwise candidates are collected: inside every top-level
       sub-directory, files whose name equals or starts with ``file_name``;
       at the top level, every file whose name starts with ``file_name`` or
       is not listed in ``IGNORE_FILE_NAMES``.
    3. A single candidate is used as-is. With several candidates, one whose
       basename equals ``file_name`` is preferred, then the first whose
       basename starts with ``file_name``.

    Args:
        file_name: name of the file to look for.
        archive_dir: directory the archive was extracted into.
        archive_file: the archive path, used in error messages only.

    Returns:
        str: path of the selected file (prefixed with ``archive_dir``).

    Raises:
        LupkgProcessingException: if there is no candidate, or several
            candidates none of which matches ``file_name``.
    """
    log.debug(("Trying to find file '{}' in folder: {}".format(file_name, archive_dir)))

    file_in_root = os.path.join(archive_dir, file_name)
    if os.path.exists(file_in_root) and os.path.isfile(os.path.realpath(file_in_root)):
        return file_in_root

    # Every candidate is stored as a full path (already prefixed with
    # 'archive_dir'), so nothing has to be re-joined afterwards. Re-joining
    # duplicated the prefix whenever 'archive_dir' was a relative path.
    potential_matches = []
    for entry in os.listdir(archive_dir):
        entry_path = os.path.join(archive_dir, entry)

        if os.path.isdir(entry_path):
            for root, _dirs, files in os.walk(entry_path):
                for name in files:
                    if name == file_name:
                        # we assume this is definitely the file we want
                        potential_matches.append(os.path.join(root, name))
                        break
                    elif name.startswith(file_name):
                        potential_matches.append(os.path.join(root, name))
        elif entry.startswith(file_name) or entry.lower() not in IGNORE_FILE_NAMES:
            # Top-level file: the bare entry name (not the joined path) is
            # compared against the ignore list, and the file is recorded once,
            # relative to 'archive_dir' rather than a leftover walk 'root'.
            potential_matches.append(entry_path)

    if not potential_matches:
        raise LupkgProcessingException(
            "Can't determine which file to unpack from file '{}'. Can't continue.".format(
                archive_file
            )
        )

    if len(potential_matches) == 1:
        log.debug(
            "Can't find file '{}' in archive, using only potential match instead: {}".format(
                file_name, potential_matches[0]
            )
        )
        return potential_matches[0]

    exact_matches = [
        m for m in potential_matches if os.path.basename(m) == file_name
    ]
    if exact_matches:
        # With several exact matches the last one found is used.
        return exact_matches[-1]

    for m in potential_matches:
        if os.path.basename(m).startswith(file_name):
            log.debug(
                "- can't find file '{}' in archive, using first potential match instead: {}".format(
                    file_name, m
                )
            )
            # Return the match that was just logged, not blindly the first
            # candidate in the list.
            return m

    raise LupkgProcessingException(
        "Can't determine which file to unpack from archive '{}', too many potential matches. Can't continue".format(
            archive_file
        )
    )
def ensure_base_path(path, create_folder=False):
    """Resolve the target path and optionally create it as a directory.

    Args:
        path: the path to manage; ``~`` is expanded and symlinks are resolved.
        create_folder: create the directory (including parents) if missing.

    Returns:
        str: the resolved path.

    Raises:
        Exception: if the resolved path exists but is not a directory.
    """
    resolved = os.path.realpath(os.path.expanduser(path))
    already_there = os.path.exists(resolved)

    if already_there and not os.path.isdir(resolved):
        raise Exception("Target is file, not dir: {}".format(resolved))

    if create_folder and not already_there:
        os.makedirs(resolved)

    return resolved
def ensure_metadata_file(path, create_file=False):
    """Resolve the metadata file path and optionally create an empty file.

    Args:
        path: path of the metadata file; ``~`` is expanded and symlinks are
            resolved.
        create_file: create the file (and its parent directories) if it does
            not exist yet.

    Returns:
        str: the resolved path.

    Raises:
        Exception: if the resolved path exists but is not a file, or if the
            file should be created but its parent path is itself a file.
    """
    real_path = os.path.realpath(os.path.expanduser(path))

    if os.path.exists(real_path) and not os.path.isfile(real_path):
        raise Exception("Metadata path needs to be a file: {}".format(real_path))

    if create_file and not os.path.exists(real_path):
        parent = os.path.dirname(real_path)
        if not os.path.exists(parent):
            os.makedirs(parent)
        elif os.path.isfile(parent):
            # The check must look at 'parent': 'real_path' is known not to
            # exist in this branch, so testing it could never trigger.
            raise Exception(
                "Can't create parent folder of metadata file, already exists and is file: {}".format(
                    parent
                )
            )

        # Append mode creates the file without truncating anything.
        with open(real_path, "a"):
            pass

    return real_path
def handle_metadata_exception(exc):
    """Print a metadata exception and, where possible, a hint how to fix it.

    The exception's ``message`` is always echoed. If ``exc.key`` is set, a
    hint about the ``-p <key> <VALUE>`` option follows; it lists
    ``exc.valid_values`` when that is a non-empty list or tuple.

    Args:
        exc: exception object providing ``message``, ``key`` and
            ``valid_values`` attributes.
    """
    click.echo()
    click.echo(exc.message)

    # Without a key there is nothing to hint at.
    if not exc.key:
        return

    choices = exc.valid_values
    has_choices = bool(choices) and isinstance(choices, (list, tuple))
    if not has_choices:
        click.echo(
            "\nPlease specify a valid '-p {} <VALUE>' option.".format(exc.key)
        )
        return

    click.echo(
        "\nPlease specify the '-p {} <VALUE>' option with one of these values:\n".format(
            exc.key
        )
    )
    for choice in choices:
        click.echo(" - {}".format(choice))