Source code for lupkg.lupkg

# -*- coding: utf-8 -*-
"""Main module."""

import copy
import logging
import os
from collections import OrderedDict

from cerberus import Validator
from jinja2 import Environment, meta

from ruamel.yaml.comments import CommentedMap
from six import string_types

from frkl.frkl import dict_merge
from frutils import is_templated, readable_yaml, replace_string
from luci.finders import FolderFinder
from luci.lucify import Lucifier
from luci.readers import MetadataFolderReader
from .exceptions import LupkgMetadataException, LupkgSchemaException

LUPKG_KEYWORD = "lupkg"
METADATA_KEYWORD = "meta"
URLS_KEYWORD = "urls"
PROPERTIES_KEYWORD = "properties"
VERSIONS_KEYWORD = "versions"

log = logging.getLogger("lucify")

DEFAULT_PKG_TYPE = "default"
ENV = Environment()


class LupkgRepoFolderReader(MetadataFolderReader):
    """Parses a folder for lupkg metadata files/structure.

    Args:
        use_folders_as_tags (bool): whether to automatically add hierarchical
            folder names as tags for a package
        **kwargs (dict): arguments forwarded to parent class
    """

    def __init__(self, use_folders_as_tags=True, **kwargs):
        super(LupkgRepoFolderReader, self).__init__(
            meta_file_name="meta.lupkg", **kwargs
        )
        self.use_folders_as_tags = use_folders_as_tags

    def is_usable_file(self, path):
        """Check whether a file should be read as a package description.

        Args:
            path (str): path to the file in question

        Returns:
            bool: True if the file name ends with 'lupkg', is not hidden
                (no leading dot) and is not the folder meta file (when meta
                files are in use)
        """
        file_name = os.path.basename(path)

        # The folder-level meta file is not a package description itself.
        if self.use_meta_file and file_name == self.meta_file_name:
            return False

        return file_name.endswith("lupkg") and not file_name.startswith(".")

    def process_content(
        self, content, current_metadata, luci_metadata, luci_metadata_key_name=None
    ):
        """Add the package descriptions in 'content' to 'current_metadata'.

        Descriptions without a 'lupkg/name' value get the file name (without
        extension) as package name. If enabled, the names of the folders
        leading to the file are appended to the 'meta/tags' list.

        Args:
            content (dict): package metadata, keyed by relative file path
            current_metadata (dict): the metadata collected so far, keyed by
                package name; updated in place
            luci_metadata: not used by this implementation
            luci_metadata_key_name: not used by this implementation

        Returns:
            dict: the updated 'current_metadata'
        """
        for rel_path, metadata in content.items():
            path_tokens = rel_path.split(os.path.sep)
            default_name = os.path.splitext(path_tokens[-1])[0]

            lupkg_section = metadata.setdefault(LUPKG_KEYWORD, {})
            if not lupkg_section.get("name", False):
                lupkg_section["name"] = default_name

            folder_names = path_tokens[0:-1]
            # Only create the 'meta/tags' structure if there is a tag to add.
            if self.use_folders_as_tags and folder_names:
                tag_list = metadata.setdefault(METADATA_KEYWORD, {}).setdefault(
                    "tags", []
                )
                for tag in folder_names:
                    if tag not in tag_list:
                        tag_list.append(tag)

            app_name = lupkg_section["name"]
            if app_name in current_metadata:
                log.warning(
                    "Duplicate description for '{}': overwriting existing".format(
                        app_name
                    )
                )
            current_metadata[app_name] = metadata

        return current_metadata
