# -*- coding: utf-8 -*-
"""Main module."""
import copy
import logging
import os
from collections import OrderedDict
from cerberus import Validator
from jinja2 import Environment, meta
from ruamel.yaml.comments import CommentedMap
from six import string_types
from frkl.frkl import dict_merge
from frutils import is_templated, readable_yaml, replace_string
from luci.finders import FolderFinder
from luci.lucify import Lucifier
from luci.readers import MetadataFolderReader
from .exceptions import LupkgMetadataException, LupkgSchemaException
LUPKG_KEYWORD = "lupkg"
METADATA_KEYWORD = "meta"
URLS_KEYWORD = "urls"
PROPERTIES_KEYWORD = "properties"
VERSIONS_KEYWORD = "versions"
log = logging.getLogger("lucify")
DEFAULT_PKG_TYPE = "default"
ENV = Environment()
[docs]class LupkgRepoFolderReader(MetadataFolderReader):
"""Parses a folder for lupkg metadata files/structure.
Args:
use_folders_as_tags (bool): whether to automatically add hierarchical folder names as tags for a package
**kwargs (dict): arguments forwarded to parent class
"""
def __init__(self, use_folders_as_tags=True, **kwargs):
super(LupkgRepoFolderReader, self).__init__(
meta_file_name="meta.lupkg", **kwargs
)
self.use_folders_as_tags = use_folders_as_tags
[docs] def is_usable_file(self, path):
f = os.path.basename(path)
return (
(
(self.use_meta_file and f != self.meta_file_name)
or not self.use_meta_file
)
and (f.endswith("lupkg") or f.endswith("lupkg"))
and not f.startswith(".")
)
[docs] def process_content(
self, content, current_metadata, luci_metadata, luci_metadata_key_name=None
):
for rel_path, metadata in content.items():
path_tokens = rel_path.split(os.path.sep)
app_name = os.path.splitext(path_tokens[-1])[0]
if not metadata.get(LUPKG_KEYWORD, {}).get("name", False):
metadata.setdefault(LUPKG_KEYWORD, {})["name"] = app_name
if self.use_folders_as_tags:
tags = path_tokens[0:-1]
for t in tags:
tag_list = metadata.setdefault(METADATA_KEYWORD, {}).setdefault(
"tags", []
)
if t not in tag_list:
tag_list.append(t)
app_name = metadata[LUPKG_KEYWORD]["name"]
if app_name in current_metadata.keys():
log.warn(
"Duplicate description for '{}': overwriting existing".format(
app_name
)
)
current_metadata[app_name] = metadata
return current_metadata
[docs]class RepoLucifier(Lucifier):
def __init__(self, **kwargs):
super(RepoLucifier, self).__init__("repo", **kwargs)
self.reader = LupkgRepoFolderReader()
self.finder = FolderFinder()
self.pkg_descs = CommentedMap()
[docs] def get_default_dictlet_reader(self):
return self.reader
[docs] def get_default_dictlet_finder(self):
return self.finder
[docs] def process_dictlet(self, metadata, dictlet_details=None):
for pkg_name, details in metadata.items():
if pkg_name in self.pkg_descs.keys():
log.warn(
"Duplicate description for ackage '{}', overwriting existing one...".format(
pkg_name
)
)
try:
self.pkg_descs[pkg_name] = LuPKG(details)
except (LupkgSchemaException) as e:
log.debug(e, exc_info=True)
raise Exception(
"Invalid pkg description for package '{}': {}".format(
pkg_name, LUPKG_VALIDATOR.errors
)
)
LUPKG_SCHEMA = {
LUPKG_KEYWORD: {
"type": "dict",
"allow_unknown": True,
"schema": {
"name": {"type": "string", "required": True},
"type": {"type": "string"},
"requirements": {"type": "list", "schema": {"type": "string"}},
},
},
METADATA_KEYWORD: {
"type": "dict",
"allow_unknown": True,
"schema": {
"authors": {
"type": "list",
"schema": {
"type": "dict",
"allow_unknown": True,
"schema": {"name": {"type": "string"}, "email": {"type": "string"}},
},
},
"slug": {"type": "string"},
"description": {"type": "string"},
"license": {"type": "list", "schema": {"type": "string"}},
"homepage": {"type": "string"},
"docs": {"type": "dict", "allow_unknown": True},
"issues": {"type": "string"},
"source": {"type": "string"},
"tags": {"type": "list", "schema": {"type": "string"}},
},
},
PROPERTIES_KEYWORD: {
"type": "dict",
"allow_unknown": True,
"valueschema": {"type": "list", "schema": {"type": "string"}},
},
URLS_KEYWORD: {
"required": False,
"type": "dict",
"allow_unknown": True,
"valueschema": {
"type": "dict",
"schema": {
"url": {"type": "string"},
"type": {"type": "string"},
"path": {"type": "string"},
"checksum": {"type": "string"},
},
},
},
VERSIONS_KEYWORD: {
"type": "list",
"schema": {
"type": "dict",
"allow_unknown": True,
"schema": {
"version": {"anyof_type": ["string", "number"]},
PROPERTIES_KEYWORD: {
"type": "dict",
"allow_unknown": True,
"valueschema": {"type": "list", "schema": {"type": "string"}},
},
URLS_KEYWORD: {
"type": "dict",
"allow_unknown": True,
"valueschema": {
"type": "dict",
"schema": {
"url": {"type": "string"},
"type": {"type": "string"},
"path": {"type": "string"},
"checksum": {"type": "string"},
},
},
},
"changelog": {"type": "string"},
"release_date": {"type": "string"},
},
},
},
}
LUPKG_VALIDATOR = Validator(LUPKG_SCHEMA)
LATEST = "__latest__"
[docs]class LuPKG(object):
"""Model class for a package.
Enables access to metadata like download url, author, type of file, etc.
"""
[docs] @classmethod
def from_name_and_url(cls, pkg_name, url, base_url=None):
"""Creates a minimal package description from a name and url.
To use this, the package name and the (only) file need to be the same.
Args:
pkg_name (str): the name of the package and (only) file
url (str): the url to the file
Returns:
LuPKG: the package object
"""
return LuPKG.from_dict(
{LUPKG_KEYWORD: {"name": pkg_name}, URLS_KEYWORD: {pkg_name, url}},
base_url=base_url,
)
[docs] @classmethod
def from_dict(cls, lupkg_dict, pkg_name=None, base_url=None):
"""Creates a :class:~LuPKG object.
Args:
lupkg_dict (dict): the metadata
pkg_name (str): if provided, overwrites a potential 'meta/name' value in the lupkg_dict
Returns:
LuPKG: the package object
"""
result_dict = {}
result_dict[LUPKG_KEYWORD] = lupkg_dict.get(LUPKG_KEYWORD, {})
if pkg_name:
result_dict[LUPKG_KEYWORD]["name"] = pkg_name
result_dict[METADATA_KEYWORD] = lupkg_dict.get(METADATA_KEYWORD, {})
result_dict[PROPERTIES_KEYWORD] = lupkg_dict.get(PROPERTIES_KEYWORD, {})
result_dict["urls"] = lupkg_dict.get("urls", {})
result_dict[VERSIONS_KEYWORD] = lupkg_dict.get(VERSIONS_KEYWORD, [])
return cls(result_dict, base_url=base_url)
def __init__(self, metadata, base_url=None):
self.name = None
if base_url is None:
base_url = os.getcwd()
self.base_url = base_url
self.latest_version = None
self.latest_version_name = None
self.versions = None
self.properties = None
self.info = None
self.meta = None
self.versions_lookup = None
self.all_metadata = None
self.process_metadata(metadata)
log.debug("Processed metadata: {}".format(self.all_metadata))
# valid = LUPKG_VALIDATOR(special_dict_to_dict(self.all_metadata))
valid = LUPKG_VALIDATOR(self.all_metadata)
if not valid:
errors = copy.deepcopy(LUPKG_VALIDATOR.errors)
raise LupkgSchemaException(
"Can't validate metadata: {}".format(errors),
item=self.all_metadata,
validation_errors=copy.deepcopy(errors),
)
[docs] def details(self):
"""Returns all metadata as a dictionary, in a raw form.
Returns:
dict: all metadata
"""
return self.all_metadata
[docs] def versions(self):
"""The available versions for this package.
The result will be a list containing dicts. For more details look up the `metadata format <XXX>`_.
Returns:
list: a list of versions for this package
"""
versions = self.metadata.get(VERSIONS_KEYWORD, [])
return versions
[docs] def version_numbers(self):
"""Returns a list of version numbers.
Returns:
list: a list of version number strings
"""
return [v["version"] for v in self.versions()]
[docs] def get_version(self, version_name):
"""Returns the version details for the specified version name.
Args:
version_name (str): the version name
Returns:
dict: the version details dict (keys: 'version', 'properties', 'urls')
"""
if version_name == LATEST:
return self.latest_version
else:
v = self.versions_lookup.get(version_name, None)
if v is None:
raise Exception("No version '{}' available.".format(version_name))
return v
[docs] def get_version_properties(self, version=None):
"""Get a dictionary of version-specific properties.
The result is a merged dictionary of the general properties and the
versions specific ones.
Args:
version (str): the version
Returns:
dict: the version properties
"""
if version is None:
version = LATEST
for v in self.version():
if v["version"] == version:
version_dict = v
if version_dict is None and version != LATEST:
raise Exception("Version '{}' not available.".format(version))
if version_dict is None:
version_properties = {}
properties = dict_merge(self.properties, version_properties, copy_dct=True)
return properties
[docs] def get_version_details(
self,
version=None,
properties=None,
ignore_template_errors=False,
use_first_value_as_default=False,
):
"""Get version details for version number.
Args:
version (str): the version in question, defaults to 'LATEST'
properties (dict): the properties to use when templating the url details (defaults to merged version/general properties of this pkg)
ignore_template_errors (bool): whether to error out when missing template keys are encountered (False) or not (True)
use_first_value_as_default (bool): whether to use the first value of the version properties as default when no user input for that key
Returns:
dict: the version details
"""
if version is None:
version = LATEST
if properties is None:
properties = {}
version_dict = None
for v in self.versions():
if v["version"] == version:
version_dict = v
if version_dict is None and version != LATEST:
raise Exception("Version '{}' not available.".format(version))
if version_dict is None:
version_dict = {}
version_dict["version"] = LATEST
version_dict["urls"] = self.get_url_details(
version=version,
properties=properties,
ignore_template_errors=ignore_template_errors,
use_first_value_as_default=use_first_value_as_default,
)
return version_dict
[docs] def fill_string_template(
self,
text,
version,
properties=None,
ignore_template_errors=False,
use_first_value_as_default=False,
use_global_properties=True,
use_version_properties=True,
):
"""Replace template variables in strings with values from the property attribute.
If 'use_first_value_as_default' is a list, the requirement key will be checked against this list,
and only if it is included will the first value used as default. Otherwise this value needs to be a bool.
Args:
text (str): the source string
version (str): the version
properties (dict): the properties to use when templating the url details (defaults to merged version/general properties of this pkg)
ignore_template_errors (bool): whether to error out when missing template keys are encountered (False) or not (True)
use_first_value_as_default (bool, list): whether to use the first value of the version properties as default when no user input for that key
use_global_properties (bool): whether to use the 'global' properties of this pkg for templating
use_version_properties (bool): whether to use the properties of the specified version of this pkg for templating
Returns:
str: the templated string
"""
# TODO add support for plugins that can guess the right value
if not is_templated(text):
return text
if version is None:
version = "{{ version }}"
elif version == LATEST:
version = self.latest_version_name
if properties is None:
properties = {}
if use_version_properties:
version_properties = self.get_version(version).get(PROPERTIES_KEYWORD, {})
else:
version_properties = {}
if use_global_properties:
repl_dict = copy.deepcopy(properties)
else:
repl_dict = {}
repl_dict["version"] = version
repl_dict["name"] = self.name
# if self.base_url is None:
# base_url = "{{ base_url }}"
# else:
# base_url = self.base_url
repl_dict["base_url"] = self.base_url
ast = ENV.parse(text)
required_vars = meta.find_undeclared_variables(ast)
for req in required_vars:
valid_values = version_properties.get(req, [])
rep_value = repl_dict.get(req, None)
if (
rep_value is None
and use_first_value_as_default
and len(valid_values) > 0
):
if isinstance(use_first_value_as_default, bool):
rep_value = valid_values[0]
elif isinstance(use_first_value_as_default, (list, tuple)):
if req in use_first_value_as_default:
rep_value = valid_values[0]
else:
raise LupkgMetadataException(
"Can't process {}: {}".format(req, text),
key=req,
valid_values=valid_values,
)
else:
raise Exception(
"Invalid type for 'use_first_value_as_default' variable, needs to be 'bool' or 'list': {}".format(
use_first_value_as_default
)
)
if rep_value is None and not ignore_template_errors:
raise LupkgMetadataException(
"Can't process {}: {}".format(req, text),
key=req,
valid_values=valid_values,
)
if rep_value is None:
rep_value = "{{{{ {} }}}}".format(req)
repl_dict[req] = rep_value
elif (
req not in ["version", "name", "base_url"]
and rep_value not in valid_values
):
raise LupkgMetadataException(
"Invalid value specified for property '{}' (package '{}'): {}".format(
req, self.name, rep_value
),
key=req,
valid_values=valid_values,
)
string_new = replace_string(text, repl_dict)
return string_new
[docs] def get_url_details(
self,
version=None,
properties=None,
ignore_template_errors=False,
use_first_value_as_default=False,
):
"""Get url details for version number.
Args:
version (str): the version in question, defaults to 'LATEST'
properties (dict): the properties to use when templating the url details
ignore_template_errors (bool): whether to error out when missing template keys are encountered (False) or not (True)
use_first_value_as_default (bool): whether to use the first value of the version properties as default when no user input for that key
Returns:
dict: the url details
"""
if version is None:
version = LATEST
if properties is None:
properties = {}
version_details = self.get_version(version)
urls = {}
for file, url_details in version_details["urls"].items():
if is_templated(file):
file_temp = self.fill_string_template(
file,
version=version,
properties=properties,
ignore_template_errors=ignore_template_errors,
use_first_value_as_default=use_first_value_as_default,
)
else:
file_temp = file
url_details_temp = copy.deepcopy(url_details)
if is_templated(url_details["url"]):
url_temp = self.fill_string_template(
url_details["url"],
version=version,
properties=properties,
ignore_template_errors=ignore_template_errors,
use_first_value_as_default=use_first_value_as_default,
)
url_details_temp["url"] = url_temp
if "path" in url_details.keys() and is_templated(url_details["path"]):
path_temp = self.fill_string_template(
url_details["path"],
version=version,
properties=properties,
ignore_template_errors=ignore_template_errors,
use_first_value_as_default=use_first_value_as_default,
)
url_details_temp["path"] = path_temp
url_details_temp["file_name"] = file_temp
urls[file] = url_details_temp
return urls
[docs] def get_property_names(self):
"""Returns a list of additional property names available for this package.
Return:
list: a list of property names
"""
result = self.properties.keys()
return sorted(list(result))
[docs] def get_property(self, prop_key):
"""Returns the value(s) associated with a property key for this package.
Returns:
list: a list of potential values for this property
"""
value = self.properties.get(prop_key, None)
return value
[docs] def has_file(self, file_name, version=None):
"""Checks whether this package contains a file with the specified filename.
Args:
file_name (str): the filename in question
version (str): the version to check, if none, all versions will be checked
Returns:
bool: whether the file is available in this package (True) or not (False)
"""
if version is not None:
v = self.get_version_details(version)
if v is None:
raise Exception("Version '{}' not available.".format(version))
for version in self.versions:
for version_number, details in version.items():
for item_file_name in details["url"].keys():
if is_templated(item_file_name):
repl_dict = {"name": self.name}
item_file_name = replace_string(item_file_name, repl_dict)
if file_name == item_file_name:
return True
return False
def __str__(self):
return readable_yaml(self.all_metadata, safe=True)
def __repr__(self):
return str(self.all_metadata)