"""
CONTRACT (part of API)
======================
Core interfaces that should be changed WITH CARE, as they are part of the public API.
Any breaking change here requires bumping the RKD major version (see: Semantic Versioning)
"""
from abc import abstractmethod, ABC as AbstractClass, ABC
from typing import Dict, List, Union, Optional, Type
from argparse import ArgumentParser
from .inputoutput import IO
from ..exception import UndefinedEnvironmentVariableUsageError
from ..exception import EnvironmentVariableNotUsed
from ..exception import MissingInputException
from ..exception import EnvironmentVariableNameNotAllowed
from ..taskutil import TaskUtilities
from .temp import TempManager
from .inputoutput import SystemIO, ReadableStreamType
def env_to_switch(env_name: str) -> str:
    """Convert an environment variable name into an argparse switch, e.g. "HISTORY_ID" -> "--history-id"."""
    dashed = '-'.join(env_name.lower().split('_'))
    return '--' + dashed
class ArgumentEnv(object):
    """Environment variable that provides a value for an argparse switch.

    When no switch is given, it is derived from the variable name via env_to_switch()
    (e.g. "HISTORY_ID" -> "--history-id").

    Note: names listed in ArgumentEnv.RESERVED_VARS cannot be used - constructing an ArgumentEnv
    with such a name raises EnvironmentVariableNameNotAllowed.
    """

    RESERVED_VARS = ['PATH', 'PWD', 'LANG', 'DISPLAY', 'SHELL', 'SHLVL', 'HOME', 'EDITOR']

    name: str
    default: str
    switch: str

    def __init__(self, name: str, switch: str = '', default: str = ''):
        self.name = name
        self.default = default
        # an empty switch means "derive it from the variable name"
        self.switch = switch or env_to_switch(name)
        self._validate()

    def _validate(self):
        """Reject variable names listed in RESERVED_VARS."""
        is_reserved = self.name in self.RESERVED_VARS
        if is_reserved:
            raise EnvironmentVariableNameNotAllowed(self.name)
class TaskDeclarationInterface(AbstractClass):
    """Contract for a task declaration: naming, arguments, environment and input of a single task."""

    @abstractmethod
    def to_full_name(self):
        """Full name of the declared task (a string such as ":init")."""

    @abstractmethod
    def get_args(self) -> List[str]:
        """Arguments attached to this declaration."""

    @abstractmethod
    def get_task_to_execute(self):  # returns TaskInterface (declared later in this module)
        """Task object that will be executed for this declaration."""

    @abstractmethod
    def to_list(self) -> list:
        """List representation of the declaration (format defined by implementations)."""

    @abstractmethod
    def get_env(self):
        """Environment assigned to this declaration (shape defined by implementations)."""

    @abstractmethod
    def get_group_name(self) -> str:
        """Name of the group the task belongs to."""

    @abstractmethod
    def get_task_name(self) -> str:
        """Name of the task."""

    @abstractmethod
    def get_description(self) -> str:
        """Short description of the task."""

    @abstractmethod
    def get_full_description(self) -> str:
        """Long description of the task."""

    @abstractmethod
    def format_task_name(self, name: str) -> str:
        """Format a task name for display (formatting defined by implementations)."""

    @property
    def is_internal(self) -> bool:
        """Whether the declaration is internal; False unless overridden."""
        return False

    @abstractmethod
    def get_input(self) -> ReadableStreamType:
        """Input stream for the task."""
class GroupDeclarationInterface(AbstractClass):
    """Contract for a group declaration: a named bundle of task declarations."""

    @abstractmethod
    def get_declarations(self) -> Dict[str, TaskDeclarationInterface]:
        """Task declarations in this group (key meaning defined by implementations - presumably names)."""

    @abstractmethod
    def get_group_name(self) -> str:
        """Group name part of the declaration."""

    @abstractmethod
    def get_task_name(self) -> str:
        """Task name part of the declaration."""

    @abstractmethod
    def to_full_name(self):
        """Full name of the group declaration."""

    @abstractmethod
    def get_description(self) -> str:
        """Short description of the group."""

    @abstractmethod
    def format_task_name(self, name: str) -> str:
        """Format a task name for display (formatting defined by implementations)."""
