Source code for lupkg.lupkg_cli

# -*- coding: utf-8 -*-
"""Console script for lupkg."""
import logging
import os
import sys
from collections import OrderedDict

import click
import click_completion
import click_log
from halo import Halo
from ruamel.yaml.comments import CommentedMap
from termcolor import colored
from terminaltables import SingleTable

from frutils import SUPPORTED_OUTPUT_FORMATS, readable, reindent
from frutils.frutils_cli import CursorOff, output
from .utils import ensure_base_path, ensure_metadata_file, handle_metadata_exception
from .exceptions import LupkgMetadataException, LupkgProcessingException
from .lupkg_config import LUPKG_INDEXES
from .lupkg_list import LUPKG_FOLDER_METADATA_FILE_NAME, LupkgPath
from .lupkg_processors import DEFAULT_OUTPUT_CALLBACK
from .lupkg import LATEST

log = logging.getLogger("lucify")
click_log.basic_config(log)

# optional shell completion
click_completion.init()

DEFAULT_TARGET_PATH = os.path.expanduser("~/.local/bin")


[docs]def click_stdout(message): click.echo(message)
# @click.group(context_settings=CONTEXT_SETTINGS) @click.group() @click.option("--index", "-i", multiple=False, help="the lupkg index to use") @click.option( "--target-path", "-t", default=None, multiple=False, help="the local target folder" ) @click.option( "--metadata", "-m", default=None, multiple=False, help="the metadata file, if separate from the target folder, defaults to file named '.lupkgs' in the target", ) @click.option( "--output-format", "-o", help="output format, implies '--yes' to auto-confirm install/removal/upgrading of packages in commands that need it", default="default", required=False, type=click.Choice(["default"] + SUPPORTED_OUTPUT_FORMATS), ) @click_log.simple_verbosity_option(log, "--verbosity", default="WARN") @click.pass_context def cli(ctx, index, target_path, metadata, output_format): """lupkg is a package manager for single files and/or (small) file-sets. lupkg uses an index of a repository of files to learn about all files that are available for installation. It also parses local folders to figure out whether any of those files are already installed, and if they are, whether the version available locally can be updated. If not configured otherwise, lupkg uses the default lupkg repo (XXX) as a source. This repo contains a community-curated selection of scripts and executables that don't have any dependencies and can be executed right after download. """ click.echo("") ctx.obj = {} ctx.obj["output_format"] = output_format if not index: index = "default" try: with CursorOff(): ctx.obj["index"] = LUPKG_INDEXES.get_index(index) except (Exception) as e: # raise click.ClickException(e) log.debug(e, exc_info=True) raise e ctx.obj["output"] = click_stdout if target_path is None: target_path = DEFAULT_TARGET_PATH ctx.obj["target_path"] = target_path if metadata is None: metadata = os.path.join( os.path.realpath(os.path.expanduser(target_path)), LUPKG_FOLDER_METADATA_FILE_NAME, ) ctx.obj["metadata_path"] = metadata @cli.command(name="update", short_help="update the package index") @click.pass_context def update(ctx): """Updates the selected index. Currently this command doesn't do much, as indexes are retrieved/read on the fly anyway. But this might make more sense later on, if indexes are started to be cached after download. """ index = ctx.obj["index"].get_name() # with CursorOff(): with Halo(text="updating root index", spinner="dots") as spinner: # click.echo("Updating indexes...") LUPKG_INDEXES.get_indexes_index(update=True) spinner.text = "updating index '{}'".format(index) # click.echo("Updating index '{}'...".format(index)) LUPKG_INDEXES.get_index(index, update=True) spinner.succeed("Index(es) updated.") # click.echo("Index(es) updated.") @cli.command(name="status", short_help="display installed package status") @click.option("--details", "-d", is_flag=True, help="display additional details") # @click.option( # "--unmanaged", # "-u", # is_flag=True, # help="display unmanaged package status instead of managed", # ) # @click.option( # "--all", # "-a", # is_flag=True, # help="display both managed and unmanaged package status", # ) # @click.option( # "--show-path", "-p", help="display path of installed packages", is_flag=True # ) @click.pass_context def status(ctx, details): """Displays the status of all installed packages, and their versions. """ # index = ctx.obj["index"] target = ctx.obj["target_path"] target = ensure_base_path(target) output_format = ctx.obj["output_format"] metadata = ctx.obj["metadata_path"] metadata = ensure_metadata_file(metadata) lupkg_path = LupkgPath(target, metadata_file=metadata) if output_format != "default": result = lupkg_path.pkg_list.render_tasklist() output(result, output_type=output_format) sys.exit() if not lupkg_path.get_pkg_names(): click.echo("No packages installed in path: '{}'".format(target)) sys.exit(0) else: click.secho("Folder: ", nl=False, bold=True) click.echo(target) click.echo() data = [] title = [colored("Package", attrs=["bold"]), colored("Version", attrs=["bold"])] if details: title.append(colored("Index", attrs=["bold"])) title.append(colored("Files", attrs=["bold"])) data.append(title) for pkg in sorted(lupkg_path.pkg_list.tasklist): pkg_name = pkg.pkg_name version = pkg.version is_up_to_date = pkg.is_up_to_date(check_files_exist=False) if is_up_to_date: version_colored = colored(version, "green") else: if version == LATEST: version_colored = colored(version, "yellow") else: version_colored = colored(version, "red") if not details: row = [pkg_name, version_colored] data.append(row) continue repo = pkg.index.get_name() files = pkg.get_file_names() files_colored = [] for f in files: if pkg.exists(f): files_colored.append(colored(f, "green")) else: files_colored.append(colored(f, "red")) row = [pkg_name, version_colored, repo, files_colored[0]] data.append(row) if len(files) > 1: for f in files_colored[1:]: row = ["", "", "", f] data.append(row) # click.echo(colored("Installed packages", attrs=["bold"])) # click.echo() table = SingleTable(data) table.outer_border = False table.inner_column_border = False table.padding_left = 0 table.padding_right = 2 # table.justify_columns[1] = "center" # table.justify_columns[2] = "center" click.echo(table.table) @cli.command(name="list", short_help="list all package names") @click.option("--details", "-d", help="display details about packages", is_flag=True) @click.argument("filter", metavar="FILTER", nargs=-1) @click.pass_context def list_command(ctx, filter, details): """Lists all package names (with optional details) that are listed in all of the configured package indexes. The result can be filtered by providing one or multiple strings as arguments to this command. Those strings act as simple filters, and only the packages whose names string-match all of the provided ones will be displayed. """ index = ctx.obj["index"] output_format = ctx.obj["output_format"] pkg_names = index.get_pkg_names() result = OrderedDict() for pkg_name in pkg_names: no_match = False for fl in filter: if fl not in pkg_name: no_match = True break if no_match: continue if details: pkg = index.get_pkg(pkg_name) if output_format != "default": result[pkg_name] = pkg.details() else: if output_format != "default": result[pkg_name] = readable(pkg.all_metadata, out=output_format) else: result[pkg_name] = readable(pkg.all_metadata, out="yaml") else: result[pkg_name] = None if output_format != "default": if details: output(result, output_type=output_format) else: output(list(result.keys()), output_type=output_format) else: for name, md in result.items(): if details: click.echo("Package: ", nl=False) click.secho(name, bold=True) click.echo("") click.echo(reindent(md, 2)) click.echo("") else: click.echo(name) @cli.command(name="metadata", short_help="displays repository metadata") @click.option( "--output-file", "-f", help="save to a file instead of printing to stdout", default=None, required=False, ) @click.pass_context def metadata(ctx, output_file): """Prints all index/index metadata for the selected (or default) lupkg index descriptions. """ index = ctx.obj["index"] format = ctx.obj["output_format"] if format == "default": format = "yaml" md_all = index.get_all_metadata() output = readable(md_all, out=format, safe=False) if output_file is None: click.echo(output) sys.exit(0) if not output.endswith("\n"): output = output + "\n" with open(output_file, "w") as f: f.write(output) sys.exit() @cli.command(name="show", short_help="displays information about a package") @click.option( "--filter", "-f", help="the provided arguments act as filters instead of exact package names", is_flag=True, ) @click.argument("name", metavar="PKG_NAME", nargs=-1) @click.pass_context def show(ctx, name, filter): """Displays information about a package. If the '--fiter' option is specified, the arguments passed to this command will act as filters, and only packages whose package names (string-)match all of them will be displayed. The list of displayed packages is the same as it would be when calling the 'list' command with (the same) arguments. """ index = ctx.obj["index"] output_format = ctx.obj["output_format"] if not filter: packages = name else: packages = [] for pn in index.get_pkg_names(): no_match = False for fl in name: if fl not in pn: no_match = True break if no_match: continue packages.append(pn) result = CommentedMap() for n in packages: pkg = index.get_pkg(n) if not pkg: log.info("No package with name: {}".format(n)) continue else: if output_format != "default": result[n] = pkg.details() else: result[n] = str(pkg) if output_format != "default": output(result, output_type=output_format) sys.exit() for name, md in result.items(): click.echo() click.echo(md) @cli.command(name="url", short_help="display download url for package") @click.option("--version", "-v", help="the version of the package, defaults to latest") @click.option("--details", "-d", help="display details each file/url", is_flag=True) @click.option( "--property", "-p", help="a key/value pair describing an additional property to be used to select the right version of this package (e.g. 'arch Linux64bit')", metavar="KEY VALUE", nargs=2, multiple=True, ) @click.option( "--template", is_flag=True, help="don't require properties when result is templated, show template string instead", ) @click.argument("name", metavar="PKG_NAME", nargs=1) @click.pass_context def url(ctx, name, version, property, template, details): """Displays the download url for the specified package. """ index = ctx.obj["index"] output_format = ctx.obj["output_format"] pkg = index.get_pkg(name) if not pkg: click.echo("No package with name '{}' found.".format(name)) sys.exit(1) properties = {} for p in property: properties[p[0]] = p[1] try: urls = pkg.get_url_details( version=version, properties=properties, ignore_template_errors=template ) except (LupkgMetadataException) as e: handle_metadata_exception(e) sys.exit(1) print(urls) if output_format != "default": if not details: output([u["url"] for u in urls.values()], output_type=output_format) else: output(urls, output_type=output_format) sys.exit(0) for file_name, u in urls.items(): if not details: click.echo(u["url"]) else: click.echo("File: ", nl=False) click.secho(file_name, bold=True) details_string = readable(u, out="yaml", indent=2) click.echo(details_string) @cli.command(name="install", short_help="installs one or several packages") @click.option("--version", "-v", help="the version of the package to install") @click.option( "--property", "-p", help="a key/value pair describing an additional property to be used to select the right version of this package (e.g. 'arch Linux64bit')", metavar="KEY VALUE", nargs=2, multiple=True, ) @click.option("--force", "-f", help="overwrite existing files", is_flag=True) @click.option( "--reinstall", "-r", help="reinstalls the package(s), even if the same version is already installed", is_flag=True, ) @click.option( "--everything", help="install all packages, this will only work with certain pkg repositories that don't require you to specify special properties for any packages", is_flag=True, ) @click.argument("name", metavar="PKG_NAME", nargs=-1) @click.pass_context def install(ctx, name, version, property, everything, force, reinstall): """Installs one or several packages. By default, the latest version of a package will be installed. If the target file already exists but no metadata is available for that file, you need to use the '--force' flag to overwrite the existing file. If the package was already installed by lupkg, lupkg won't do anything (not even update if applicable), unless you use the '--reinstall' flag. Depending on the lupkg repository used, it might be possible to use the '--everything' flag to install all files described in the index of a repository. This will only work if no extra properties are necessary to pick the right version of a file for a certain host system. """ target = ctx.obj["target_path"] target = ensure_base_path(target, True) metadata = ctx.obj["metadata_path"] metadata = ensure_metadata_file(metadata) index = ctx.obj["index"] output_format = ctx.obj["output_format"] if output_format != "default": DEFAULT_OUTPUT_CALLBACK.set_silent(True) if name and everything: click.echo() click.echo( "Specifying a package name with the '--everything' flag doesn't make sense. Doing nothing." ) sys.exit() if everything: name = list(index.get_available_packages()) log.debug("'--everything' flag specified, installing all pkgs: {}".format(name)) if (version is not None or (property is not None and property)) and len(name) > 2: click.echo( "Installing multiple packages not supported when specifying the '--version' and/or additional properties option." ) sys.exit(1) properties = {} for p in property: properties[p[0]] = p[1] if not name: if output_format == "default": click.echo() click.echo("No packages specified, doing nothing...") else: output([], output_type=output_format) sys.exit(0) lupkg_path = LupkgPath(target, metadata_file=metadata) result = [] result_files = [] with CursorOff(): for n in name: try: pkg = lupkg_path.get_pkg(n) if pkg is not None and not force: if output_format == "default": click.echo("- package '{}' already present".format(n)) continue if output_format == "default": click.echo("- installing package: {}".format(n)) install_result = lupkg_path.install_pkg( n, version=version, properties=properties, index=index, install_properties={"force": force}, ) if install_result: result.append(install_result) result_files.extend(install_result.get("installed_files", [])) except (LupkgProcessingException) as epe: click.echo("\nError installing package: {}".format(n)) click.echo(" => {}".format(epe.message)) sys.exit(1) except (LupkgMetadataException) as eme: click.echo("\nError installing package: ", nl=False) click.secho(n, bold=True) handle_metadata_exception(eme) if result_files: if output_format == "default": click.echo("\nInstalled:") for r in result_files: click.echo(" - {}".format(r)) else: output(result_files, output_type=output_format) else: if output_format != "default": output([], output_type=output_format) sys.exit(0) @cli.command(name="uninstall", short_help="uninstalls one or several packages") @click.option( "--everything", help="uninstalls every (managed) package for the selected profile", is_flag=True, ) @click.option("--yes", "-y", help="omit confirmation dialog", is_flag=True) @click.argument("name", metavar="PKG_NAME", nargs=-1) @click.pass_context def uninstall(ctx, name, everything, yes): """Uninstalls one or several packages. """ output_format = ctx.obj["output_format"] target_path = ctx.obj["target_path"] metadata_path = ctx.obj["metadata_path"] if name and everything: click.echo() click.echo( "Specifying a package name with the '--everything' flag doesn't make sense. Doing nothing." ) sys.exit(1) lupkg_path = LupkgPath(target_path, metadata_path) installed_pkg_names = lupkg_path.get_pkg_names() if everything: name = sorted(installed_pkg_names) log.debug("'--everything' flag specified, uninstalling pkgs: {}".format(name)) if not name: click.echo() click.echo("No packages specified, doing nothing...") sys.exit(1) not_installed = [] installed = [] for n in name: if n not in installed_pkg_names: not_installed.append(n) else: installed.append(n) if not_installed: if output_format == "default": click.echo("Not installed:") for n in not_installed: click.echo(" - {}".format(n)) click.echo() if len(installed) == 0: if output_format == "default": click.echo("No packages to uninstall, doing nothing...") else: output([], output_type=output_format) sys.exit(0) elif len(installed) == 1: if output_format == "default": click.echo("Uninstalling packge: {}".format(installed[0])) else: if output_format == "default": click.echo("Uninstalling packges:") for n in installed: click.echo(" - {}".format(n)) if output_format == "default" and not yes: click.echo("Continue? [Y/n] ", nl=False) c = click.getchar() if c != "Y" and c != "y": click.echo("\n\nDoing nothing.") sys.exit(0) all_deleted = [] with CursorOff(): for n in installed: if output_format == "default": click.echo() click.echo() click.echo("Uninstalling: {}...".format(n)) deleted_files = lupkg_path.uninstall_pkg(n) all_deleted.extend(deleted_files) if output_format == "default": click.echo("") click.echo("Deleted files:") for d in all_deleted: click.echo(" - {}".format(d)) click.echo("") else: output(all_deleted, output_type=output_format) @cli.command(name="upgrade", short_help="upgrade all installed packages") @click.option( "--no-latest", "-n", is_flag=True, help="don't upgrade packages where the exact version number is not known ('latest')", ) @click.option( "--reinstall", "-r", is_flag=True, help="(re-)install the package(s), even if it is already the latest version", ) @click.option("--yes", "-y", help="omit confirmation dialog", is_flag=True) @click.argument("name", metavar="PKG_NAME", nargs=-1) @click.pass_context def upgrade(ctx, name, no_latest, reinstall, yes): """Upgrades all installed packages to the latest version. By default, if a version number is not specified in a repository index, a package will be re-downloaded, as we can't be sure whether it has or hasn't been updated. """ target_path = ctx.obj["target_path"] metadata_path = ctx.obj["metadata_path"] output_format = ctx.obj["output_format"] lupkg_path = LupkgPath(target_path, metadata_file=metadata_path) installed_package_names = lupkg_path.get_pkg_names() if not name: name = sorted(installed_package_names) to_upgrade = OrderedDict() up_to_date = [] for pn in name: installed = lupkg_path.get_pkg(pn) if not installed: click.echo() click.echo("Package '{}' not installed or managed.".format(pn)) sys.exit(1) pkg = installed.pkg latest_available = pkg.latest_version_name skip = False if not reinstall: if installed.version != LATEST and installed.version == latest_available: up_to_date.append(pn) skip = True elif ( no_latest and installed.version == LATEST and latest_available == LATEST ): up_to_date.append(pn) skip = True else: to_upgrade[pn] = installed if skip: up_to_date.append(pn) log.info("Package '{}' up-to-date, doing nothing.".format(pn)) else: log.debug("Adding package '{}': {}".format(pn, installed)) if not to_upgrade: if output_format == "default": if not up_to_date: click.echo("No packages installed, nothing to do.") else: click.echo("Everything up to date, doing nothng.") else: return output([], output_type=output_format) sys.exit(0) if not yes and output_format == "default": click.echo("Upgrading packages: {}".format(" ".join(to_upgrade.keys()))) click.echo("\nContinue? [Y/n] ", nl=False) c = click.getchar() if c != "Y" and c != "y": click.echo("\nExiting.") sys.exit(0) click.echo() result = [] result_files = [] with CursorOff(): for n, pkg_item in to_upgrade.items(): if output_format == "default": click.echo() click.echo( "Upgrading package '{}': {} -> {}".format( n, pkg_item.version, pkg_item.pkg.latest_version_name ) ) click.echo() try: install_result = lupkg_path.install_pkg( n, version=LATEST, install_properties={"force": True} ) if install_result: result.append(install_result) result_files.extend(install_result.get("installed_files", [])) except (LupkgProcessingException) as epe: click.echo("\nError installing package: {}".format(n)) click.echo(" => {}".format(epe.message)) sys.exit(1) except (LupkgMetadataException) as eme: click.echo("\nError installing package: ", nl=False) click.secho(n, bold=True) handle_metadata_exception(eme) if result_files: if output_format == "default": click.echo("\nInstalled:") for r in result_files: click.echo(" - {}".format(r)) else: return output(result_files, output_type=output_format) else: if output_format == "default": click.echo("Nothing to do.") else: return output([], output_type=output_format) if __name__ == "__main__": sys.exit(cli()) # pragma: no cover