# Source code for rkd.core.api.syntax


"""
SYNTAX (part of API)
====================

Classes used in a declaration syntax in makefile.py

"""
import sys
import shlex
from dataclasses import dataclass
from types import FunctionType
from typing import List, Dict, Optional, Union
from copy import deepcopy
from uuid import uuid4
from .contract import TaskDeclarationInterface, ExtendableTaskInterface, PipelinePartInterface
from .contract import GroupDeclarationInterface
from .contract import TaskInterface
from .inputoutput import get_environment_copy, ReadableStreamType
from ..argparsing.model import ArgumentBlock
from ..exception import DeclarationException


this = sys.modules[__name__]

if 'DECLARED_TASKS_COUNT' not in dir(this):
    this.DECLARED_TASKS_COUNT = 0


def parse_path_into_subproject_prefix(path: str) -> str:
    """
    Logic for concatenating of subproject path from filesystem path as input

    :param path:
    :return:
    """

    return ':' + path.strip().strip('/').replace('/', ':').rstrip(': /')


def merge_workdir(task_workdir: Optional[str], subproject_workdir: Optional[str]) -> str:
    """
    Pure domain method that decides how the workdir merge logic should look like
    :param task_workdir:
    :param subproject_workdir:
    :return:
    """

    if not task_workdir:
        task_workdir = ''

    if not subproject_workdir:
        return task_workdir

    if task_workdir.startswith('/'):
        return task_workdir

    return subproject_workdir + '/' + task_workdir