class ContextInterface(AbstractClass):
    """Contract for the application context that holds task and group declarations."""

    # Previously annotated with the list literal "[]", which is not a type.
    # NOTE(review): presumably the directories the context was loaded from - confirm.
    directories: list
    io: SystemIO

    @classmethod
    @abstractmethod
    def merge(cls, first, second):
        """Combine two contexts into one.

        Declared as a classmethod because the signature takes ``cls``. Without @classmethod,
        ``first`` would be bound as ``cls`` when called on an instance.
        """
        pass

    @abstractmethod
    def compile(self) -> None:
        """Prepare the context after all declarations were collected (defined by implementations)."""
        pass

    def find_task_by_name(self, name: str) -> Union[TaskDeclarationInterface, GroupDeclarationInterface]:
        """Look up a task or group declaration by name. Returns None here; concrete contexts override it."""
        pass

    def find_all_tasks(self) -> Dict[str, Union[TaskDeclarationInterface, GroupDeclarationInterface]]:
        """Return all known task and group declarations. Returns None here; concrete contexts override it."""
        pass
class ExecutorInterface(AbstractClass):
    """Contract for the component that runs scheduled task declarations."""

    @abstractmethod
    def execute(self, scheduled_declaration, task_num: int):
        """Execute ``scheduled_declaration``; ``task_num`` is presumably its position in the run - confirm."""