class RepoLucifier(Lucifier):
    """Lucifier that collects package descriptions as :class:`LuPKG` objects.

    Processed packages are stored in the 'pkg_descs' attribute, keyed by
    package name.

    Args:
        **kwargs (dict): arguments forwarded to parent class
    """

    def __init__(self, **kwargs):
        super(RepoLucifier, self).__init__("repo", **kwargs)
        self.reader = LupkgRepoFolderReader()
        self.finder = FolderFinder()
        self.pkg_descs = CommentedMap()

    def get_default_dictlet_reader(self):
        """Return the reader instance created for this lucifier."""
        return self.reader

    def get_default_dictlet_finder(self):
        """Return the finder instance created for this lucifier."""
        return self.finder

    def process_dictlet(self, metadata, dictlet_details=None):
        """Create a :class:`LuPKG` object for every package in 'metadata'.

        Args:
            metadata (dict): package descriptions, keyed by package name
            dictlet_details: not used by this implementation

        Raises:
            Exception: if a package description can't be processed/validated
        """
        for pkg_name, details in metadata.items():
            if pkg_name in self.pkg_descs:
                log.warning(
                    "Duplicate description for package '{}', overwriting existing one...".format(
                        pkg_name
                    )
                )
            try:
                self.pkg_descs[pkg_name] = LuPKG(details)
            except LupkgSchemaException as e:
                log.debug(e, exc_info=True)
                # Report the caught error itself: the module-level validator's
                # 'errors' attribute is stale if the exception was raised
                # before validation ran (e.g. for a missing package name).
                raise Exception(
                    "Invalid pkg description for package '{}': {}".format(
                        pkg_name, e
                    )
                )
def _string_rule():
    """Return a fresh schema rule for a plain string value."""
    return {"type": "string"}


def _string_list_rule():
    """Return a fresh schema rule for a list of strings."""
    return {"type": "list", "schema": _string_rule()}


def _properties_rule():
    """Return a fresh rule for a properties dict (values: lists of strings)."""
    return {
        "type": "dict",
        "allow_unknown": True,
        "valueschema": _string_list_rule(),
    }


def _url_details_rule():
    """Return a fresh rule for the details dict of a single url entry."""
    return {
        "type": "dict",
        "schema": {
            "url": _string_rule(),
            "type": _string_rule(),
            "path": _string_rule(),
            "checksum": _string_rule(),
        },
    }


# Cerberus schema used to validate the processed metadata of a package.
LUPKG_SCHEMA = {
    LUPKG_KEYWORD: {
        "type": "dict",
        "allow_unknown": True,
        "schema": {
            "name": {"type": "string", "required": True},
            "type": _string_rule(),
            "requirements": _string_list_rule(),
        },
    },
    METADATA_KEYWORD: {
        "type": "dict",
        "allow_unknown": True,
        "schema": {
            "authors": {
                "type": "list",
                "schema": {
                    "type": "dict",
                    "allow_unknown": True,
                    "schema": {"name": _string_rule(), "email": _string_rule()},
                },
            },
            "slug": _string_rule(),
            "description": _string_rule(),
            "license": _string_list_rule(),
            "homepage": _string_rule(),
            "docs": {"type": "dict", "allow_unknown": True},
            "issues": _string_rule(),
            "source": _string_rule(),
            "tags": _string_list_rule(),
        },
    },
    PROPERTIES_KEYWORD: _properties_rule(),
    URLS_KEYWORD: {
        "required": False,
        "type": "dict",
        "allow_unknown": True,
        "valueschema": _url_details_rule(),
    },
    VERSIONS_KEYWORD: {
        "type": "list",
        "schema": {
            "type": "dict",
            "allow_unknown": True,
            "schema": {
                "version": {"anyof_type": ["string", "number"]},
                PROPERTIES_KEYWORD: _properties_rule(),
                URLS_KEYWORD: {
                    "type": "dict",
                    "allow_unknown": True,
                    "valueschema": _url_details_rule(),
                },
                "changelog": _string_rule(),
                "release_date": _string_rule(),
            },
        },
    },
}

# Module-level validator instance, shared by all LuPKG objects.
LUPKG_VALIDATOR = Validator(LUPKG_SCHEMA)