[docs]class TaskDeclaration(TaskDeclarationInterface): """ Task Declaration is a DECLARED USAGE of a Task (instance of TaskInterface) Examples of usage: .. code:: python TaskDeclaration(MyNiceTask(), env={'SOME': 'thing'}, workdir='/tmp', name=':custom:task:name') """ _task: TaskInterface _env: Dict[str, str] # environment at all _user_defined_env: list # list of env variables overridden by user _args: List[str] _unique_id: str _workdir: Optional[str] # current working directory (eg. combination of subproject + task) _task_workdir: Optional[str] # original task working directory as defined in task _project_name: str _is_internal: Optional[bool] # task is not listed on :tasks _enforced_task_full_name: Optional[str] def __init__(self, task: TaskInterface, env: Dict[str, str] = None, args: List[str] = None, workdir: Optional[str] = None, internal: Optional[bool] = None, name: Optional[str] = None): if env is None: env = {} if args is None: args = [] if not isinstance(task, TaskInterface): try: has_taskinterface_subclass = list(filter(lambda cls: issubclass(cls, TaskInterface), task.__bases__)) except AttributeError: raise DeclarationException( 'Invalid class: TaskDeclaration needs to take TaskInterface as task argument. ' f'Got {type(task).__name__}' ) if not has_taskinterface_subclass: raise DeclarationException( 'Invalid class: TaskDeclaration needs to take TaskInterface as task argument. 
' f'Got {type(task).__name__}' ) self._unique_id = uuid4().hex self._task = task self._env = merge_env(env) self._args = args self._workdir = workdir self._task_workdir = workdir self._user_defined_env = list(env.keys()) self._project_name = '' self._is_internal = internal self._enforced_task_full_name = name def to_full_name(self): full_name = self._enforced_task_full_name if self._enforced_task_full_name else self._task.get_full_name() if self._project_name: return self._project_name + full_name return full_name def with_new_name(self, task_name: str, group_name: str) -> 'TaskDeclaration': copy = self.clone() copy._enforced_task_full_name = task_name if group_name: copy._enforced_task_full_name += ':' + group_name return copy def as_internal_task(self) -> 'TaskDeclaration': copy = self.clone() copy._is_internal = True return copy def as_part_of_subproject(self, workdir: str, subproject_name: str) -> 'TaskDeclaration': copy = self.clone() copy._workdir = merge_workdir(copy._task_workdir, workdir) copy._project_name = subproject_name return copy def clone(self) -> 'TaskDeclaration': """Clone securely the object. 
There fields shared across objects as references could be kept""" copy = deepcopy(self) copy._unique_id = uuid4().hex return copy def get_args(self) -> List[str]: return self._args def get_task_to_execute(self) -> Union[TaskInterface, ExtendableTaskInterface]: return self._task def to_list(self) -> list: return [self] def get_env(self): return self._env def get_list_of_user_overridden_envs(self) -> list: """ Lists environment variables which were overridden by user """ return self._user_defined_env def get_user_overridden_env(self) -> Dict[str, str]: """ Returns key->value of environment defined by user (not inherited from OS) :return: """ return {k: v for k, v in self._env.items() if k in self.get_list_of_user_overridden_envs()} def get_group_name(self) -> str: split = self.to_full_name().split(':') return split[1] if len(split) >= 3 else '' def get_task_name(self) -> str: split = self.to_full_name().split(':') if len(split) >= 3: return split[2] try: return split[1] except KeyError: return self.to_full_name() def get_description(self) -> str: task = self.get_task_to_execute() if task.get_description(): return task.get_description() return task.__doc__.strip().split("\n")[0] if task.__doc__ else '' def get_full_description(self) -> str: task = self.get_task_to_execute() if task.get_description(): return task.get_description() return task.__doc__.strip() if task.__doc__ else '' @staticmethod def parse_name(name: str) -> tuple: split = name.split(':') task_name = ":" + split[-1] group = ":".join(split[:-1]) return task_name, group def format_task_name(self, name: str) -> str: return self.get_task_to_execute().format_task_name(name) def get_unique_id(self) -> str: """ Unique ID of a declaration is a TEMPORARY ID created during runtime to distinct even very similar declarations """ return self._unique_id @property def workdir(self) -> str: if not self._workdir: return '.' return self._workdir @property def is_internal(self) -> bool: """ Is task considered internal? 
Should it be unlisted on a list of tasks for end-user? """ if self._is_internal is not None: return self._is_internal return self._task.is_internal def get_input(self) -> Optional[ReadableStreamType]: return None def __str__(self): return 'TaskDeclaration<%s>' % self.to_full_name()
class DeclarationScheduledToRun(object): """ Declaration scheduled to be executed :internal: todo: Move """ declaration: TaskDeclaration runtime_arguments: List[str] parent: Optional['GroupDeclaration'] created_task_num: int _env: dict _user_overridden_env: list _unique_id: str _blocks: List[ArgumentBlock] def __init__(self, declaration: TaskDeclaration, runtime_arguments: List[str], parent: Optional['GroupDeclaration'] = None, env: Dict[str, any] = None, user_overridden_env: List[str] = None): this.DECLARED_TASKS_COUNT += 1 self.declaration = declaration self.runtime_arguments = runtime_arguments self.parent = parent self._env = env if env else {} self._user_overridden_env = user_overridden_env if user_overridden_env else [] self.created_task_num = this.DECLARED_TASKS_COUNT self._unique_id = uuid4().hex self._blocks = [] @property def args(self) -> List[str]: return self.declaration.get_args() + self.runtime_arguments @property def env(self) -> Dict[str, any]: env = {} env.update(self.declaration.get_env()) env.update(self._env) return env @property def task_num(self) -> int: return self.created_task_num def connect_block(self, block: ArgumentBlock): """ Append a relation between Block <-> Declaration scheduled for execution :param block: :return: """ block.register_resolved_task(self) self._blocks.append(block) def unique_id(self) -> str: return self._unique_id def __str__(self) -> str: return f'Task<{self.declaration.to_full_name()}, #{self.task_num}>' def debug(self) -> str: return f'Task<{self.declaration.to_full_name()}, #{self.task_num}, ' \ f'declaration_id={self.declaration.get_unique_id()}>' @property def repr_as_invoked_task(self) -> str: """ Shows how the Task was called e.g. 
:sh -c "echo 'The Conquest of Bread, Chapter 2: Well-Being for All'" :return: """ return (self.declaration.to_full_name() + ' ' + ' '.join(self.args)).strip() def get_blocks_ordered_by_children_to_parent(self) -> List[ArgumentBlock]: """ Order: First are children, then are parents {@first} {@second} {/@} {/@} Why: Because the CLOSING ORDER is important, it's like a COMMIT, finalization, closed blocks are collected as soon as they are closed Will return: 1. second 2. first :return: """ return list(self._blocks) @property def blocks(self) -> List[ArgumentBlock]: return self._blocks def count_non_empty_blocks(self) -> int: return len( list(filter( lambda block: not block.is_default_empty_block, self.blocks )), ) class DeclarationBelongingToPipeline(DeclarationScheduledToRun): """ Task declared inside a Pipeline :internal: todo: Move """ def append(self, runtime_arguments: List[str], env: Dict[str, str], user_overridden_env: List[str]): """ Mutate parameters in cases, where the Task is wrapped multiple times in eg. 
pipelines, nested blocks :param runtime_arguments: :param env: :param user_overridden_env: :return: """ self.runtime_arguments += runtime_arguments self._env.update(env) self._user_overridden_env += user_overridden_env class ExtendedTaskDeclaration(object): """ Declaration for a task that extends other task using a function-like syntax This is a factory class for the TaskDeclaration """ func: Union[FunctionType, any] declaration_args: Dict[str, any] name: str def __init__(self, task: Union[FunctionType, any], env: Dict[str, str] = None, args: List[str] = None, workdir: Optional[str] = None, internal: Optional[bool] = None, name: Optional[str] = None): """ NOTICE: Should keep the same interface as TaskDeclaration :param task: :param env: :param args: :param workdir: :param internal: :param name: """ self.func = task self.name = name self.declaration_args = { 'env': env, 'args': args, 'workdir': workdir, 'internal': internal, 'name': name } def create_declaration(self, task: TaskInterface, stdin: Optional[FunctionType] = None): """ To not create dependencies from TaskFactory in the API the job to create a task from function is delegated to later layer :param task: :param stdin: :return: """ args = self.declaration_args args['task'] = task declaration = TaskDeclaration( **args ) if stdin: declaration.get_input = stdin return declaration class GroupDeclaration(GroupDeclarationInterface): """ Internal DTO: Processed definition of TaskAliasDeclaration into TaskDeclaration """ _name: str _declarations: List[DeclarationBelongingToPipeline] _description: str def __init__(self, name: str, declarations: List[DeclarationBelongingToPipeline], description: str): self._name = name self._declarations = declarations self._description = description def get_declarations(self) -> List[DeclarationBelongingToPipeline]: return self._declarations def get_name(self) -> str: return self._name def get_group_name(self) -> str: split = self._name.split(':') return split[1] if len(split) >= 3 
else '' def get_task_name(self) -> str: split = self._name.split(':') if len(split) >= 3: return split[2] try: return split[1] except KeyError: return self._name def to_full_name(self): return self.get_name() def get_description(self) -> str: return self._description def format_task_name(self, name: str) -> str: return name @property def is_internal(self) -> bool: return False
[docs]class Pipeline(object): """ Task Caller Has a name like a Task, but itself does not do anything than calling other tasks in selected order """ _name: str _arguments: List[str] _env: Dict[str, str] _user_defined_env: list # list of env variables overridden by user _description: str _workdir: str _project_name: str def __init__(self, name: str, to_execute: List[Union[str, PipelinePartInterface]], env: Dict[str, str] = None, description: str = ''): if env is None: env = {} self._name = name self._arguments = self._resolve_pipeline_parts(to_execute) self._env = merge_env(env) self._user_defined_env = list(env.keys()) self._description = description self._workdir = '' self._project_name = '' def get_name(self): if self._project_name: return self._project_name + self._name return self._name def get_arguments(self) -> List[str]: return self._arguments def get_env(self) -> Dict[str, str]: return self._env def get_user_overridden_envs(self) -> list: """ Lists environment variables which were overridden by user """ return self._user_defined_env def get_description(self) -> str: return self._description def _clone(self) -> 'Pipeline': """Clone securely the object. 
There fields shared across objects as references could be kept""" return deepcopy(self) def as_part_of_subproject(self, workdir: str, subproject_name: str) -> 'Pipeline': copy = self._clone() copy._workdir = merge_workdir(copy._workdir, workdir) copy._project_name = subproject_name return copy @property def workdir(self) -> str: return self._workdir @property def project_name(self) -> str: return self._project_name def is_part_of_subproject(self) -> bool: return isinstance(self._project_name, str) and len(self._project_name) > 1 @staticmethod def _resolve_pipeline_parts(parts: List[Union[str, PipelinePartInterface]]) -> List[str]: resolved = [] for part in parts: if isinstance(part, PipelinePartInterface): resolved += part.to_pipeline_part() else: resolved.append(part) return resolved def __str__(self) -> str: return f'Pipeline<{self.get_name()}>'
[docs]class TaskAliasDeclaration(Pipeline): """ Deprecated: Name will be removed in RKD 6.0 """
[docs]class PipelineTask(PipelinePartInterface): """ Represents a single task in a Pipeline .. code:: python from rkd.core.api.syntax import Pipeline PIPELINES = [ Pipeline( name=':build', to_execute=[ Task(':server:build --with-bluetooth'), Task(':client:build', '--with-bluetooth') ] ) ] """ task_args: List[str] def __init__(self, task: str, *task_args): if not task_args: self.task_args = shlex.split(task) else: self.task_args = [task] + list(task_args) def to_pipeline_part(self) -> List[str]: return self.task_args
[docs]@dataclass class PipelineBlock(PipelinePartInterface): """ Represents block of tasks Example of generated block: {@retry 3} :some-task {/@} .. code:: python from rkd.core.api.syntax import Pipeline, PipelineTask as Task, PipelineBlock as Block, TaskDeclaration Pipeline( name=':error-handling-example', description=':notify should be invoked after "doing exit" task, and execution of a BLOCK should be interrupted', to_execute=[ Task(':server:build'), Block(error=':notify -c "echo \'Build failed\'"', retry=3, tasks=[ Task(':docs:build', '--test'), Task(':sh', '-c', 'echo "doing exit"; exit 1'), Task(':client:build') ]), Task(':server:clear') ] ) """ tasks: List[PipelineTask] retry: Optional[int] = None retry_block: Optional[int] = None error: Optional[str] = None rescue: Optional[str] = None def __init__(self, tasks: List[PipelineTask], retry: Optional[int] = None, retry_block: Optional[int] = None, error: Optional[str] = None, rescue: Optional[str] = None): self.tasks = tasks self.retry = retry self.retry_block = retry_block self.error = error self.rescue = rescue if not retry and not retry_block and not error and not rescue: # @todo: Better exception raise Exception('Block needs to have defined at least one modifier e.g. @rescue') def to_pipeline_part(self) -> List[str]: partial = [] block_body = ['{'] if self.error: block_body.append(f'@error {self.error} ') if self.rescue: block_body.append(f'@rescue {self.rescue} ') if self.retry: block_body.append(f'@retry {self.retry} ') if self.retry_block: block_body.append(f'@retry-block {self.retry_block} ') block_body.append('} ') partial.append(''.join(block_body)) for task in self.tasks: partial += task.to_pipeline_part() partial.append('{/@}') return partial
def merge_env(env: Dict[str, str]): """ Merge custom environment variables set per-task with system environment """ merged_dict = deepcopy(env) merged_dict.update(get_environment_copy()) return merged_dict