import copy
import logging
import os
from collections import OrderedDict
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap
from six import string_types
from frkl.defaults import (
STEM_KEY_NAME,
DEFAULT_LEAF_KEY_NAME,
DEFAULT_LEAF_DEFAULT_KEY_NAME,
OTHER_VALID_KEYS_NAME,
)
from frkl import Frklist
from frkl import Frkl, FrklProcessor
from frutils import dict_merge
from .lupkg import LATEST
from .lupkg_processors import get_processor
from .lupkg_config import DEFAULT_INDEX_NAME, LUPKG_INDEXES
# Logger used throughout this module, registered under the name "lucify".
log = logging.getLogger("lucify")

# Name of the per-folder file that records lupkg metadata.
LUPKG_FOLDER_METADATA_FILE_NAME = ".lupkgs"

# Cache location below the user's home: ~/.local/share/lupkg/cache
LUPKG_CACHE_DIR = os.path.join(
    os.path.expanduser("~"), ".local", "share", "lupkg", "cache"
)

# Current and legacy locations of the configuration folder.
LUPKG_CONFIG_FOLDER_PATH = os.path.expanduser("~/.config/lupkg")
LUPKG_CONFIG_FOLDER_LEGACY_PATH = os.path.expanduser("~/.lupkg")

# Pick the effective config folder: the legacy location is used only when it
# exists and the current one does not; in every other case (current exists, or
# neither exists) the current location wins.
LUPKG_CONFIG_FOLDER = (
    LUPKG_CONFIG_FOLDER_LEGACY_PATH
    if not os.path.exists(LUPKG_CONFIG_FOLDER_PATH)
    and os.path.exists(LUPKG_CONFIG_FOLDER_LEGACY_PATH)
    else LUPKG_CONFIG_FOLDER_PATH
)

# Config files that live inside the effective config folder.
LUPKG_PROFILES_FILE = os.path.join(LUPKG_CONFIG_FOLDER, "profiles.yml")
# Path map used when a package item is created without one: the special
# "__default__" key maps to an empty path.
DEFAULT_PATH_MAP = {"__default__": ""}

