Skip to content

BaseRunner API Reference

BaseRunner

Bases: ABC

Base class for CWL workflow runners. Provides common functionality and defines the interface for specific runners.

Source code in zoo_runner_common/base_runner.py
class BaseRunner(ABC):
    """
    Base class for CWL workflow runners.
    Provides common functionality and defines the interface for specific runners.

    Subclasses must implement ``wrap()`` and ``execute()``.
    """

    def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
        """
        Initialize the base runner.

        Args:
            cwl: CWL workflow definition (path to file or parsed CWL)
            inputs: ZOO inputs dictionary
            conf: ZOO configuration dictionary
            outputs: ZOO outputs dictionary
            execution_handler: Optional ExecutionHandler instance for hooks;
                a no-op default handler is created when none is given
        """
        self.cwl = cwl
        self.execution_handler = execution_handler or self._create_default_handler()

        # Create typed wrapper objects from ZOO dictionaries
        self.conf = ZooConf(conf)
        self.inputs = ZooInputs(inputs)
        self.outputs = ZooOutputs(outputs)

        # Parse CWL workflow
        self.workflow = CWLWorkflow(self.cwl, self.conf.workflow_id)

        # Legacy namespace for backward compatibility
        self.zoo_conf = types.SimpleNamespace(conf=conf)

        # Runner-specific state
        self.namespace_name = None
        self.execution = None

    def _create_default_handler(self):
        """Create a default handler (all hooks are no-ops) if none provided."""

        class DefaultHandler:
            def pre_execution_hook(self):
                pass

            def post_execution_hook(self, *args, **kwargs):
                pass

            def get_secrets(self):
                return None

            def get_additional_parameters(self):
                return {}

            def get_pod_env_vars(self):
                return None

            def get_pod_node_selector(self):
                return None

            def handle_outputs(self, *args, **kwargs):
                pass

            def set_job_id(self, job_id):
                pass

            def get_namespace(self):
                """Get namespace for Calrissian execution."""
                return None

            def get_service_account(self):
                """Get service account for Calrissian execution."""
                return None

        return DefaultHandler()

    def _call_handler_hook(self, hook_name, *args):
        """
        Invoke ``hook_name`` on the execution handler when it provides it.

        Any exception raised by the hook is logged together with its
        traceback and then re-raised to the caller.

        Args:
            hook_name: Name of the handler method to call
            *args: Positional arguments forwarded to the hook
        """
        handler = self.execution_handler
        if not handler or not hasattr(handler, hook_name):
            return

        try:
            getattr(handler, hook_name)(*args)
        except Exception as e:
            logger.error(f"Error in {hook_name}: {e}")
            logger.error(traceback.format_exc())
            raise

    def update_status(self, progress: int, message: str = ""):
        """
        Update execution status in ZOO.

        A warning is logged (and nothing is updated) when the configuration
        wrapper does not expose a ``conf`` mapping containing ``"lenv"``.

        Args:
            progress: Progress percentage (0-100)
            message: Status message to display
        """
        if hasattr(self.conf, 'conf') and "lenv" in self.conf.conf:
            self.conf.conf["lenv"]["message"] = message
            zoo.update_status(self.conf.conf, progress)
        else:
            logger.warning("Cannot update status: conf structure not available")

    def get_namespace_name(self):
        """
        Generate a namespace name for Kubernetes resources.

        The name is generated once and cached on ``self.namespace_name``.

        Returns:
            str: Lower-cased namespace name in format {workflow_id}-{unique_id},
            where unique_id is the first 8 characters of a random UUID
        """
        if self.namespace_name is None:
            import uuid
            unique_id = str(uuid.uuid4())[:8]
            self.namespace_name = f"{self.get_workflow_id()}-{unique_id}".lower()

        return self.namespace_name

    def log_output(self, output):
        """Log output information."""
        logger.info("[BaseRunner] Output: %s", output)

    def validate_inputs(self):
        """
        Validate input parameters.

        The base implementation performs no checks and always returns True.
        """
        logger.info("[BaseRunner] Validating inputs...")
        return True

    def prepare(self):
        """
        Shared pre-execution logic.
        Calls execution handler hooks and prepares processing parameters.

        Returns:
            types.SimpleNamespace: ``cwl`` holds the result of ``wrap()`` and
            ``params`` the processing parameters, with the handler's
            additional parameters taking precedence on key collisions

        Raises:
            Exception: Whatever the pre-execution hook raises (after logging)
        """
        logger.info("execution started")
        self.update_status(progress=2, message="starting execution")

        # Call pre-execution hook
        self._call_handler_hook("pre_execution_hook")

        logger.info("wrap CWL workflow with stage-in/out steps")

        processing_parameters = {**self.get_processing_parameters()}

        # Treat get_additional_parameters like the other hooks: it is optional
        # on custom handlers and may return None instead of a dictionary.
        handler = self.execution_handler
        get_additional = (
            getattr(handler, "get_additional_parameters", None) if handler else None
        )
        if callable(get_additional):
            processing_parameters.update(get_additional() or {})

        return types.SimpleNamespace(cwl=self.wrap(), params=processing_parameters)

    def finalize(self, log, output, usage_report, tool_logs):
        """
        Finalization logic after execution.
        Calls execution handler post-execution and output handling hooks.

        Args:
            log: Forwarded unchanged to both hooks
            output: Forwarded unchanged to both hooks
            usage_report: Forwarded unchanged to both hooks
            tool_logs: Forwarded unchanged to both hooks

        Raises:
            Exception: Whatever a hook raises (after logging); a failure in
            the post-execution hook prevents ``handle_outputs`` from running
        """
        logger.info("Finalization started")

        # Call post-execution hook
        self._call_handler_hook(
            "post_execution_hook", log, output, usage_report, tool_logs
        )

        # Call handle_outputs hook
        self._call_handler_hook("handle_outputs", log, output, usage_report, tool_logs)

    def get_workflow_id(self):
        """
        Get the workflow identifier from configuration.

        Returns:
            str: The workflow identifier
        """
        return self.conf.workflow_id

    def get_workflow_inputs(self, mandatory=False):
        """
        Get workflow input parameter names.

        Args:
            mandatory: If True, only return mandatory inputs (no default value)

        Returns:
            list: List of input parameter names
        """
        return self.workflow.get_workflow_inputs(mandatory=mandatory)

    def get_max_cores(self):
        """
        Get the maximum number of cores from CWL ResourceRequirements.

        Returns:
            int: Maximum cores requested, or the ``DEFAULT_MAX_CORES``
            environment variable (default 2) when the CWL requests none
        """
        resources = self.workflow.eval_resource()
        max_cores = max(resources["coresMax"]) if resources["coresMax"] else None

        if max_cores is None:
            max_cores = int(os.environ.get("DEFAULT_MAX_CORES", "2"))

        return max_cores

    def get_max_ram(self):
        """
        Get the maximum RAM in megabytes from CWL ResourceRequirements.

        Falls back to the ``DEFAULT_MAX_RAM`` environment variable
        (default 4096) when the CWL requests none.

        Returns:
            str: Maximum RAM in MB with unit (e.g., "4096Mi")
        """
        resources = self.workflow.eval_resource()
        max_ram = max(resources["ramMax"]) if resources["ramMax"] else None

        if max_ram is None:
            max_ram = int(os.environ.get("DEFAULT_MAX_RAM", "4096"))

        # Return as string with Mi unit
        return f"{max_ram}Mi"

    def get_volume_size(self, unit="Mi"):
        """
        Get the volume size for temporary and output directories.

        Calculates based on tmpdir and outdir requirements from CWL. When the
        CWL declares none, the ``DEFAULT_VOLUME_SIZE`` environment variable
        (default "10Gi") is used and converted to the requested unit.

        Args:
            unit: Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)

        Returns:
            str: Volume size with unit (e.g., "10Gi" or "10240Mi")
        """
        import math

        resources = self.workflow.eval_resource()

        # Get max tmpdir and outdir in MB
        # Use Max if available, otherwise fall back to Min
        tmpdir_values = resources["tmpdirMax"] or resources["tmpdirMin"]
        tmpdir_max = max(tmpdir_values) if tmpdir_values else 0
        outdir_values = resources["outdirMax"] or resources["outdirMin"]
        outdir_max = max(outdir_values) if outdir_values else 0

        # Total in MB
        volume_size_mb = tmpdir_max + outdir_max

        if volume_size_mb == 0:
            # Default from environment
            default = os.environ.get("DEFAULT_VOLUME_SIZE", "10Gi")
            if default.endswith(unit):
                return default

            # The default uses another unit: convert its value instead of
            # reusing the bare number (10Gi must become 10240Mi, not 10Mi).
            mib_per_unit = {"Gi": 1024, "Mi": 1}
            default_unit = default[-2:]
            if unit in mib_per_unit and default_unit in mib_per_unit:
                try:
                    size_mib = float(default[:-2]) * mib_per_unit[default_unit]
                    return f"{math.ceil(size_mib / mib_per_unit[unit])}{unit}"
                except (ValueError, OverflowError):
                    pass  # unparsable default: use the generic fallback below

            return f"10{unit}"

        # Convert based on requested unit
        if unit == "Gi":
            # Convert MB to Gi (1 Gi = 1024 Mi); always adds one extra Gi
            volume_size = int(volume_size_mb / 1024) + 1
        else:  # Mi
            volume_size = volume_size_mb

        return f"{volume_size}{unit}"

    def assert_parameters(self, mandatory=True):
        """
        Validate that required workflow inputs are provided.

        Any error raised while checking is logged and reported as False
        rather than propagated.

        Args:
            mandatory: If True, check only mandatory inputs

        Returns:
            bool: True if all required inputs are present, False otherwise
        """
        try:
            required_inputs = self.get_workflow_inputs(mandatory=mandatory)

            for required_input in required_inputs:
                if required_input not in self.inputs.inputs:
                    error_msg = f"Missing required input: {required_input}"
                    logger.error(error_msg)
                    return False

            logger.info("All required parameters are present")
            return True
        except Exception as e:
            logger.error(f"Error checking parameters: {e}")
            return False

    def get_processing_parameters(self):
        """
        Get processing parameters from inputs.

        Returns:
            dict: Processing parameters suitable for CWL execution
        """
        return self.inputs.get_processing_parameters(workflow=self.workflow.get_workflow())

    @abstractmethod
    def wrap(self):
        """
        Wrap the CWL workflow with stage-in/stage-out steps.
        Must be implemented by subclasses.
        """
        raise NotImplementedError("Subclasses must implement wrap()")

    @abstractmethod
    def execute(self):
        """
        Execute the CWL workflow.
        Must be implemented by subclasses.
        """
        raise NotImplementedError("Subclasses must implement execute()")

__init__(cwl, inputs, conf, outputs, execution_handler=None)

Initialize the base runner.

Parameters:

  • cwl: CWL workflow definition (path to file or parsed CWL)
  • inputs: ZOO inputs dictionary
  • conf: ZOO configuration dictionary
  • outputs: ZOO outputs dictionary
  • execution_handler: Optional ExecutionHandler instance for hooks

Source code in zoo_runner_common/base_runner.py
def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
    """
    Set up the runner state from the raw ZOO service arguments.

    :param cwl: CWL workflow definition (path to file or parsed CWL)
    :param inputs: ZOO inputs dictionary
    :param conf: ZOO configuration dictionary
    :param outputs: ZOO outputs dictionary
    :param execution_handler: Optional ExecutionHandler instance for hooks
    """
    self.cwl = cwl

    # Fall back to the built-in no-op handler when no usable one is supplied
    if execution_handler:
        self.execution_handler = execution_handler
    else:
        self.execution_handler = self._create_default_handler()

    # Wrap the raw ZOO dictionaries in their typed counterparts
    self.conf = ZooConf(conf)
    self.inputs = ZooInputs(inputs)
    self.outputs = ZooOutputs(outputs)

    # The workflow parser needs the identifier exposed by the wrapped conf
    workflow_id = self.conf.workflow_id
    self.workflow = CWLWorkflow(self.cwl, workflow_id)

    # Older code reaches the raw configuration through zoo_conf.conf
    self.zoo_conf = types.SimpleNamespace(conf=conf)

    # State filled in later by concrete runners
    self.namespace_name = None
    self.execution = None

assert_parameters(mandatory=True)

Validate that required workflow inputs are provided.

Parameters:

Name Type Description Default
mandatory

If True, check only mandatory inputs

True

Returns:

Name Type Description
bool

True if all required inputs are present, False otherwise

Source code in zoo_runner_common/base_runner.py
def assert_parameters(self, mandatory=True):
    """
    Check that every required workflow input has been supplied.

    Args:
        mandatory: If True, check only mandatory inputs

    Returns:
        bool: True if all required inputs are present, False otherwise
        (including when the check itself raises an error)
    """
    nothing_missing = object()  # sentinel: no input name can be this object
    try:
        first_missing = next(
            (
                name
                for name in self.get_workflow_inputs(mandatory=mandatory)
                if name not in self.inputs.inputs
            ),
            nothing_missing,
        )

        if first_missing is nothing_missing:
            logger.info("All required parameters are present")
            return True

        # Only the first missing input is reported
        logger.error(f"Missing required input: {first_missing}")
        return False
    except Exception as e:
        logger.error(f"Error checking parameters: {e}")
        return False

execute() abstractmethod

Execute the CWL workflow. Must be implemented by subclasses.

Source code in zoo_runner_common/base_runner.py
@abstractmethod
def execute(self):
    """
    Run the CWL workflow.
    Concrete runners are required to override this method.
    """
    message = "Subclasses must implement execute()"
    raise NotImplementedError(message)

finalize(log, output, usage_report, tool_logs)

Finalization logic after execution. Calls execution handler post-execution and output handling hooks.

Source code in zoo_runner_common/base_runner.py
def finalize(self, log, output, usage_report, tool_logs):
    """
    Run the post-execution steps once the workflow has finished.
    The handler's post_execution_hook is called first, then handle_outputs;
    both receive the same four arguments.
    """
    logger.info("Finalization started")

    # Hooks run in this order; a failure is logged and re-raised immediately
    for hook_name in ("post_execution_hook", "handle_outputs"):
        if not (self.execution_handler and hasattr(self.execution_handler, hook_name)):
            continue

        hook = getattr(self.execution_handler, hook_name)
        try:
            hook(log, output, usage_report, tool_logs)
        except Exception as e:
            logger.error(f"Error in {hook_name}: {e}")
            logger.error(traceback.format_exc())
            raise

get_max_cores()

Get the maximum number of cores from CWL ResourceRequirements.

Returns:

Name Type Description
int

Maximum cores requested, or default from environment

Source code in zoo_runner_common/base_runner.py
def get_max_cores(self):
    """
    Report the largest core count requested by the CWL ResourceRequirements.

    Returns:
        int: Maximum cores requested, or the DEFAULT_MAX_CORES environment
        value (2 when unset) if the workflow requests none
    """
    requested = self.workflow.eval_resource()["coresMax"]
    largest = max(requested) if requested else None

    if largest is not None:
        return largest

    # Nothing usable in the CWL: rely on the deployment-wide default
    return int(os.environ.get("DEFAULT_MAX_CORES", "2"))

get_max_ram()

Get the maximum RAM in megabytes from CWL ResourceRequirements.

Returns:

Name Type Description
str

Maximum RAM in MB with unit (e.g., "4096Mi")

Source code in zoo_runner_common/base_runner.py
def get_max_ram(self):
    """
    Report the largest RAM amount requested by the CWL ResourceRequirements.

    Returns:
        str: Maximum RAM in MB with unit (e.g., "4096Mi"); the
        DEFAULT_MAX_RAM environment value (4096 when unset) is used if the
        workflow requests none
    """
    ram_requests = self.workflow.eval_resource()["ramMax"]
    peak_ram = max(ram_requests) if ram_requests else None

    if peak_ram is None:
        peak_ram = int(os.environ.get("DEFAULT_MAX_RAM", "4096"))

    # The value is always reported with the Mi suffix
    return "{}Mi".format(peak_ram)

get_namespace_name()

Generate a namespace name for Kubernetes resources.

Returns:

Name Type Description
str

Namespace name in format {workflow_id}-{unique_id}

Source code in zoo_runner_common/base_runner.py
def get_namespace_name(self):
    """
    Return the namespace name for Kubernetes resources, creating it on first use.

    Returns:
        str: Namespace name in format {workflow_id}-{unique_id}, lower-cased;
        the same value is returned on every later call
    """
    import uuid

    # Reuse the cached name so repeated calls agree
    if self.namespace_name is not None:
        return self.namespace_name

    suffix = str(uuid.uuid4())[:8]
    candidate = f"{self.get_workflow_id()}-{suffix}"
    self.namespace_name = candidate.lower()
    return self.namespace_name

get_processing_parameters()

Get processing parameters from inputs.

Returns:

Name Type Description
dict

Processing parameters suitable for CWL execution

Source code in zoo_runner_common/base_runner.py
def get_processing_parameters(self):
    """
    Build the processing parameters by delegating to the inputs wrapper.

    Returns:
        dict: Processing parameters suitable for CWL execution
    """
    parsed_workflow = self.workflow.get_workflow()
    return self.inputs.get_processing_parameters(workflow=parsed_workflow)

get_volume_size(unit='Mi')

Get the volume size for temporary and output directories.

Calculates based on tmpdir and outdir requirements from CWL.

Parameters:

Name Type Description Default
unit

Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)

'Mi'

Returns:

Name Type Description
str

Volume size with unit (e.g., "10Gi" or "10240Mi")

Source code in zoo_runner_common/base_runner.py
def get_volume_size(self, unit="Mi"):
    """
    Get the volume size for temporary and output directories.

    Calculates based on tmpdir and outdir requirements from CWL. When the
    CWL declares none, the ``DEFAULT_VOLUME_SIZE`` environment variable
    (default "10Gi") is used and converted to the requested unit.

    Args:
        unit: Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)

    Returns:
        str: Volume size with unit (e.g., "10Gi" or "10240Mi")
    """
    import math

    resources = self.workflow.eval_resource()

    # Get max tmpdir and outdir in MB
    # Use Max if available, otherwise fall back to Min
    tmpdir_values = resources["tmpdirMax"] or resources["tmpdirMin"]
    tmpdir_max = max(tmpdir_values) if tmpdir_values else 0
    outdir_values = resources["outdirMax"] or resources["outdirMin"]
    outdir_max = max(outdir_values) if outdir_values else 0

    # Total in MB
    volume_size_mb = tmpdir_max + outdir_max

    if volume_size_mb == 0:
        # Default from environment
        default = os.environ.get("DEFAULT_VOLUME_SIZE", "10Gi")
        if default.endswith(unit):
            return default

        # The default uses another unit: convert its value instead of
        # reusing the bare number (10Gi must become 10240Mi, not 10Mi).
        mib_per_unit = {"Gi": 1024, "Mi": 1}
        default_unit = default[-2:]
        if unit in mib_per_unit and default_unit in mib_per_unit:
            try:
                size_mib = float(default[:-2]) * mib_per_unit[default_unit]
                return f"{math.ceil(size_mib / mib_per_unit[unit])}{unit}"
            except (ValueError, OverflowError):
                pass  # unparsable default: use the generic fallback below

        return f"10{unit}"

    # Convert based on requested unit
    if unit == "Gi":
        # Convert MB to Gi (1 Gi = 1024 Mi); always adds one extra Gi
        volume_size = int(volume_size_mb / 1024) + 1
    else:  # Mi
        volume_size = volume_size_mb

    return f"{volume_size}{unit}"

get_workflow_id()

Get the workflow identifier from configuration.

Returns:

Name Type Description
str

The workflow identifier

Source code in zoo_runner_common/base_runner.py
def get_workflow_id(self):
    """
    Look up the workflow identifier held by the configuration wrapper.

    Returns:
        str: The workflow identifier
    """
    configuration = self.conf
    return configuration.workflow_id

get_workflow_inputs(mandatory=False)

Get workflow input parameter names.

Parameters:

Name Type Description Default
mandatory

If True, only return mandatory inputs (no default value)

False

Returns:

Name Type Description
list

List of input parameter names

Source code in zoo_runner_common/base_runner.py
def get_workflow_inputs(self, mandatory=False):
    """
    List the workflow's input parameter names via the parsed workflow.

    Args:
        mandatory: If True, only return mandatory inputs (no default value)

    Returns:
        list: List of input parameter names
    """
    parsed_workflow = self.workflow
    input_names = parsed_workflow.get_workflow_inputs(mandatory=mandatory)
    return input_names

log_output(output)

Log output information.

Source code in zoo_runner_common/base_runner.py
def log_output(self, output):
    """Write the given output to the log at INFO level."""
    template = "[BaseRunner] Output: %s"
    logger.info(template, output)

prepare()

Shared pre-execution logic. Calls execution handler hooks and prepares processing parameters.

Source code in zoo_runner_common/base_runner.py
def prepare(self):
    """
    Shared pre-execution logic.
    Calls execution handler hooks and prepares processing parameters.

    Returns:
        types.SimpleNamespace: ``cwl`` holds the result of ``wrap()`` and
        ``params`` the processing parameters, with the handler's additional
        parameters taking precedence on key collisions

    Raises:
        Exception: Whatever the pre-execution hook raises (after logging)
    """
    logger.info("execution started")
    self.update_status(progress=2, message="starting execution")

    # Call pre-execution hook
    if self.execution_handler and hasattr(
        self.execution_handler, "pre_execution_hook"
    ):
        try:
            self.execution_handler.pre_execution_hook()
        except Exception as e:
            logger.error(f"Error in pre_execution_hook: {e}")
            logger.error(traceback.format_exc())
            raise

    logger.info("wrap CWL workflow with stage-in/out steps")

    processing_parameters = {**self.get_processing_parameters()}

    # Treat get_additional_parameters like the other hooks: it is optional
    # on custom handlers and may return None instead of a dictionary.
    handler = self.execution_handler
    get_additional = (
        getattr(handler, "get_additional_parameters", None) if handler else None
    )
    if callable(get_additional):
        processing_parameters.update(get_additional() or {})

    return types.SimpleNamespace(cwl=self.wrap(), params=processing_parameters)

update_status(progress, message='')

Update execution status in ZOO.

Parameters:

Name Type Description Default
progress int

Progress percentage (0-100)

required
message str

Status message to display

''
Source code in zoo_runner_common/base_runner.py
def update_status(self, progress: int, message: str = ""):
    """
    Push a progress update to ZOO.

    Args:
        progress: Progress percentage (0-100)
        message: Status message to display
    """
    # Without a conf mapping holding "lenv" there is nowhere to put the message
    if not hasattr(self.conf, 'conf') or "lenv" not in self.conf.conf:
        logger.warning("Cannot update status: conf structure not available")
        return

    raw_conf = self.conf.conf
    raw_conf["lenv"]["message"] = message
    zoo.update_status(raw_conf, progress)

validate_inputs()

Validate input parameters.

Source code in zoo_runner_common/base_runner.py
def validate_inputs(self):
    """Log that validation is running and report success unconditionally."""
    announcement = "[BaseRunner] Validating inputs..."
    logger.info(announcement)
    return True

wrap() abstractmethod

Wrap the CWL workflow with stage-in/stage-out steps. Must be implemented by subclasses.

Source code in zoo_runner_common/base_runner.py
@abstractmethod
def wrap(self):
    """
    Add the stage-in/stage-out steps around the CWL workflow.
    Concrete runners are required to override this method.
    """
    message = "Subclasses must implement wrap()"
    raise NotImplementedError(message)

Class Overview

BaseRunner is the abstract base class for implementing CWL workflow runners in ZOO-Project.

Constructor

def __init__(self, cwl, inputs, conf, outputs, execution_handler=None)

Parameters:

  • cwl: CWL workflow definition (path or content)
  • inputs: ZOO inputs dictionary
  • conf: ZOO configuration dictionary
  • outputs: ZOO outputs dictionary
  • execution_handler: Optional execution handler instance (e.g., CommonExecutionHandler)

Abstract Methods

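Subclasses must implement execute() and wrap(). get_processing_parameters() is listed here as well, but BaseRunner already provides a default implementation for it, so overriding it is optional: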

execute()

Execute the CWL workflow.

@abstractmethod
def execute(self):
    """Run the workflow; every concrete runner provides its own version."""

wrap()

Wrap the CWL workflow with stage-in/stage-out steps.

@abstractmethod
def wrap(self):
    """Add the staging steps; every concrete runner provides its own version."""

get_processing_parameters()

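Get the processing parameters passed to the workflow. This method is not abstract: the base implementation derives the parameters from the inputs, and a runner may override it to add runner-specific values.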

def get_processing_parameters(self):
    """
    Get processing parameters from the inputs (default implementation).

    Not abstract in BaseRunner; subclasses may override it to add
    runner-specific parameters.
    """
    return self.inputs.get_processing_parameters(workflow=self.workflow.get_workflow())

Methods

prepare()

Shared pre-execution logic.

def prepare(self)

Behavior:

  1. Logs execution start
  2. Updates status to 2%
  3. Calls execution_handler.pre_execution_hook()
  4. Prepares processing parameters (input-derived parameters merged with the handler's additional parameters)
  5. Wraps CWL workflow by calling wrap()

Returns: a SimpleNamespace with two attributes: cwl (the wrapped workflow) and params (the processing parameters).

Example:

prepared = runner.prepare()
wrapped_cwl = prepared.cwl
params = prepared.params

finalize()

Finalization logic after execution.

def finalize(self, log, output, usage_report, tool_logs)

Parameters:

  • log: Execution log
  • output: Workflow outputs
  • usage_report: Resource usage statistics
  • tool_logs: Tool execution logs

Behavior:

  1. Calls execution_handler.post_execution_hook()
  2. Calls execution_handler.handle_outputs()

Example:

runner.finalize(log, outputs, usage, tool_logs)

update_status()

Update execution status in ZOO.

def update_status(self, progress: int, message: str = "")

Parameters:

  • progress (int): Progress percentage (0-100)
  • message (str): Status message

Example:

runner.update_status(50, "Processing data...")

validate_inputs()

Validate input parameters.

def validate_inputs(self) -> bool

Returns: bool. True if the inputs are valid; the base implementation always returns True.

Example:

if not runner.validate_inputs():
    return zoo.SERVICE_FAILED

log_output()

Log output information.

def log_output(self, output)

Parameters:

  • output: Output data to log

get_namespace_name()

Get or generate namespace name for execution.

def get_namespace_name(self) -> str

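Returns: str. The lower-cased namespace name in the format {workflow_id}-{unique_id}, where unique_id is the first 8 characters of a random UUID. The name is generated on the first call and reused afterwards.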

Example:

namespace = runner.get_namespace_name()
# Returns e.g.: "process-name-1a2b3c4d"

Attributes

cwl

CWL workflow definition.

inputs

ZooInputs wrapper built from the ZOO inputs dictionary.

conf

ZooConf wrapper built from the ZOO configuration dictionary (the raw dictionary remains available as zoo_conf.conf).

outputs

ZooOutputs wrapper built from the ZOO outputs dictionary.

execution_handler

Execution handler instance.

namespace_name

Kubernetes namespace name.

zoo_conf

SimpleNamespace wrapper for configuration.

Default Handler

If no execution handler is provided, BaseRunner creates a default handler with no-op methods:

class DefaultHandler:
    """No-op execution handler used when the caller supplies none."""

    def pre_execution_hook(self):
        pass

    def post_execution_hook(self, *args, **kwargs):
        pass

    def get_secrets(self):
        return None

    def get_additional_parameters(self):
        return {}

    def get_pod_env_vars(self):
        return None

    def get_pod_node_selector(self):
        return None

    def handle_outputs(self, *args, **kwargs):
        pass

    def set_job_id(self, job_id):
        pass

    def get_namespace(self):
        """Get namespace for Calrissian execution."""
        return None

    def get_service_account(self):
        """Get service account for Calrissian execution."""
        return None

Usage Pattern

from zoo_runner_common.base_runner import BaseRunner

class MyRunner(BaseRunner):
    """Minimal runner showing the prepare / run / finalize sequence."""

    def execute(self):
        """Execute workflow"""
        # Prepare
        prepared = self.prepare()
        self.update_status(20, "Executing")

        # Execute
        # NOTE: run_workflow is a placeholder for the runner-specific
        # execution call; BaseRunner does not provide it.
        result = self.run_workflow(prepared.cwl, prepared.params)

        # Finalize
        self.finalize(None, result, None, None)

        return zoo.SERVICE_SUCCEEDED

    def wrap(self):
        """Wrap workflow"""
        return self.cwl

    def get_processing_parameters(self):
        """Get parameters"""
        # Extend the input-derived parameters instead of replacing them,
        # otherwise prepare() would drop the workflow inputs.
        return {**super().get_processing_parameters(), "max_cores": 8}

# Use in service
def my_service(conf, inputs, outputs):
    """Service entry point: build the runner and return its execution status."""
    return MyRunner(cwl, inputs, conf, outputs, handler).execute()

See Also