# Placeholder version name, used when no explicit version is specified.
LATEST = "__latest__"
class LuPKG(object):
    """Model class for a package.

    Enables access to metadata like download url, author, type of file, etc.

    Args:
        metadata (dict, str): the package metadata, or a single url string
        base_url (str): value for the 'base_url' template variable, defaults
            to the current working directory

    Raises:
        LupkgSchemaException: if the metadata is incomplete or doesn't
            validate against the lupkg schema
    """

    @classmethod
    def from_name_and_url(cls, pkg_name, url, base_url=None):
        """Creates a minimal package description from a name and url.

        To use this, the package name and the (only) file need to be the same.

        Args:
            pkg_name (str): the name of the package and (only) file
            url (str): the url to the file
            base_url (str): forwarded to the constructor

        Returns:
            LuPKG: the package object
        """
        # The urls entry has to be a mapping; a '{pkg_name, url}' set literal
        # would turn both the name and the url into separate url items.
        return cls.from_dict(
            {LUPKG_KEYWORD: {"name": pkg_name}, URLS_KEYWORD: {pkg_name: url}},
            base_url=base_url,
        )

    @classmethod
    def from_dict(cls, lupkg_dict, pkg_name=None, base_url=None):
        """Creates a :class:`LuPKG` object.

        Args:
            lupkg_dict (dict): the metadata
            pkg_name (str): if provided, overwrites a potential 'lupkg/name'
                value in the lupkg_dict
            base_url (str): forwarded to the constructor

        Returns:
            LuPKG: the package object
        """
        lupkg_section = lupkg_dict.get(LUPKG_KEYWORD, {})
        if pkg_name:
            # Work on a copy so the name override doesn't leak into the
            # caller's dictionary.
            lupkg_section = dict(lupkg_section)
            lupkg_section["name"] = pkg_name

        result_dict = {
            LUPKG_KEYWORD: lupkg_section,
            METADATA_KEYWORD: lupkg_dict.get(METADATA_KEYWORD, {}),
            PROPERTIES_KEYWORD: lupkg_dict.get(PROPERTIES_KEYWORD, {}),
            URLS_KEYWORD: lupkg_dict.get(URLS_KEYWORD, {}),
            VERSIONS_KEYWORD: lupkg_dict.get(VERSIONS_KEYWORD, []),
        }
        return cls(result_dict, base_url=base_url)

    def __init__(self, metadata, base_url=None):
        self.name = None
        if base_url is None:
            base_url = os.getcwd()
        self.base_url = base_url

        # All of the following are populated by process_metadata().
        self.pkg_type = None
        self.urls = None
        self.latest_version = None
        self.latest_version_name = None
        self.versions = None
        self.properties = None
        self.info = None
        self.meta = None
        self.versions_lookup = None
        self.all_metadata = None

        self.process_metadata(metadata)
        log.debug("Processed metadata: {}".format(self.all_metadata))

        valid = LUPKG_VALIDATOR(self.all_metadata)
        if not valid:
            # The validator is shared module-wide, so keep a private copy of
            # the errors of this run.
            errors = copy.deepcopy(LUPKG_VALIDATOR.errors)
            raise LupkgSchemaException(
                "Can't validate metadata: {}".format(errors),
                item=self.all_metadata,
                validation_errors=errors,
            )

    def _normalize_urls(self, url_items, metadata, invalid_item):
        """Convert raw url descriptions into a '{file_name: details}' dict.

        'url_items' can be a mapping (only its values are used) or an
        iterable of items. Every item is either a url string, a dict with a
        'url' key (optional keys: 'file_name', 'type', 'path'), or a
        single-entry dict which is read as '{file_name: url}'.

        Args:
            url_items: the raw url descriptions
            metadata: passed to the exception raised for a missing url
            invalid_item: passed to the exception raised for an unsupported
                item type

        Returns:
            dict: url details (keys: 'type', 'url', optionally 'path'), keyed
                by file name

        Raises:
            LupkgSchemaException: if an item has no url or an invalid type
        """
        if not url_items:
            return {}

        if isinstance(url_items, (dict, OrderedDict, CommentedMap)):
            # Iterating the mapping itself would yield its keys (file names)
            # and drop the actual url details.
            candidates = url_items.values()
        else:
            candidates = url_items

        urls = {}
        for url_details in candidates:
            file_type = self.pkg_type
            file_path = None

            if isinstance(url_details, string_types):
                url = url_details
                file_name = os.path.basename(url)
            elif isinstance(url_details, (dict, OrderedDict, CommentedMap)):
                url = url_details.get("url", None)
                if url is None:
                    if not url_details or len(url_details) > 1:
                        raise LupkgSchemaException(
                            "No 'url' key provided.", metadata
                        )
                    # assume key is file_name, value is url
                    file_name = next(iter(url_details))
                    url = url_details[file_name]
                else:
                    file_name = url_details.get(
                        "file_name", os.path.basename(url)
                    )
                    file_type = url_details.get("type", self.pkg_type)
                    file_path = url_details.get("path", None)
            else:
                raise LupkgSchemaException(
                    "Invalid type '{}' for url item: {}".format(
                        type(url_details), url_details
                    ),
                    invalid_item,
                )

            urls[file_name] = {"type": file_type, "url": url}
            if file_path is not None:
                urls[file_name]["path"] = file_path

        return urls

    def process_metadata(self, metadata):
        """Normalize the provided metadata and populate this object.

        Sets the 'name', 'meta', 'pkg_type', 'info', 'urls', 'properties',
        'versions', 'versions_lookup', 'latest_version',
        'latest_version_name' and 'all_metadata' attributes. The first item
        of the versions list is used as latest version. The provided
        metadata dict is modified in the process.

        Args:
            metadata (dict, str): the package metadata; a string is treated
                as the url of a single file, with the basename of the url
                used as package name

        Raises:
            LupkgSchemaException: if the package name or a version name is
                missing, a version is specified twice, or a url item is
                invalid
        """
        if isinstance(metadata, string_types):
            url = metadata
            metadata = {
                LUPKG_KEYWORD: {"name": os.path.basename(url)},
                URLS_KEYWORD: url,
            }

        name = metadata.get(LUPKG_KEYWORD, {}).get("name", None)
        if name is None:
            raise LupkgSchemaException("Missing package name.", item=metadata)
        self.name = name

        self.meta = metadata.get(LUPKG_KEYWORD, {})
        if "type" not in self.meta:
            self.meta["type"] = DEFAULT_PKG_TYPE
        self.pkg_type = self.meta["type"]
        self.info = metadata.get(METADATA_KEYWORD, {})

        if URLS_KEYWORD not in metadata:
            metadata[URLS_KEYWORD] = {}
        if isinstance(metadata[URLS_KEYWORD], string_types):
            metadata[URLS_KEYWORD] = {self.name: {"url": metadata[URLS_KEYWORD]}}
        self.urls = self._normalize_urls(metadata[URLS_KEYWORD], metadata, metadata)

        self.properties = metadata.get(PROPERTIES_KEYWORD, {})

        # Without explicit versions, a single placeholder version is used.
        if not metadata.get(VERSIONS_KEYWORD, None):
            metadata[VERSIONS_KEYWORD] = [{"version": LATEST}]

        versions = metadata[VERSIONS_KEYWORD]
        if isinstance(versions, (dict, OrderedDict, CommentedMap)):
            # '{version_name: details}' form: move the name into the details
            versions = [
                dict_merge(value, {"version": key}, copy_dct=True)
                for key, value in versions.items()
            ]

        versions_new = []
        for version in versions:
            if isinstance(version, string_types):
                versions_new.append({"version": version})
            elif isinstance(version, (dict, OrderedDict, CommentedMap)):
                versions_new.append(version)
            else:
                # e.g. a version given as a number
                versions_new.append({"version": str(version)})

        self.versions_lookup = OrderedDict()
        for v in versions_new:
            version = v.get("version", None)
            if version is None:
                raise LupkgSchemaException(
                    "No 'version' key provided for version.", item=metadata
                )
            if version in self.versions_lookup:
                raise LupkgSchemaException(
                    "Duplicate version in pkg '{}': {}".format(self.name, version),
                    metadata,
                )

            # Version properties are layered on top of the package-level ones.
            v[PROPERTIES_KEYWORD] = dict_merge(
                self.properties, v.get(PROPERTIES_KEYWORD, {}), copy_dct=True
            )

            if v.get(URLS_KEYWORD, None):
                version_urls = v[URLS_KEYWORD]
                if isinstance(version_urls, string_types):
                    version_urls = {self.name: version_urls}
                v[URLS_KEYWORD] = self._normalize_urls(
                    version_urls, metadata, version_urls
                )

            # 'copy_dct=True' (as in the other merges here) so the
            # package-level urls are not modified by version-specific ones.
            version_urls = dict_merge(
                self.urls, v.get(URLS_KEYWORD, {}), copy_dct=True
            )

            if version != LATEST:
                # Pre-fill 'version'/'name' for concrete versions; all other
                # template variables are left in place for later.
                repl_dict = {"version": version, "name": name}
                filled_urls = {}
                for file_name, details in version_urls.items():
                    if is_templated(file_name):
                        file_name_new = self.fill_string_template(
                            file_name,
                            version,
                            repl_dict,
                            ignore_template_errors=True,
                            use_global_properties=False,
                            use_version_properties=False,
                        )
                    else:
                        file_name_new = file_name
                    if is_templated(details["url"]):
                        details["url"] = self.fill_string_template(
                            details["url"],
                            version,
                            repl_dict,
                            ignore_template_errors=True,
                            use_global_properties=False,
                            use_version_properties=False,
                        )
                    filled_urls[file_name_new] = details
                version_urls = filled_urls

            v[URLS_KEYWORD] = version_urls
            self.versions_lookup[version] = v

        self.versions = versions_new
        self.latest_version = versions_new[0]
        self.latest_version_name = versions_new[0]["version"]

        self.all_metadata = {
            LUPKG_KEYWORD: self.meta,
            METADATA_KEYWORD: self.info,
            PROPERTIES_KEYWORD: self.properties,
            URLS_KEYWORD: self.urls,
            VERSIONS_KEYWORD: self.versions,
        }

    def details(self):
        """Returns all metadata as a dictionary, in a raw form.

        Returns:
            dict: all metadata
        """
        return self.all_metadata

    def versions(self):
        """The available versions for this package.

        The result will be a list containing dicts. For more details look up
        the `metadata format <XXX>`_.

        Note: the constructor assigns an instance attribute with the same
        name (the list of versions), which shadows this method on
        initialized instances.

        Returns:
            list: a list of versions for this package
        """
        return self.all_metadata.get(VERSIONS_KEYWORD, [])

    def version_numbers(self):
        """Returns a list of version names.

        Returns:
            list: the names of all versions, in the order they were specified
        """
        # 'versions_lookup' is keyed by version name; 'self.versions' can't
        # be called here because it is a plain list on instances.
        return list(self.versions_lookup)

    def get_version(self, version_name):
        """Returns the version details for the specified version name.

        Args:
            version_name (str): the version name

        Returns:
            dict: the version details dict (keys: 'version', 'properties', 'urls')

        Raises:
            Exception: if the version is not available
        """
        if version_name == LATEST:
            return self.latest_version

        v = self.versions_lookup.get(version_name, None)
        if v is None:
            raise Exception("No version '{}' available.".format(version_name))
        return v

    def get_version_properties(self, version=None):
        """Get a dictionary of version-specific properties.

        The result is a merged dictionary of the general properties and the
        versions specific ones.

        Args:
            version (str): the version, defaults to 'LATEST'

        Returns:
            dict: the version properties

        Raises:
            Exception: if the version is not available
        """
        if version is None:
            version = LATEST

        version_dict = self.get_version(version)
        return dict_merge(
            self.properties, version_dict.get(PROPERTIES_KEYWORD, {}), copy_dct=True
        )

    def get_version_details(
        self,
        version=None,
        properties=None,
        ignore_template_errors=False,
        use_first_value_as_default=False,
    ):
        """Get version details for version number.

        Args:
            version (str): the version in question, defaults to 'LATEST'
            properties (dict): the properties to use when templating the url
                details, defaults to an empty dict
            ignore_template_errors (bool): whether to error out when missing
                template keys are encountered (False) or not (True)
            use_first_value_as_default (bool): whether to use the first value
                of the version properties as default when no user input for
                that key

        Returns:
            dict: the version details, with templated url details under the
                'urls' key

        Raises:
            Exception: if the version is not available
        """
        if version is None:
            version = LATEST
        if properties is None:
            properties = {}

        stored = self.versions_lookup.get(version, None)
        if stored is None and version != LATEST:
            raise Exception("Version '{}' not available.".format(version))

        if stored is None:
            version_dict = {"version": LATEST}
        else:
            # Copy, so the filled-in urls don't replace the stored templates.
            version_dict = dict(stored)

        version_dict["urls"] = self.get_url_details(
            version=version,
            properties=properties,
            ignore_template_errors=ignore_template_errors,
            use_first_value_as_default=use_first_value_as_default,
        )
        return version_dict

    def fill_string_template(
        self,
        text,
        version,
        properties=None,
        ignore_template_errors=False,
        use_first_value_as_default=False,
        use_global_properties=True,
        use_version_properties=True,
    ):
        """Replace template variables in strings with property values.

        The 'version', 'name' and 'base_url' variables are always provided.
        Any other variable has to be one of the values listed for it in the
        version properties.

        If 'use_first_value_as_default' is a list, the requirement key will
        be checked against this list, and only if it is included will the
        first value used as default. Otherwise this value needs to be a bool.

        Args:
            text (str): the source string
            version (str): the version; if None, the '{{ version }}'
                placeholder is kept and no version properties are looked up
            properties (dict): the values to use for templating
            ignore_template_errors (bool): whether to error out when missing
                template keys are encountered (False) or not (True)
            use_first_value_as_default (bool, list): whether to use the first
                value of the version properties as default when no user
                input for that key
            use_global_properties (bool): whether the provided 'properties'
                are used as replacement values
            use_version_properties (bool): whether to look up the properties
                of the specified version for valid/default values

        Returns:
            str: the templated string

        Raises:
            LupkgMetadataException: if a value is missing or not valid
            Exception: if 'use_first_value_as_default' has an invalid type
        """
        # TODO add support for plugins that can guess the right value
        if not is_templated(text):
            return text

        version_given = version is not None
        if not version_given:
            version = "{{ version }}"
        elif version == LATEST:
            version = self.latest_version_name

        if properties is None:
            properties = {}

        # Without a version there is nothing to look up; asking for the
        # '{{ version }}' placeholder would only raise.
        if use_version_properties and version_given:
            version_properties = self.get_version(version).get(
                PROPERTIES_KEYWORD, {}
            )
        else:
            version_properties = {}

        if use_global_properties:
            repl_dict = copy.deepcopy(properties)
        else:
            repl_dict = {}
        repl_dict["version"] = version
        repl_dict["name"] = self.name
        repl_dict["base_url"] = self.base_url

        required_vars = meta.find_undeclared_variables(ENV.parse(text))
        for req in required_vars:
            valid_values = version_properties.get(req, [])
            rep_value = repl_dict.get(req, None)

            if (
                rep_value is None
                and use_first_value_as_default
                and len(valid_values) > 0
            ):
                if isinstance(use_first_value_as_default, bool):
                    rep_value = valid_values[0]
                elif isinstance(use_first_value_as_default, (list, tuple)):
                    if req in use_first_value_as_default:
                        rep_value = valid_values[0]
                    else:
                        raise LupkgMetadataException(
                            "Can't process {}: {}".format(req, text),
                            key=req,
                            valid_values=valid_values,
                        )
                else:
                    raise Exception(
                        "Invalid type for 'use_first_value_as_default' variable, needs to be 'bool' or 'list': {}".format(
                            use_first_value_as_default
                        )
                    )

            if rep_value is None:
                if not ignore_template_errors:
                    raise LupkgMetadataException(
                        "Can't process {}: {}".format(req, text),
                        key=req,
                        valid_values=valid_values,
                    )
                # Re-insert the placeholder so the variable survives.
                rep_value = "{{{{ {} }}}}".format(req)
            elif (
                req not in ["version", "name", "base_url"]
                and rep_value not in valid_values
            ):
                raise LupkgMetadataException(
                    "Invalid value specified for property '{}' (package '{}'): {}".format(
                        req, self.name, rep_value
                    ),
                    key=req,
                    valid_values=valid_values,
                )

            # Always store the resolved value, so defaults taken from the
            # version properties are applied as well.
            repl_dict[req] = rep_value

        return replace_string(text, repl_dict)

    def get_url_details(
        self,
        version=None,
        properties=None,
        ignore_template_errors=False,
        use_first_value_as_default=False,
    ):
        """Get url details for version number.

        Args:
            version (str): the version in question, defaults to 'LATEST'
            properties (dict): the properties to use when templating the url
                details
            ignore_template_errors (bool): whether to error out when missing
                template keys are encountered (False) or not (True)
            use_first_value_as_default (bool): whether to use the first value
                of the version properties as default when no user input for
                that key

        Returns:
            dict: the url details, keyed by the (untemplated) file name; the
                templated file name is stored under the 'file_name' key
        """
        if version is None:
            version = LATEST
        if properties is None:
            properties = {}

        template_args = {
            "version": version,
            "properties": properties,
            "ignore_template_errors": ignore_template_errors,
            "use_first_value_as_default": use_first_value_as_default,
        }

        version_details = self.get_version(version)

        urls = {}
        for file_key, url_details in version_details["urls"].items():
            if is_templated(file_key):
                file_name = self.fill_string_template(file_key, **template_args)
            else:
                file_name = file_key

            # Work on a copy so the stored templates stay untouched.
            details = copy.deepcopy(url_details)
            if is_templated(url_details["url"]):
                details["url"] = self.fill_string_template(
                    url_details["url"], **template_args
                )
            if "path" in url_details and is_templated(url_details["path"]):
                details["path"] = self.fill_string_template(
                    url_details["path"], **template_args
                )
            details["file_name"] = file_name
            urls[file_key] = details

        return urls

    def get_property_names(self):
        """Returns a list of additional property names available for this package.

        Return:
            list: a sorted list of property names
        """
        return sorted(self.properties)

    def get_property(self, prop_key):
        """Returns the value(s) associated with a property key for this package.

        Args:
            prop_key (str): the property name

        Returns:
            list: a list of potential values for this property, None if the
                property is not available
        """
        return self.properties.get(prop_key, None)

    def has_file(self, file_name, version=None):
        """Checks whether this package contains a file with the specified filename.

        Args:
            file_name (str): the filename in question
            version (str): the version to check, if none, all versions will
                be checked

        Returns:
            bool: whether the file is available in this package (True) or not (False)

        Raises:
            Exception: if the specified version is not available
        """
        if version is not None:
            candidates = [self.get_version(version)]
        else:
            candidates = list(self.versions_lookup.values())

        for version_details in candidates:
            for item_file_name in version_details.get("urls", {}):
                if is_templated(item_file_name):
                    repl_dict = {"name": self.name}
                    item_file_name = replace_string(item_file_name, repl_dict)
                if file_name == item_file_name:
                    return True

        return False

    def __str__(self):
        return readable_yaml(self.all_metadata, safe=True)

    def __repr__(self):
        return str(self.all_metadata)