class ExecutionContext(object):
    """
    Defines which objects could be accessed by Task. It's a scope of a single task execution.
    """

    __declaration: TaskDeclarationInterface
    parent: Union[GroupDeclarationInterface, None]
    args: Dict[str, str]
    env: Dict[str, str]
    __ctx: ContextInterface
    executor: ExecutorInterface
    _allow_mutating_globals: bool

    # List of arguments definitions populated by Argparse (with limited parameters supported)
    # Read about "traced arguments"
    defined_args: Dict[str, dict]

    def __init__(self, declaration: TaskDeclarationInterface,
                 parent: Union[GroupDeclarationInterface, None] = None,
                 args: Optional[Dict[str, str]] = None,
                 env: Optional[Dict[str, str]] = None,
                 defined_args: Optional[Dict[str, dict]] = None):
        """
        :param declaration: declaration of the task executed in this context
        :param parent: group declaration the task belongs to, if any
        :param args: parsed argument/option values, looked up by get_arg()
        :param env: environment variable values, passed to the task when resolving get_env()
        :param defined_args: argparse definitions keyed by switch (e.g. "--file-path"); their "default"
                             entry is read by get_arg_or_env()
        """
        self.__declaration = declaration
        self.parent = parent

        # A "{}" default is evaluated once and shared by every instance created without that
        # argument. get_args() hands the dict out, so one caller's mutation would leak into others.
        self.args = {} if args is None else args
        self.env = {} if env is None else env
        self.defined_args = {} if defined_args is None else defined_args

        # only the ":init" task is allowed to mutate globals (see can_mutate_globals())
        self._allow_mutating_globals = declaration.to_full_name() == ':init'

    def get_env(self, name: str, switch: str = '', error_on_not_used: bool = False):
        """Get environment variable value

        Delegates to internal_getenv() of the task to execute, passing this context's env values.

        :param name: environment variable name, e.g. "FILE_PATH"
        :param switch: optional --switch; a declared ArgumentEnv bound to it overrides ``name``
        :param error_on_not_used: raise EnvironmentVariableNotUsed instead of returning the declared
                                  default when the variable is absent from the env values
        """
        return self.__declaration.get_task_to_execute().internal_getenv(name, self.env, switch=switch,
                                                                        error_on_not_used=error_on_not_used)

    def get_arg_or_env(self, name: str) -> Union[str, None]:
        """Provides value of user input

        Usage:
            get_arg_or_env('--file-path') resolves into FILE_PATH env variable, and --file-path switch
            (file_path in argparse)

        Behavior:
            When user provided explicitly switch eg. --history-id, then its value will be taken in priority.

            If switch --history-id was not used, but user provided HISTORY_ID environment variable,
            then it will be considered.

            If no switch provided and no environment variable provided, but a switch has
            default value - it would be returned.

            If no switch provided and no environment variable provided, the switch does not have default,
            but environment variable has a default value defined, it would be returned.

            When the --switch has default value (user does not use it, or user sets it explicitly to default value),
            and environment variable SWITCH is defined, then environment variable would be taken.

        Explicit environment variables definitions
        ------------------------------------------

        From RKD 2.1 the environment variable names can be mapped to any ArgParse switch.
        Below example maps "COMMAND" environment variable to "--cmd" switch.

        .. code:: python

            def get_declared_envs(self) -> Dict[str, Union[str, ArgumentEnv]]:
                return {
                    'COMMAND': ArgumentEnv(name='COMMAND', switch='--cmd', default='')
                }

        Raises:
            MissingInputException: When no switch and no environment variable was provided.
            UndefinedEnvironmentVariableUsageError: propagated from get_env() when the task does not
                declare the environment variable derived from ``name``.
        """
        env_name = name[2:].replace('-', '_').upper()
        env_value = None

        try:
            env_value = self.get_env(env_name, switch=name, error_on_not_used=True)
            is_env_variable_defined = True
        except EnvironmentVariableNotUsed:
            is_env_variable_defined = False

        # case 1: a --switch was used
        # case 2: --switch has its default value set, environment variable is set, then pick env
        try:
            value = self.get_arg(name)
        except MissingInputException:
            # get_arg() reports an unknown switch with MissingInputException (not KeyError).
            # Fall through so an environment variable can still provide the value.
            pass
        else:
            # https://github.com/riotkit-org/riotkit-do/issues/23
            # When --switch has same value as default, and environment variable is not empty, then env has priority
            definition = self.defined_args.get(name, {})
            if 'default' in definition and definition['default'] == value and is_env_variable_defined:
                return env_value

            # A value missing from defined_args no longer discards an explicitly passed switch
            if value is not None:
                return value

        # case: No --switch defined, no ENV defined
        if not is_env_variable_defined:
            raise MissingInputException(name, env_name)

        # case: No --switch defined, ENV defined
        return env_value

    def get_arg(self, name: str) -> Optional[str]:
        """Get argument or option

        Usage:
            ctx.get_arg('--name')  # for options
            ctx.get_arg('name')    # for arguments

        Looks up the argparse-style key first ("--file-path" -> "file_path"), then the raw name.

        Raises:
            MissingInputException: when argument/option was not defined

        Returns:
            Actual value or default value
        """
        try:
            arg_name = name[2:].replace('-', '_')
            return self.args[arg_name]
        except KeyError:
            try:
                return self.args[name]
            except KeyError:
                raise MissingInputException(name, '')

    def get_input(self) -> Optional[ReadableStreamType]:
        """Input stream of the executed task's declaration."""
        return self.__declaration.get_input()

    def can_mutate_globals(self) -> bool:
        """
        Is task having a special permissions to mutate globals such as OS environment
        (True only for the ":init" task)
        """
        return self._allow_mutating_globals

    def get_args(self) -> Dict[str, str]:
        """All argument/option values (the stored dict itself, not a copy)."""
        return self.args