# Module-wide YAML handler; flow style is switched off for dumped documents.
yaml = YAML()
yaml.default_flow_style = False
class LupkgPkgItem(object):
    """A package looked up in an index and bound to a local base path.

    Two items are considered equal (and hash/sort identically) when they share
    the same ``pkg_name`` and ``base_path``; version, properties and index do
    not take part in the comparison.

    Args:
        pkg_name: name of the package, as known to the index.
        base_path: folder that relative file paths are resolved against.
        version: requested version; ``None`` is treated as ``LATEST``.
        properties: default properties for this item (``None`` -> ``{}``).
        index: an index object, or the alias (string) of a configured index;
            ``None`` selects ``DEFAULT_INDEX_NAME``.
        path_map: mapping of file name -> target path, or a single string that
            is used as the ``"__default__"`` path. A ``"__default__"`` entry is
            added to the given mapping in place if it is missing.

    Raises:
        Exception: if ``base_path`` is ``None``, the index alias is unknown,
            ``path_map`` has an unsupported type, or the package is not part
            of the index.
    """

    def __init__(
        self,
        pkg_name,
        base_path,
        version=LATEST,
        properties=None,
        index=None,
        path_map=None,
    ):
        self.pkg_name = pkg_name

        if base_path is None:
            raise Exception("Base path can't be none")
        self.base_path = base_path

        if version is None:
            version = LATEST
        self.version = version

        # Resolve an index alias into the index object itself.
        if index is None:
            index = DEFAULT_INDEX_NAME
        if isinstance(index, string_types):
            repository_obj = LUPKG_INDEXES.get_index(index)
            if repository_obj is None:
                raise Exception("No repository configured for alias '{}'".format(index))
            index = repository_obj
        self.index = index

        # Looked up once; the original code repeated this identical lookup
        # further down before checking the result.
        self.pkg = self.index.get_pkg(self.pkg_name)

        if properties is None:
            properties = {}
        self.properties = properties

        if path_map is None:
            path_map = copy.deepcopy(DEFAULT_PATH_MAP)
        if isinstance(path_map, string_types):
            path_map = {"__default__": path_map}
        if not isinstance(path_map, (dict, OrderedDict, CommentedMap)):
            raise Exception("Invalid type for 'paths' value: {}".format(type(path_map)))
        if "__default__" not in path_map:
            path_map["__default__"] = ""
        self.path_map = path_map
        # TODO: validate path_map

        if self.pkg is None:
            raise Exception("Package '{}' not found in index".format(self.pkg_name))

    def _identity(self):
        """Return the tuple that defines equality, ordering and hashing."""
        return (self.pkg_name, self.base_path)

    def __eq__(self, other):
        # Defer to the other operand for foreign types instead of failing with
        # an AttributeError; __ne__ below already expects NotImplemented.
        if not isinstance(other, LupkgPkgItem):
            return NotImplemented
        return self._identity() == other._identity()

    def __ne__(self, other):
        ret = self.__eq__(other)
        return ret if ret is NotImplemented else not ret

    def __lt__(self, other):
        if not isinstance(other, LupkgPkgItem):
            return NotImplemented
        return self._identity() < other._identity()

    def __le__(self, other):
        if not isinstance(other, LupkgPkgItem):
            return NotImplemented
        return self._identity() <= other._identity()

    def __gt__(self, other):
        if not isinstance(other, LupkgPkgItem):
            return NotImplemented
        return self._identity() > other._identity()

    def __ge__(self, other):
        if not isinstance(other, LupkgPkgItem):
            return NotImplemented
        return self._identity() >= other._identity()

    def __repr__(self):
        return "LupkgPkgItem: [pkg: {}, version: {}, repo: {}]".format(
            self.pkg_name, self.version, self.index.get_name()
        )

    def __hash__(self):
        # hash() accepts exactly one argument; the original passed two and
        # therefore always raised a TypeError.
        return hash(self._identity())

    def get_overlay_properties(self, properties):
        """Return this item's properties merged with ``properties``.

        ``None`` is treated as an empty dict. The merge is delegated to
        ``dict_merge`` with ``copy_dct=True``.
        """
        if properties is None:
            properties = {}
        return dict_merge(self.properties, properties, copy_dct=True)

    def get_file_names(self):
        """Return the names of all (non man-page) files of this package."""
        return self.get_url_details().keys()

    def get_url_details(
        self, version=None, properties=None, ignore_template_errors=True
    ):
        """Return an ordered mapping of file name -> url details.

        Args:
            version: version to query; defaults to this item's version.
            properties: properties overlaid on this item's own properties.
            ignore_template_errors: passed through to the package object.

        Returns:
            OrderedDict: the details reported by the package, without entries
            whose ``"type"`` is ``"man"``.
        """
        if version is None:
            version = self.version

        urls = self.pkg.get_url_details(
            version=version,
            properties=self.get_overlay_properties(properties),
            ignore_template_errors=ignore_template_errors,
        )

        # filter man pages for now, because they go into a different path
        result = OrderedDict()
        for file_name, md in urls.items():
            # "type" is optional elsewhere in this class (see install()), so a
            # missing key must not raise here either.
            if md.get("type") == "man":
                continue
            result[file_name] = md
        return result

    def get_path_for_file(self, file_name):
        """Resolve the local path of one of this package's files.

        Lookup order: an explicit ``path_map`` entry for the file, the
        ``"path"`` value of the file's url details, the ``"__default__"``
        entry of ``path_map``, and finally the file name itself. A path ending
        with ``os.sep`` is treated as a folder the file name is appended to.
        Relative paths are joined onto ``base_path`` and user-expanded;
        absolute paths are returned unchanged.

        Raises:
            Exception: if ``file_name`` does not belong to this package.
        """
        url_details = self.get_url_details(ignore_template_errors=True)
        if file_name not in url_details:
            raise Exception(
                "File '{}' not part of package '{}'".format(file_name, self.pkg_name)
            )

        path = self.path_map.get(file_name, None)
        if path is None:
            path = url_details[file_name].get("path", None)
        if path is None:
            path = self.path_map.get("__default__")
        if not path:
            path = file_name

        if path.endswith(os.sep):
            path = os.path.join(path, file_name)

        if os.path.isabs(path):
            return path
        return os.path.expanduser(os.path.join(self.base_path, path))

    def exists(self, file_name):
        """Return whether the resolved path of ``file_name`` exists.

        Raises:
            Exception: if the path exists but is a folder.
        """
        file_path = self.get_path_for_file(file_name)
        if not os.path.exists(file_path):
            return False
        if os.path.isdir(file_path):
            raise Exception("pkg file '{}' exists, but is folder".format(file_path))
        return True

    def is_installed(self):
        """Return ``True`` if every file of this package exists locally."""
        file_names = self.get_url_details(ignore_template_errors=True)
        return all(self.exists(file_name) for file_name in file_names)

    def is_up_to_date(self, check_files_exist=True):
        """Return whether this item matches the package's latest version.

        For every file of the package: if ``check_files_exist`` is set and the
        file is missing, the result is ``False``; the result is also ``False``
        when this item's version differs from the package's latest version
        name, or when both are the ``LATEST`` placeholder. A package without
        files yields ``True``.
        """
        for file_name in self.get_url_details(ignore_template_errors=True):
            if check_files_exist and not self.exists(file_name):
                return False

            latest_version = self.pkg.latest_version_name
            if self.version == LATEST and latest_version == LATEST:
                # this means we don't know the exact version number, or the
                # package does not use them
                return False
            if self.version != latest_version:
                return False
        return True

    def uninstall(self):
        """Delete the files of this package and return the removed paths.

        Paths are resolved with ``get_path_for_file`` so that the files
        removed are the same ones ``exists``/``is_installed`` look at (the
        original joined ``base_path`` and the file name directly, ignoring
        ``path_map`` and per-file paths).
        """
        result = []
        for file_name in self.get_file_names():
            path = self.get_path_for_file(file_name)
            if os.path.exists(path):
                os.remove(path)
                result.append(path)
        return result

    def install(self, version=LATEST, pkg_properties=None, install_properties=None):
        """Install the files of this package below ``base_path``.

        Args:
            version: version to install; a falsy value or ``LATEST`` selects
                the package's latest version name.
            pkg_properties: properties overlaid on this item's own properties.
            install_properties: processing options; ``"force"`` and
                ``"reinstall"`` are read here, the whole dict is handed to the
                file processors.

        Returns:
            An empty list if nothing had to be done (all files exist, neither
            ``force`` nor ``reinstall`` is set, and the concrete version
            already matches). Otherwise a dict with ``"pkg_item"`` (a new
            ``LupkgPkgItem`` describing the installed state) and
            ``"installed_files"`` (the processors' results, one per file).
        """
        if install_properties is None:
            install_properties = {}
        force = install_properties.get("force", False)
        reinstall = install_properties.get("reinstall", False)

        if not version or version == LATEST:
            version = self.pkg.latest_version_name

        # Loop-invariant, and also needed after the loop: computing it here
        # avoids a NameError for packages that have no files to install.
        merged_properties = self.get_overlay_properties(pkg_properties)

        url_details = self.get_url_details(
            version=version,
            properties=merged_properties,
            ignore_template_errors=False,
        )

        if force is False and reinstall is False:
            all_exist = all(self.exists(file_name) for file_name in url_details)
            if all_exist and version != LATEST and self.version == version:
                log.debug("pkg '{}' exists, doing nothing".format(self.pkg_name))
                return []

        result = []
        log.debug("Installing package: {}".format(self.pkg_name))
        for file_name, details in url_details.items():
            log.debug("- installing file: {}".format(file_name))
            processor = get_processor(details.get("type", "default"))
            local_file = processor.process_pkg_item(
                self,
                file_name,
                self.base_path,
                version=version,
                properties=merged_properties,
                process_properties=install_properties,
            )
            result.append(local_file)

        new_pkg_item = LupkgPkgItem(
            self.pkg_name,
            self.base_path,
            version=version,
            properties=merged_properties,
            index=self.index,
            path_map=self.path_map,
        )
        return {"pkg_item": new_pkg_item, "installed_files": result}
