Source code for lupkg.lupkg_processors

# -*- coding: utf-8 -*-
"""Main module."""
import logging
import os
import shutil
import stat
import sys
import tempfile

import click
from stevedore import driver

from .exceptions import LupkgProcessingException
from .lupkg import LATEST
from .utils import download_file, find_pkg_file_in_archive_dir, unpack_file

log = logging.getLogger("lucify")

IGNORE_FILE_NAMES = ["install", "readme"]


[docs]def create_processor(processor_alias, output_callback=None): log2 = logging.getLogger("stevedore") out_hdlr = logging.StreamHandler(sys.stdout) out_hdlr.setFormatter( logging.Formatter("dictlet_finder plugin error -> %(message)s") ) out_hdlr.setLevel(logging.DEBUG) log2.addHandler(out_hdlr) log2.setLevel(logging.INFO) log.debug("Loading dictlet_finder...") mgr = driver.DriverManager( namespace="lupkg.processors", name=processor_alias, invoke_on_load=True, invoke_kwds={"output_callback": output_callback}, ) log.debug( "Registered lupkg processor: {}".format( ", ".join(ext.name for ext in mgr.extensions) ) ) return mgr.driver
PROCESSOR_TYPE_MAP = {"binary": "download-exe", "script": "download-exe"} PROCESSOR_CACHE = {}
[docs]class SimpleCallback(object): def __init__(self): self.silent = True
[docs] def is_silent(self): return self.silent
[docs] def set_silent(self, silent): self.silent = silent
[docs] def output(self, output, log_level=None): if not self.silent: click.echo(output)
[docs]def default_output_callback(): sc = SimpleCallback() return sc
DEFAULT_OUTPUT_CALLBACK = default_output_callback()
[docs]def get_processor(pkg_file_type): global PROCESSOR_TYPE_MAP global DEFAULT_OUTPUT_CALLBACK ptype = PROCESSOR_TYPE_MAP.get(pkg_file_type, "download") if ptype not in PROCESSOR_CACHE: plugin = create_processor(ptype, output_callback=DEFAULT_OUTPUT_CALLBACK) PROCESSOR_CACHE[ptype] = plugin return PROCESSOR_CACHE[ptype]
[docs]class LupkgProcessor(object): def __init__(self, output_callback=None, default_log_level=logging.INFO): self.output_callback = output_callback self.default_log_level = default_log_level
[docs] def process_pkg(self, file_name, base_path, pkg): pass
[docs] def output(self, message, log_level=None): if self.output_callback is not None: self.output_callback.output(message, log_level) else: if log_level is None: log_level = logging.INFO if log_level is None: log_level = self.default_log_level log.info(message, log_level)
[docs] def update_pkg_folder_metadata(self, path, file_name, file_details): # self.config.update_pkg_folder(path, file_name, file_details) pass
SUPPORTED_ARCHIVE_FORMATS = ("zip", "tar.bz2", "tar.gz", "tar", "tgz") ARCHIVE_TRANSLATOR = {"tar.bz2": "bztar", "tar.gz": "gztar"}
[docs]class DownloadProcessor(LupkgProcessor): def __init__(self, **kwargs): super(DownloadProcessor, self).__init__(**kwargs) # if target_dir is None: # target_dir = self.config.get_default_pkg_folder() # else: # target_dir = self.config.get_pkg_folder_path(target_dir) # self.target_dir = target_dir
[docs] def process_pkg_item( self, pkg_item, file_name, base_path, version=LATEST, properties=None, process_properties=None, ): target_file = pkg_item.get_path_for_file(file_name) target_dir = os.path.dirname(target_file) file_name_new = os.path.basename(target_file) force = process_properties.get("force", False) reinstall = process_properties.get("reinstall", False) self.pre_process_target(file_name_new, target_dir, force, reinstall) tmp_folder = tempfile.mkdtemp(prefix="tmp_lupkg") temp_file = self.download_into_temp( pkg_item, file_name, tmp_folder, version=version, properties=properties, process_properties=process_properties, ) if not os.path.exists(os.path.realpath(target_dir)): log.debug("Creating target dir: {}".format(target_dir)) os.makedirs(target_dir) log.debug("Downloaded temp file: {}".format(temp_file)) if os.path.exists(target_file) and force: os.remove(target_file) log.debug("Moving file: {} -> {}".format(temp_file, target_file)) self.output("- moving file to target directory") shutil.move(temp_file, target_file) log.debug("Deleting temp dir...") shutil.rmtree(tmp_folder) return target_file
[docs] def pre_process_target(self, file_name, base_path, force, reinstall): target_file = os.path.join(os.path.expanduser(base_path), file_name) target_dir = os.path.dirname(target_file) if os.path.exists(os.path.realpath(target_dir)): if os.path.isfile(os.path.realpath(target_dir)): raise LupkgProcessingException( "Target dir '{}' exists, but is file.".format(target_dir) ) if os.path.exists(target_file): # this should not happen, better to abort because if that's the case, there's likely something wrong # don't want to have to delete a folder, even with 'force' if os.path.isdir(os.path.realpath(target_file)): raise LupkgProcessingException( "Target file is directory: {}".format(target_file) ) if not force and not reinstall: raise LupkgProcessingException( "Target file exists: {}".format(target_file) ) else: log.debug( "Target file '{}' exists, but 'force' or 'reinstall' specified, will overwrite...".format( target_file ) )
[docs] def download_into_temp( self, pkg_item, file_name, temp_dir, version=LATEST, properties=None, process_properties=None, ): if properties is None: properties = {} if process_properties is None: process_properties = {} url_details_all = pkg_item.get_url_details( version=version, properties=properties, ignore_template_errors=False ) if file_name not in url_details_all.keys(): raise Exception("No file '{}' in package '{}'".format(file_name)) url_details = url_details_all[file_name] log.debug("Processing: {}".format(url_details)) url = url_details["url"] unpack = url_details.get("unpack", False) unpack_format = url_details.get("archive_format", None) target_file = os.path.join(temp_dir, file_name) target_dir = os.path.dirname(target_file) if os.path.exists(os.path.realpath(target_dir)): if os.path.isfile(os.path.realpath(target_dir)): raise LupkgProcessingException( "Target dir '{}' exists, but is file.".format(target_dir) ) if os.path.exists(target_file): # this should not happen, better to abort because if that's the case, there's likely something wrong # don't want to have to delete a folder, even with 'force' raise LupkgProcessingException( "Target file '{}' exists in temp folder: {}".format( target_file, temp_dir ) ) self.output("- downloading: {}".format(url)) download_item = download_file(url, file_name, temp_dir, self.output_callback) if unpack or url.endswith(SUPPORTED_ARCHIVE_FORMATS): target_file = download_item["target_file"] self.output("- unpacking archive...") archive_dir = unpack_file(url, file_name, target_file, unpack_format) target_file = find_pkg_file_in_archive_dir( file_name, archive_dir, target_file ) if target_file is None: raise LupkgProcessingException( "Could not find file '{}' in downloaded archive.".format( target_file ) ) else: target_file = download_item["target_file"] return target_file
# def process(self, lupkg, version=None, properties=None, force=False): # # urls = lupkg.get_url_details(version, properties) # # result = [] # for url_details in urls: # item = self.download(url_details, self.target_dir) # # result.append(item) # # return result
[docs]class DownloadExecutableProcessor(DownloadProcessor): def __init__(self, **kwargs): super(DownloadExecutableProcessor, self).__init__(**kwargs)
[docs] def process_pkg_item( self, pkg_item, file_name, base_path, version=LATEST, properties=None, process_properties=None, ): target_file = super(DownloadExecutableProcessor, self).process_pkg_item( pkg_item, file_name, base_path, version=version, properties=properties, process_properties=process_properties, ) self.output("- making script/binary executable...") st = os.stat(target_file) os.chmod(target_file, st.st_mode | stat.S_IEXEC) return target_file