class TaskInterface(TaskUtilities):
    """
    Base contract of every RKD Task.

    Implementations provide at least get_name(), get_group_name() and execute(). Services (IO, context,
    executor, temp manager) are injected after construction via internal_inject_dependencies().
    """

    _io: IO
    _ctx: ContextInterface
    _executor: ExecutorInterface
    temp: TempManager
    _internal: bool
    _extended_from: Type['TaskInterface']

    def internal_inject_dependencies(self, io: IO, ctx: Optional[ContextInterface] = None,
                                     executor: Optional[ExecutorInterface] = None,
                                     temp_manager: Optional[TempManager] = None):
        """"""  # sphinx: skip
        # Stores the services used by the task; io() returns the injected IO
        self._io = io
        self._ctx = ctx
        self._executor = executor
        self.temp = temp_manager

    def copy_internal_dependencies(self, task):
        """
        Allows to execute a task-in-task, by copying dependent services from one task to other task
        """
        task.internal_inject_dependencies(self._io, self._ctx, self._executor, self.temp)

    @abstractmethod
    def get_name(self) -> str:
        """
        Task name eg. ":sh"
        """
        pass

    @abstractmethod
    def get_group_name(self) -> str:
        """
        Group name where the task belongs eg. ":publishing", can be empty.
        """
        pass

    def get_become_as(self) -> str:
        """
        User name in UNIX/Linux system, optional.
        When defined, then current task will be executed as this user (WARNING: a forked process would be started)
        """
        return ''

    def should_fork(self) -> bool:
        """
        Decides if task should be ran in a separate Python process (be careful with it)

        By default a task forks only when get_become_as() returns a non-empty user name.
        """
        return self.get_become_as() != ''

    def get_description(self) -> str:
        return ''

    # ====== LIFECYCLE METHODS
    @abstractmethod
    def execute(self, context: ExecutionContext) -> bool:
        """
        Executes a task. True/False should be returned as return
        """
        pass
    # ====== LIFECYCLE METHODS ENDS

    def get_full_name(self):
        """
        Returns task full name, including group name (group name + task name, e.g. ":publishing" + ":sh")
        """
        return self.get_group_name() + self.get_name()

    @classmethod
    def get_declared_envs(cls) -> Dict[str, Union[str, ArgumentEnv]]:
        """
        Dictionary of allowed envs to override: KEY -> DEFAULT VALUE

        All environment variables fetched from the ExecutionContext need to be declared here.
        Declared values are automatically documented in --help

        .. code:: python

            @classmethod
            def get_declared_envs(cls) -> Dict[str, Union[str, ArgumentEnv]]:
                return {
                    'PHP': ArgumentEnv('PHP', '--php', '8.0-alpine'),
                    'IMAGE': ArgumentEnv('IMAGE', '--image', 'php')
                }
        """
        return {}

    def internal_normalized_get_declared_envs(self) -> Dict[str, ArgumentEnv]:
        """"""  # sphinx: ignore
        # Method used internally, supports conversion of values from primitives to ArgumentEnv
        # as developers can specify env variables in get_declared_envs() as primitives or as ArgumentEnv
        # so, there we normalize everything
        #
        # WRAPPER OVER INTERFACE METHOD: get_declared_envs() should be defined by developer
        envs = {}

        for name, value in self.get_declared_envs().items():
            if not isinstance(value, ArgumentEnv):
                value = ArgumentEnv(name=name, default=value)

            envs[name] = value

        return envs

    def internal_getenv(self, env_name: str, envs: Dict[str, str], switch: str = '',
                        error_on_not_used: bool = False) -> str:
        """"""  # sphinx: ignore
        # Resolves an environment variable for this task:
        #  - when `switch` is given, a declared ArgumentEnv bound to that switch replaces `env_name`
        #  - raises UndefinedEnvironmentVariableUsageError when the (resolved) name is not declared
        #  - returns the value from `envs`; when absent, the declared default
        #    (or raises EnvironmentVariableNotUsed when error_on_not_used=True)
        declared_envs = self.internal_normalized_get_declared_envs()

        # find env by switch, when env was defined to be non-standard name
        if switch:
            for env in declared_envs.values():
                if env.switch == switch:
                    # log the name the switch resolved to, not the name derived from the switch
                    self.io().debug('Resolved environment "%s" from switch "%s"' % (env.name, env.switch))
                    env_name = env.name

        if env_name not in declared_envs:
            raise UndefinedEnvironmentVariableUsageError(
                (
                    'Attempt to use not declared environment variable "%s". ' +
                    'Please report the problem to the maintainers of this task, not to RKD (unless it is a core task)'
                )
                % env_name
            )

        # return default value
        if env_name not in envs:
            if error_on_not_used:
                raise EnvironmentVariableNotUsed(env_name)

            return declared_envs[env_name].default

        return envs[env_name]

    def is_silent_in_observer(self) -> bool:
        """"""  # sphinx: skip
        return False

    def io(self) -> IO:
        """Gives access to Input/Output object"""
        return self._io

    def py(self, code: str = '', become: Optional[str] = None, capture: bool = False,
           script_path: Optional[str] = None, arguments: str = '') -> Union[str, None]:
        """Executes a Python code in a separate process

        NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
                may be not catch properly into the logs
        """
        return super().py(
            code=code, become=become, capture=capture, script_path=script_path, arguments=arguments
        )

    def sh(self, cmd: str, capture: bool = False, verbose: bool = False, strict: bool = True,
           env: Optional[dict] = None, use_subprocess: bool = False) -> Union[str, None]:
        """Executes a shell script in bash. Throws exception on error.
        To capture output set capture=True

        NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
                may be not catch properly into the logs
        """
        # use_subprocess was previously accepted but silently dropped; forward it to the implementation
        return super().sh(
            cmd=cmd, capture=capture, verbose=verbose, strict=strict, env=env, use_subprocess=use_subprocess
        )

    def exec(self, cmd: str, capture: bool = False, background: bool = False) -> Union[str, None]:
        """Starts a process in shell. Throws exception on error.
        To capture output set capture=True

        NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
                may be not catch properly into the logs
        """
        return super().exec(cmd=cmd, capture=capture, background=background)

    def rkd(self, args: list, verbose: bool = False, capture: bool = False) -> str:
        """Spawns an RKD subprocess

        NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
                may be not catch properly into the logs
        """
        return super().rkd(args=args, verbose=verbose, capture=capture)

    def silent_sh(self, cmd: str, verbose: bool = False, strict: bool = True,
                  env: Optional[dict] = None) -> bool:
        """sh() shortcut that catches errors and displays using IO().error_msg()

        NOTICE: Use instead of subprocess. Raw subprocess is less supported and output from raw subprocess
                may be not catch properly into the logs
        """
        return super().silent_sh(cmd=cmd, verbose=verbose, strict=strict, env=env)

    def __str__(self):
        return 'Task<{name}, object_id={id}, extended_from={extends}>'.format(
            name=self.get_full_name(),
            id=id(self),
            extends=self.extends_task()
        )

    @property
    def is_internal(self) -> bool:
        return False

    def extends_task(self):
        """
        Provides information if this Task has a Parent Task

        :return: "module.ClassName" of the parent task, or "TaskInterface" when _extended_from is not set
        """
        try:
            extends_from = self._extended_from.__module__ + '.' + self._extended_from.__name__
        except AttributeError:
            extends_from = 'TaskInterface'

        return extends_from

    def __deepcopy__(self, memodict=None):
        """
        Do not allow copy.deepcopy() to copy this object. Declaration can be doubled, TaskInterface implementation not.

        :param memodict: deepcopy memo (unused; default changed from a shared "{}" to None)
        :return: this same instance
        """
        return self