class LupList(Frklist):
    """A ``Frklist`` whose tasks are ``LupkgPkgItem`` objects.

    The methods below read ``self.tasklist``; presumably the ``Frklist`` base
    class builds it by calling ``expand_and_augment_tasklist`` -- confirm
    against frkl.

    NOTE(review): ``base_path`` is initialised to ``None`` and never assigned
    in this class, while ``LupkgPkgItem`` rejects a ``None`` base path.
    Presumably the base class or a caller provides it (e.g. from ``meta``) --
    verify.
    """

    def __init__(self, pkgs, context=None, meta=None, vars=None):
        self.base_path = None
        super(LupList, self).__init__(pkgs, context, meta, vars)

    def expand_and_augment_tasklist(self, pkgs):
        """Turn a raw package description into a list of ``LupkgPkgItem``.

        ``pkgs`` is run through a ``FrklProcessor`` configured with the lupkg
        format (stem key ``"packages"``, leaf key ``"package"``, default leaf
        key ``"name"``; ``"properties"`` and ``"path_map"`` as other valid
        keys). Duplicate package names are logged as a warning but still
        added.

        Raises:
            Exception: if a package entry carries no ``"index"``.
        """
        LUPKG_FRKL_FORMAT = {
            STEM_KEY_NAME: "packages",
            DEFAULT_LEAF_KEY_NAME: "package",
            DEFAULT_LEAF_DEFAULT_KEY_NAME: "name",
            # DEFAULT_LEAF_KEY_MAP_NAME: "package",
            OTHER_VALID_KEYS_NAME: ["properties", "path_map"],
        }
        chain = [FrklProcessor(**LUPKG_FRKL_FORMAT)]
        pkgs_md = Frkl([pkgs], chain).process()

        # Log the processed metadata; the original logged the still-empty
        # result list here.
        log.debug("frklized pkg list: {}".format(pkgs_md))

        result = []
        pkg_names = []
        for pkg_md in pkgs_md:
            pkg_details = pkg_md["package"]
            pkg_name = pkg_details["name"]

            if pkg_name in pkg_names:
                # Logger.warn is a deprecated alias of Logger.warning.
                log.warning(
                    "Duplicate package '{}', this might cause unpredictable behaviour.".format(
                        pkg_name
                    )
                )
            else:
                pkg_names.append(pkg_name)

            index = pkg_details.get("index", None)
            if index is None:
                raise Exception("NO INDEX")

            item = LupkgPkgItem(
                pkg_name,
                self.base_path,
                version=pkg_details.get("version", LATEST),
                properties=pkg_md.get("properties", {}),
                path_map=pkg_md.get("path_map", {}),
                index=index,
            )
            result.append(item)

        return result

    def find_list_index_of_package(self, pkg_name):
        """Return the position of the first task named ``pkg_name``, or ``None``."""
        for index, pkg in enumerate(self.tasklist):
            if pkg.pkg_name == pkg_name:
                return index
        return None

    def find_existing_package_in_list(self, pkg_name):
        """Return the first task named ``pkg_name``, or ``None``."""
        for pkg in self.tasklist:
            if pkg.pkg_name == pkg_name:
                return pkg
        return None

    def get_package_names(self):
        """Return the package names of all tasks, in list order."""
        return [pkg.pkg_name for pkg in self.tasklist]

    def update_pkg_item(self, new_pkg):
        """Replace the task with the same package name, or append ``new_pkg``."""
        index = self.find_list_index_of_package(new_pkg.pkg_name)
        if index is not None:
            self.tasklist[index] = new_pkg
        else:
            self.tasklist.append(new_pkg)

    def remote_package(self, pkg_name):
        """Remove the first task named ``pkg_name``; do nothing if absent.

        NOTE(review): the name looks like a typo for ``remove_package``; it is
        kept because callers use it.
        """
        index = self.find_list_index_of_package(pkg_name)
        if index is None:
            return
        del self.tasklist[index]

    def render_tasklist(self):
        """Return the task list as plain dicts.

        Every entry has a ``"package"`` dict with ``"name"``, ``"version"``
        and ``"index"`` (the index name). ``"properties"`` is included when
        non-empty. ``"path_map"`` is included, as a deep copy with the
        ``"__default__"`` key removed, when it differs from
        ``DEFAULT_PATH_MAP``.
        """
        result = []
        for task in self.tasklist:
            item = {
                "package": {
                    "name": task.pkg_name,
                    "version": task.version,
                    "index": task.index.get_name(),
                }
            }
            if task.properties:
                item["properties"] = task.properties
            if task.path_map and task.path_map != DEFAULT_PATH_MAP:
                tm_copy = copy.deepcopy(task.path_map)
                # NOTE(review): this also drops a non-empty custom
                # "__default__" path from the rendered output -- confirm that
                # is intended.
                tm_copy.pop("__default__", None)
                item["path_map"] = tm_copy
            result.append(item)
        return result
