Source code for glia.resolution

"""
Resolves package, mechanism and variant constraints into asset names that can be requested
from the Glia library.
"""

from contextlib import contextmanager

from ._fs import read_preferences, write_preferences
from .exceptions import (
    AssetLookupError,
    NoMatchesError,
    TooManyMatchesError,
    UnknownAssetError,
)


[docs] class IndexEntry: def __init__(self, name): self.name = name self.items = [] def __call__(self): return self.items def __iter__(self): return iter(self.items) def __len__(self): return len(self.items)
[docs] def append(self, asset): self.items.append(asset)
class _NoPreferenceError(Exception): pass
[docs] class Resolver: def __init__(self, manager): self._manager = manager self.global_preferences = read_preferences() self.local_preferences = {} self.__preference_stack = {} self.__next_stack_id = 0 self.construct_index()
[docs] def construct_index(self): packages = self._manager.packages self.index = {} self._reverse_lookup = {} for pkg in packages: for mod in pkg.mods: if mod.dialect is not None and mod.dialect != "neuron": continue name = mod.asset_name if name not in self.index: self.index[name] = IndexEntry(name) self.index[name].append(mod) self._reverse_lookup[mod.mod_name] = mod
[docs] def resolve(self, asset_name, pkg=None, variant=None): if isinstance(asset_name, tuple): try: variant = asset_name[1] pkg = asset_name[2] except IndexError: pass asset_name = asset_name[0] if asset_name not in self.index: raise UnknownAssetError( f"Selection could not be resolved: Asset '{asset_name}' not found." ) # Try resolving with preference resolved = self.resolve_preference(asset_name, pkg=pkg, variant=variant) if resolved is not None: return resolved.mod_name # If there was no preference, or if the preference couldn't resolve, try resolving # without preference. resolved = self._get_resolved(asset_name, pkg, variant) if not resolved: raise NoMatchesError( f"Selection {pkg}.{asset_name}.{variant} could not be resolved.", pkg, variant, ) elif len(resolved) > 1: return self._resolve_multi(resolved, asset_name, pkg, variant).mod_name else: return resolved[0].mod_name
[docs] def resolve_preference(self, asset_name, pkg=None, variant=None): # Combine global, script, context & local preferences to obtain # package and variant. try: pkg, variant = self._get_final_pv(asset_name, pkg, variant) except _NoPreferenceError: # No preferences found. return None # Resolve the preferred package and variant resolved = self._get_resolved(asset_name, pkg, variant) if len(resolved) == 1: # Exactly 1 candidate found. Return it. return resolved[0] elif len(resolved) > 1: # Multiple candidates found, check if one obvious candidate stands out, # if not throw TooManyMatchesError. return self._resolve_multi(resolved, asset_name, pkg, variant) else: # No candidates found. return None
[docs] def lookup(self, mod_name): if mod_name not in self._reverse_lookup: raise AssetLookupError(f"No mod with name '{mod_name}' found") else: return self._reverse_lookup[mod_name]
def _get_final_pv(self, asset_name, pkg, variant): """ Get final package and variant preference. This function fetches all preferences and checks whether there's specific package or variant preferences. If not it checks the general package or variant preferences not specific to the asset. """ # Fetch global, script & context preferences preferences = self._preferences() if asset_name not in preferences: # No specific asset preferences found, but maybe general package or variant # preferences apply? if self._has_general_preferences(preferences): # Get the general package and variant preferences p_pkg, p_variant = self._general_preferences(preferences, pkg, variant) if pkg is None: pkg = p_pkg if variant is None: variant = p_variant else: # No preferences of any sort found. raise _NoPreferenceError() else: preference = preferences[asset_name] if pkg is None and "package" in preference: pkg = preference["package"] if variant is None and "variant" in preference: variant = preference["variant"] return pkg, variant def _get_resolved(self, asset_name, pkg, variant): mods = iter(self.index[asset_name]) if pkg: mods = filter(lambda m: m.pkg_name == pkg, mods) if variant: mods = filter(lambda m: m.variant == variant, mods) return list(mods) def _has_general_preferences(self, preferences): return ("__pkg" in preferences and preferences["__pkg"]) or ( "__variant" in preferences and preferences["__variant"] ) def _general_preferences(self, preferences, pkg, variant): return ( pkg or (("__pkg" in preferences and preferences["__pkg"]) or None), variant or (("__variant" in preferences and preferences["__variant"]) or None), ) def _resolve_multi(self, resolved, asset_name, pkg, variant): # If all candidates are from the same package, and 1 is the default variant # then return that default variant if all(m.pkg_name == resolved[0].pkg_name for m in resolved) and any( m.variant == "0" for m in resolved ): return next(m for m in resolved if m.variant == "0") selection = asset_name if pkg: selection = pkg + "." + asset_name if variant: selection += f" ({variant})" raise TooManyMatchesError( f"Selection could not be resolved, too many matches for {selection}:\n * " + "\n * ".join(r.mod_name for r in resolved) + "\n Try specifying a package or variant", list(r.mod_name for r in resolved), asset_name, pkg, variant, )
[docs] def has_preference(self, asset_name): return asset_name in self._preferences()
[docs] def set_preference(self, asset_name, glbl=False, pkg=None, variant=None): preference = {} if pkg is not None: preference["package"] = pkg if variant is not None: preference["variant"] = variant if glbl: self.global_preferences[asset_name] = preference write_preferences(self.global_preferences) else: self.local_preferences[asset_name] = preference
[docs] @contextmanager def preference_context(self, assets=None, pkg=None, variant=None): if assets is None: assets = {} if pkg is not None: assets["__pkg"] = pkg if variant is not None: assets["__variant"] = variant id = self._stack_preference_context(assets) try: yield finally: self._pop_preference_context(id)
def _stack_preference_context(self, assets): id = self.__next_stack_id self.__next_stack_id += 1 self.__preference_stack[id] = assets return id def _pop_preference_context(self, id): del self.__preference_stack[id] def _preferences(self): pref = self.global_preferences.copy() pref.update(self.local_preferences) pref = self._apply_preference_stack(pref) return pref def _apply_preference_stack(self, base_pref): def combine(asset, old, new): if asset == "__pkg" or asset == "__variant": return new or old if "unset" in new and new["unset"]: old["package"] = None old["variant"] = None if "package" in new and new["package"] is not None: old["package"] = new["package"] if "variant" in new and new["variant"] is not None: old["variant"] = new["variant"] return old compiled_preferences = {} for stacked_preference in self.__preference_stack.values(): for asset, preference in stacked_preference.items(): if asset not in compiled_preferences: if asset == "__pkg" or asset == "__variant": compiled_preferences[asset] = preference else: compiled_preferences[asset] = {"package": None, "variant": None} compiled_preferences[asset] = combine( asset, compiled_preferences[asset], preference ) base_pref.update(compiled_preferences) return base_pref