class ExtendableTaskInterface(TaskInterface, ABC):
    """Task that child Tasks can extend; adds optional hooks on top of TaskInterface."""

    def inner_execute(self, ctx: ExecutionContext) -> bool:
        """
        Hook that can be called from inside execute() - if implemented (returns None here).

        Use cases:
            - Allow child Task to inject code between e.g. database startup and database shutdown to execute some
              operations on the database

        :param ctx: execution context of the running task
        """

    def get_configuration_attributes(self) -> List[str]:
        """Names of configuration attributes; a fresh empty list by default."""
        return list()

    def compile(self, event: 'CompilationLifecycleEvent') -> None:
        """
        Execute code after all tasks were collected into a single context (no-op by default)
        """
class ArgparseArgument(object):
    """Positional and keyword arguments of a single ArgumentParser.add_argument(*args, **kwargs) call"""

    args: list
    kwargs: dict

    def __init__(self, args: list = None, kwargs: dict = None):
        # None defaults, so every instance gets its own fresh containers
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
class PipelinePartInterface(object):
    """
    Partial element of a Pipeline - a string that is converted into a GroupDeclaration
    """

    # NOTE(review): this class derives from plain ``object`` (not ABC), so @abstractmethod is NOT
    # enforced here - subclasses can be instantiated without implementing to_pipeline_part().
    @abstractmethod
    def to_pipeline_part(self) -> List[str]:
        """Return this element as a list of strings (format defined by implementations)."""
class MultiStepLanguageExtensionInterface(ABC):
    """Contract for language extensions used by multi-step tasks."""

    @abstractmethod
    def with_predefined_details(self, code: str, name: str, step_num: int):
        """Receive a step's source code, name and step number (semantics defined by implementations)."""