class LupkgPath(object):
    """A folder managed by lupkg, together with its metadata file.

    Args:
        path: the managed folder; user-expanded and resolved with
            ``os.path.realpath``.
        metadata_file: where the package list is stored. ``None`` selects
            ``LUPKG_FOLDER_METADATA_FILE_NAME`` inside ``path``. A value that
            does not exist yet, or that is an existing file, is used as the
            file itself; any other existing path is treated as a folder that
            contains ``LUPKG_FOLDER_METADATA_FILE_NAME``.

    The parent folder of the metadata file and the (empty) metadata file are
    created if they do not exist.

    Raises:
        Exception: if the parent of the metadata file exists but is a file.
    """

    def __init__(self, path, metadata_file=None):
        self.path = os.path.realpath(os.path.expanduser(path))

        if metadata_file is None:
            self.metadata_file = os.path.join(
                self.path, LUPKG_FOLDER_METADATA_FILE_NAME
            )
        elif not os.path.exists(metadata_file) or os.path.isfile(
            os.path.realpath(metadata_file)
        ):
            self.metadata_file = metadata_file
        else:
            self.metadata_file = os.path.join(
                metadata_file, LUPKG_FOLDER_METADATA_FILE_NAME
            )

        metadata_file_dir = os.path.dirname(self.metadata_file)
        # An empty dirname means a bare file name relative to the current
        # directory: there is no parent to create, and os.makedirs("") would
        # raise.
        if metadata_file_dir:
            if not os.path.exists(metadata_file_dir):
                log.debug("Creating metadata file parent: {}".format(metadata_file_dir))
                os.makedirs(metadata_file_dir)
            elif os.path.isfile(os.path.realpath(metadata_file_dir)):
                raise Exception(
                    "Metadata file path is not a directory: {}".format(metadata_file_dir)
                )

        if not os.path.exists(self.metadata_file):
            # Create an empty metadata file.
            open(self.metadata_file, "a").close()

        self.pkg_list = LupList(self.metadata_file, meta={"path": self.path})

    def get_pkgs(self):
        """Return the package items recorded for this folder."""
        return self.pkg_list.tasklist

    def get_pkg(self, pkg_name):
        """Return the recorded package item named ``pkg_name``, or ``None``."""
        return self.pkg_list.find_existing_package_in_list(pkg_name)

    def get_pkg_names(self):
        """Return the names of all recorded packages."""
        return self.pkg_list.get_package_names()

    def uninstall_pkg(self, pkg_name):
        """Uninstall a recorded package and persist the updated list.

        Returns:
            The paths removed by the package item, or ``None`` if the package
            is not recorded for this folder.
        """
        existing_pkg = self.pkg_list.find_existing_package_in_list(pkg_name)
        if existing_pkg is None:
            # Report the requested name; the original formatted existing_pkg,
            # which is always None in this branch.
            log.debug("Package '{}' not installed, doing nothing.".format(pkg_name))
            return

        deleted_files = existing_pkg.uninstall()
        self.pkg_list.remote_package(pkg_name)
        self.write_package_list()
        return deleted_files

    def install_pkg(
        self,
        pkg_name,
        version=LATEST,
        properties=None,
        index=None,
        install_properties=None,
        ignore_if_already_installed=False,
    ):
        """Install a package into this folder and record it.

        If the package is already recorded, its version, properties, path map
        and (unless ``index`` is given) index seed the package item that
        performs the installation.

        Args:
            pkg_name: name of the package.
            version: version to install.
            properties: package properties for this installation.
            index: index object or alias; defaults to the recorded package's
                index, if any.
            install_properties: options passed on to ``LupkgPkgItem.install``.
            ignore_if_already_installed: if set, return ``{}`` without doing
                anything when all files exist and the package is recorded.

        Returns:
            The result of ``LupkgPkgItem.install`` (falsy if nothing was
            installed), or ``{}`` if the installation was skipped.
        """
        existing_pkg = self.pkg_list.find_existing_package_in_list(pkg_name)

        init_properties = None
        init_path_map = None
        init_version = None
        if existing_pkg is not None:
            init_version = existing_pkg.version
            init_properties = existing_pkg.properties
            init_path_map = existing_pkg.path_map
            if index is None:
                index = existing_pkg.index.get_name()

        pkg = LupkgPkgItem(
            pkg_name,
            self.path,
            version=init_version,
            index=index,
            properties=init_properties,
            path_map=init_path_map,
        )

        # All required files are present and the package is already recorded:
        # nothing to do. (existing_pkg is the same lookup the original
        # repeated via get_pkg().)
        if (
            ignore_if_already_installed
            and pkg.is_installed()
            and existing_pkg is not None
        ):
            return {}

        result = pkg.install(
            version=version,
            pkg_properties=properties,
            install_properties=install_properties,
        )
        log.debug("Install result: {}".format(result))
        if not result:
            return result

        self.pkg_list.update_pkg_item(result["pkg_item"])
        self.write_package_list()
        return result

    def write_package_list(self):
        """Write the rendered package list to the metadata file.

        An empty package list leaves an empty file behind.
        """
        # Render before opening, so a failure while rendering does not leave
        # a truncated metadata file.
        tl = self.pkg_list.render_tasklist()
        # Opening with "w" already truncates the file, so an empty list needs
        # no explicit write (the original re-opened the file to write "").
        with open(self.metadata_file, "w") as mdf:
            if tl:
                yaml.dump(tl, mdf)