Tasks API

Each task must implement a TaskInterface

class rkd.api.contract.TaskInterface
configure_argparse(parser: argparse.ArgumentParser)

Allows a task to configure ArgumentParser (argparse)

copy_internal_dependencies(task)

Allows executing a task inside another task by copying dependent services from one task to the other

exec(cmd: str, capture: bool = False, background: bool = False) → Optional[str]

Starts a process in a shell. Throws an exception on error. To capture output, set capture=True

NOTICE: Use this instead of subprocess. Raw subprocess is less supported, and its output may not be captured properly in the logs

execute(context: rkd.api.contract.ExecutionContext) → bool

Executes a task. The return value should be a boolean (True or False)

format_task_name(name: str) → str

Allows adding fancy formatting to the task name when the task is displayed, e.g. on the :tasks list

get_become_as() → str

UNIX/Linux user name, optional. When defined, the current task will be executed as this user (WARNING: a forked process will be started)

get_declared_envs() → Dict[str, Union[str, rkd.api.contract.ArgumentEnv]]

Dictionary of allowed envs to override: KEY -> DEFAULT VALUE

get_full_name()

Returns task full name, including group name

get_group_name() → str

Name of the group the task belongs to, e.g. “:publishing”; can be empty.

get_name() → str

Task name eg. “:sh”

io() → rkd.api.inputoutput.IO

Gives access to Input/Output object

py(code: str = '', become: str = None, capture: bool = False, script_path: str = None, arguments: str = '') → Optional[str]

Executes Python code in a separate process

NOTICE: Use this instead of subprocess. Raw subprocess is less supported, and its output may not be captured properly in the logs

rkd(args: list, verbose: bool = False, capture: bool = False) → str

Spawns an RKD subprocess

NOTICE: Use this instead of subprocess. Raw subprocess is less supported, and its output may not be captured properly in the logs

sh(cmd: str, capture: bool = False, verbose: bool = False, strict: bool = True, env: dict = None, use_subprocess: bool = False) → Optional[str]

Executes a shell script in bash. Throws an exception on error. To capture output, set capture=True

NOTICE: Use this instead of subprocess. Raw subprocess is less supported, and its output may not be captured properly in the logs

should_fork() → bool

Decides whether the task should be run in a separate Python process (use with care)

silent_sh(cmd: str, verbose: bool = False, strict: bool = True, env: dict = None) → bool

sh() shortcut that catches errors and displays them using IO().error_msg()

NOTICE: Use this instead of subprocess. Raw subprocess is less supported, and its output may not be captured properly in the logs

static table(header: list, body: list, tablefmt: str = 'simple', floatfmt: str = 'g', numalign: str = 'decimal', stralign: str = 'left', missingval: str = '', showindex: str = 'default', disable_numparse: bool = False, colalign: str = None)

Renders a table

Parameters:
  • header
  • body
  • tablefmt
  • floatfmt
  • numalign
  • stralign
  • missingval
  • showindex
  • disable_numparse
  • colalign
Returns:

Formatted table as string
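
As a rough illustration of the interface above, the sketch below defines a task-shaped class without importing RKD, so it runs on its own. HelloTask and StubContext are hypothetical stand-ins: a real task would subclass rkd.api.contract.TaskInterface and receive a real ExecutionContext. The way StubContext maps '--name' to an argparse key is an assumption made for this example.

```python
import argparse


class StubContext:
    """Hypothetical stand-in for ExecutionContext; only get_arg() is modelled."""

    def __init__(self, args: dict):
        self._args = args

    def get_arg(self, name: str):
        # Assumption: '--name' is looked up under the argparse key 'name'.
        return self._args[name.lstrip('-').replace('-', '_')]


class HelloTask:
    """Hypothetical task; mirrors the method names listed in TaskInterface."""

    def get_name(self) -> str:
        return ':hello'

    def get_group_name(self) -> str:
        return ''  # the group name can be empty

    def get_declared_envs(self) -> dict:
        return {}  # no environment variables can be overridden in this sketch

    def configure_argparse(self, parser: argparse.ArgumentParser):
        parser.add_argument('--name', default='world')

    def execute(self, context) -> bool:
        self.greeting = 'Hello, ' + context.get_arg('--name')
        return True


task = HelloTask()
parser = argparse.ArgumentParser()
task.configure_argparse(parser)
parsed = vars(parser.parse_args(['--name', 'RKD']))
succeeded = task.execute(StubContext(parsed))
```

The stub keeps the example independent of RKD; inside RKD the parser and the context are supplied to the task rather than built by hand.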

To include a task, wrap it in a declaration

class rkd.api.syntax.TaskDeclaration(task: rkd.api.contract.TaskInterface, env: Dict[str, str] = None, args: List[str] = None)

To create an alias for task or multiple tasks

class rkd.api.syntax.TaskAliasDeclaration(name: str, to_execute: List[str], env: Dict[str, str] = {}, description: str = '')

Allows defining a custom task name that triggers other tasks in the proper order

Execution context provides parsed shell arguments and environment variables

class rkd.api.contract.ExecutionContext(declaration: rkd.api.contract.TaskDeclarationInterface, parent: Optional[rkd.api.contract.GroupDeclarationInterface] = None, args: Dict[str, str] = {}, env: Dict[str, str] = {}, defined_args: Dict[str, dict] = {})

Defines which objects could be accessed by Task. It’s a scope of a single task execution.

get_arg(name: str) → Optional[str]

Get argument or option

Usage:
ctx.get_arg('--name') # for options
ctx.get_arg('name') # for arguments
Raises: KeyError when the argument/option was not defined
Returns: Actual value or default value

get_arg_or_env(name: str) → Optional[str]

Provides value of user input

Usage:
get_arg_or_env('--file-path') resolves to the FILE_PATH environment variable and the --file-path switch (file_path in argparse)
Behavior:

When the user explicitly provides a switch, e.g. --history-id, its value takes priority. If the --history-id switch was not used but the user provided the HISTORY_ID environment variable, the environment variable is used.

If no switch and no environment variable were provided, but the switch has a default value, that default is returned.

If no switch and no environment variable were provided and the switch has no default, but the environment variable has a default value defined, that default is returned.

When the --switch has a default value (the user does not use it, or sets it explicitly to the default value) and the SWITCH environment variable is defined, the environment variable is taken.
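
The priority rules above can be modelled in a few lines. This is a sketch of the documented behavior, not RKD's implementation: resolve_arg_or_env and its four parameters are hypothetical, None stands for "not provided", and MissingInputException is a local stand-in named after the exception mentioned in this section.

```python
from typing import Optional


class MissingInputException(Exception):
    """Local stand-in named after the exception mentioned in the documentation."""


def resolve_arg_or_env(parsed_value: Optional[str], switch_default: Optional[str],
                       env_value: Optional[str], env_default: Optional[str]) -> str:
    """Model of the documented priority order; None means 'not provided'."""
    # 1. A switch value that differs from its default was set explicitly by the user.
    if parsed_value is not None and parsed_value != switch_default:
        return parsed_value
    # 2. Otherwise a defined environment variable wins, even over the switch default.
    if env_value is not None:
        return env_value
    # 3. Then the default value of the switch.
    if switch_default is not None:
        return switch_default
    # 4. Finally the default declared for the environment variable.
    if env_default is not None:
        return env_default
    raise MissingInputException('no switch and no environment variable provided')


explicit = resolve_arg_or_env('5', None, '7', None)
from_env = resolve_arg_or_env('d', 'd', '7', None)
from_switch_default = resolve_arg_or_env('d', 'd', None, 'e')
from_env_default = resolve_arg_or_env(None, None, None, 'e')
```

Note that a switch set explicitly to its own default is indistinguishable from an unused switch in this model, which matches the last rule above.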

From RKD 2.1 the environment variable names can be mapped to any ArgParse switch.

The example below maps the “COMMAND” environment variable to the “--cmd” switch.

def get_declared_envs(self) -> Dict[str, Union[str, ArgumentEnv]]:
    return {
        'COMMAND': ArgumentEnv(name='COMMAND', switch='--cmd', default='')
    }
Raises: MissingInputException when neither a switch nor an environment variable was provided.
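
To make the switch-to-variable naming concrete, the sketch below derives an environment variable name from a switch and lets an explicit mapping override it. Both helper functions are hypothetical, and the plain dict is a simplified stand-in for the ArgumentEnv declarations shown above; RKD's own lookup may differ.

```python
from typing import Dict


def default_env_name(switch: str) -> str:
    """'--file-path' -> 'FILE_PATH', following the usage note in this section."""
    return switch.lstrip('-').replace('-', '_').upper()


def env_name_for_switch(switch: str, explicit: Dict[str, str]) -> str:
    """explicit maps ENV_NAME -> switch, a simplified form of the ArgumentEnv mapping."""
    for env_name, mapped_switch in explicit.items():
        if mapped_switch == switch:
            return env_name
    return default_env_name(switch)


derived = env_name_for_switch('--file-path', {})
mapped = env_name_for_switch('--cmd', {'COMMAND': '--cmd'})
```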
get_env(name: str, switch: str = '', error_on_not_used: bool = False)

Get environment variable value

Interaction with input and output

class rkd.api.inputoutput.IO

Interacting with input and output - stdout/stderr/stdin, logging

add_output_processor(callback: Callable[[Union[str, bytes], str], Union[str, bytes]])

Registers an output-processing callback. Each byte output by this IO instance goes through the set of registered processors

Example use cases:
  • Hide sensitive information (secrets)
  • Reformat output
  • Strip long stdouts from commands
  • Change colors
  • Add/remove formatting
Parameters:callback
Returns:
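
A callback for the first use case (hiding secrets) might look like the sketch below. The function follows the callable type in the signature above; the name hide_secret, the SECRET value, and the reading of the second argument as an output origin are assumptions for illustration.

```python
from typing import Union

SECRET = 'hunter2'  # hypothetical secret value used only for this example


def hide_secret(text: Union[str, bytes], origin: str) -> Union[str, bytes]:
    """Mask the secret; 'origin' is unused here and its meaning is an assumption."""
    if isinstance(text, bytes):
        return text.replace(SECRET.encode(), b'*****')
    return text.replace(SECRET, '*****')


masked_str = hide_secret('password=hunter2', 'stdout')
masked_bytes = hide_secret(b'password=hunter2', 'stdout')

# Inside a task this callback would be registered with:
#   self.io().add_output_processor(hide_secret)
```

Handling both str and bytes matters because the declared callback type accepts either.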
capture_descriptors(target_files: List[str] = None, stream=None, enable_standard_out: bool = True)

Capture stdout and stderr from a block of code - use with ‘with’
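
For readers unfamiliar with capturing output in a 'with' block, the sketch below shows the same general idea using only the standard library's contextlib. It is not RKD's implementation and does not model the target_files, stream or enable_standard_out parameters.

```python
import contextlib
import io
import sys

buffer = io.StringIO()

# Everything written to stdout/stderr inside the block ends up in the buffer.
with contextlib.redirect_stdout(buffer), contextlib.redirect_stderr(buffer):
    print('to stdout')
    print('to stderr', file=sys.stderr)

captured = buffer.getvalue()
```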

critical(text)

Logger: critical

debug(text)

Logger: debug

err(text)

Standard error

errln(text)

Standard error + newline

error(text)

Logger: error

error_msg(text)

Error message

h1(text)

Heading #1 (optional output)

h2(text)

Heading #2 (optional output)

h3(text)

Heading #3 (optional output)

h4(text)

Heading #4 (optional output)

info(text)

Logger: info

info_msg(text)

Informational message (optional output)

internal(text)

Logger: internal. Should be used only by RKD core for more intensive logging

is_silent() → bool

Is output silent? In silent mode OPTIONAL MESSAGES are not shown
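
The difference between standard and optional output can be pictured with a small model. SketchIO below is a hypothetical class written for this example, not the real rkd.api.inputoutput.IO; it only demonstrates the rule that optional messages are dropped in silent mode.

```python
import io


class SketchIO:
    """Hypothetical model of the optional-output rule; not the real IO class."""

    def __init__(self, silent: bool, stream):
        self.silent = silent
        self.stream = stream

    def is_silent(self) -> bool:
        return self.silent

    def outln(self, text: str):
        # Standard output is always written.
        self.stream.write(text + '\n')

    def opt_outln(self, text: str):
        # Optional output is skipped in silent mode.
        if not self.is_silent():
            self.outln(text)


stream = io.StringIO()
quiet = SketchIO(silent=True, stream=stream)
quiet.opt_outln('decorative banner')
quiet.outln('result')
written = stream.getvalue()
```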

opt_errln(text)

Optional errln()

opt_out(text)

Optional output - fancy output skipped in --silent mode

opt_outln(text)

Optional output - fancy output skipped in --silent mode + newline

out(text)

Standard output

outln(text)

Standard output + newline

print_group(text)

Prints a colored text inside brackets [text] (optional output)

print_line()

Prints a newline

print_opt_line()

Prints a newline (optional output)

print_separator()

Prints a text separator (optional output)

success_msg(text)

Success message (optional output)

warn(text)

Logger: warn

warn_msg(text) → None

Warning message (optional output)

Storing temporary files

class rkd.api.temp.TempManager(chdir: str = './.rkd/')

Manages temporary files inside the .rkd directory. Using this class makes your code safer to use on Continuous Integration (CI) systems

Usage:
path = self.temp.assign_temporary_file(mode=0o755)

assign_temporary_file(mode: int = 493) → str

Assign a path for writing temporary files in RKD workspace

Note: RKD executes finally_clean_up() at the end of each task

Usage:
try:
    path = RKDTemp.assign_temporary_file()
    # (…) some action there
finally:
    RKDTemp.finally_clean_up()

finally_clean_up()

Used to clean up all temporary files at the end of the code execution

TaskExecutor runs this method after each finished task
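
The assign-then-clean-up pattern can be tried without RKD using the sketch below. SketchTempManager is a hypothetical stand-in that mimics the two documented method names; whether the real class creates the file on assignment is an assumption. The default mode 493 in the signature above is the decimal form of 0o755.

```python
import os
import tempfile
import uuid


class SketchTempManager:
    """Hypothetical stand-in that mimics the two documented methods."""

    def __init__(self, chdir: str):
        self.chdir = chdir
        self.assigned = []

    def assign_temporary_file(self, mode: int = 0o755) -> str:
        os.makedirs(self.chdir, exist_ok=True)
        path = os.path.join(self.chdir, 'tmp-' + uuid.uuid4().hex)
        with open(path, 'w'):
            pass  # assumption: create an empty file at the assigned path
        os.chmod(path, mode)
        self.assigned.append(path)
        return path

    def finally_clean_up(self):
        # Remove every file handed out since the last clean-up.
        for path in self.assigned:
            if os.path.exists(path):
                os.remove(path)
        self.assigned = []


with tempfile.TemporaryDirectory() as workspace:
    manager = SketchTempManager(os.path.join(workspace, '.rkd'))
    try:
        path = manager.assign_temporary_file(mode=0o755)
        existed_during_work = os.path.isfile(path)
    finally:
        manager.finally_clean_up()
    exists_after_cleanup = os.path.exists(path)
```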

Parsing RKD syntax

class rkd.api.parsing.SyntaxParsing
static parse_imports_by_list_of_classes(classes_or_modules: List[str]) → List[rkd.api.syntax.TaskDeclaration]

Parses a List[str] of imports, like in YAML syntax. Produces a List[TaskDeclaration] with imported list of tasks.

Could be used to import & validate RKD tasks.

Examples

  • rkd.standardlib
  • rkd.standardlib.jinja.FileRendererTask

Raises: ParsingException
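
The two example forms above (a module path and a class path) can be told apart with a generic import technique, sketched below with importlib. The function resolve_import is hypothetical and uses standard-library names so that it runs without RKD; it is not how SyntaxParsing itself is implemented.

```python
import importlib


def resolve_import(path: str):
    """Return a module for 'pkg.module' or an attribute for 'pkg.module.ClassName'."""
    try:
        return importlib.import_module(path)
    except ImportError:
        # Not importable as a module: treat the last segment as a class name.
        module_path, _, attribute = path.rpartition('.')
        if not module_path:
            raise
        return getattr(importlib.import_module(module_path), attribute)


as_module = resolve_import('json')
as_class = resolve_import('json.JSONDecoder')
```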

Testing

class rkd.api.testing.BasicTestingCase(methodName='runTest')
Provides minimum of:
  • Doing backup of environment and cwd
  • Methods for mocking task dependencies (RKD-specific like ExecutionContext)
environment(environ: dict)

Mocks environment

Example usage:
with self.environment({'RKD_PATH': SCRIPT_DIR_PATH + '/../docs/examples/env-in-yaml/.rkd'}):
Parameters:environ
Returns:
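
A context manager of this kind can be sketched with the standard library alone. The function below is a stand-in written for this example, assuming the mock only needs to set variables for the duration of the block and restore the previous environment afterwards; RKD_SKETCH_VAR is a hypothetical variable name.

```python
import os
from contextlib import contextmanager


@contextmanager
def environment(environ: dict):
    """Temporarily apply 'environ' on top of os.environ, then restore the backup."""
    backup = dict(os.environ)
    os.environ.update(environ)
    try:
        yield
    finally:
        os.environ.clear()
        os.environ.update(backup)


with environment({'RKD_SKETCH_VAR': '1'}):
    inside = os.environ.get('RKD_SKETCH_VAR')

after = os.environ.get('RKD_SKETCH_VAR')
```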
static list_to_str(in_list: list) → List[str]

Execute __str__ on each list element, and replace element with the result

static mock_execution_context(task: rkd.api.contract.TaskInterface, args: Dict[str, str] = None, env: Dict[str, str] = None, defined_args: Dict[str, dict] = None) → rkd.api.contract.ExecutionContext

Prepares a simplified rkd.api.contract.ExecutionContext instance

Parameters:
  • task
  • args
  • env
  • defined_args
Returns:

static satisfy_task_dependencies(task: rkd.api.contract.TaskInterface, io: rkd.api.inputoutput.IO = None) → rkd.api.contract.TaskInterface

Inserts required dependencies to your task that implements rkd.api.contract.TaskInterface

Parameters:
  • task
  • io
Returns:

setUp() → None

Hook method for setting up the test fixture before exercising it.

tearDown() → None

Hook method for deconstructing the test fixture after testing it.

class rkd.api.testing.FunctionalTestingCase(methodName='runTest')

Provides methods for running RKD task or multiple tasks with output and exit code capturing. Inherits OutputCapturingSafeTestCase.

execute_mocked_task_and_get_output(task: rkd.api.contract.TaskInterface, args=None, env=None) → str

Run a single task, capturing its output in a simplified way. The whole of RKD is not bootstrapped in this operation.

Parameters:
Returns:

run_and_capture_output(argv: list, verbose: bool = False) → Tuple[str, int]

Run task(s) and capture output + exit code. The whole of RKD is bootstrapped from scratch here.

Example usage:
full_output, exit_code = self.run_and_capture_output([':tasks'])
Parameters:
  • argv (list) – List of tasks, arguments, commandline switches
  • verbose (bool) – Print all output also to stdout
Returns:

class rkd.api.testing.OutputCapturingSafeTestCase(methodName='runTest')

Provides hooks for keeping stdout/stderr immutable between tests.

setUp() → None

Hook method for setting up the test fixture before exercising it.

tearDown() → None

Hook method for deconstructing the test fixture after testing it.