BaseRunner API Reference
BaseRunner
Bases: ABC
Base class for CWL workflow runners.
Provides common functionality and defines the interface for specific runners.
Source code in zoo_runner_common/base_runner.py
| class BaseRunner(ABC):
"""
Base class for CWL workflow runners.
Provides common functionality and defines the interface for specific runners.
"""
def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
"""
Initialize the base runner.
:param cwl: CWL workflow definition (path to file or parsed CWL)
:param inputs: ZOO inputs dictionary
:param conf: ZOO configuration dictionary
:param outputs: ZOO outputs dictionary
:param execution_handler: Optional ExecutionHandler instance for hooks
"""
self.cwl = cwl
self.execution_handler = execution_handler or self._create_default_handler()
# Create typed wrapper objects from ZOO dictionaries
self.conf = ZooConf(conf)
self.inputs = ZooInputs(inputs)
self.outputs = ZooOutputs(outputs)
# Parse CWL workflow
self.workflow = CWLWorkflow(self.cwl, self.conf.workflow_id)
# Legacy namespace for backward compatibility
self.zoo_conf = types.SimpleNamespace(conf=conf)
# Runner-specific state
self.namespace_name = None
self.execution = None
def _create_default_handler(self):
"""Create a default handler if none provided."""
class DefaultHandler:
def pre_execution_hook(self):
pass
def post_execution_hook(self, *args, **kwargs):
pass
def get_secrets(self):
return None
def get_additional_parameters(self):
return {}
def get_pod_env_vars(self):
return None
def get_pod_node_selector(self):
return None
def handle_outputs(self, *args, **kwargs):
pass
def set_job_id(self, job_id):
pass
def get_namespace(self):
"""Get namespace for Calrissian execution."""
return None
def get_service_account(self):
"""Get service account for Calrissian execution."""
return None
return DefaultHandler()
def update_status(self, progress: int, message: str = ""):
"""
Update execution status in ZOO.
Args:
progress: Progress percentage (0-100)
message: Status message to display
"""
if hasattr(self.conf, 'conf') and "lenv" in self.conf.conf:
self.conf.conf["lenv"]["message"] = message
zoo.update_status(self.conf.conf, progress)
else:
logger.warning("Cannot update status: conf structure not available")
def get_namespace_name(self):
"""
Generate a namespace name for Kubernetes resources.
Returns:
str: Namespace name in format {workflow_id}-{unique_id}
"""
if self.namespace_name is None:
import uuid
unique_id = str(uuid.uuid4())[:8]
self.namespace_name = f"{self.get_workflow_id()}-{unique_id}".lower()
return self.namespace_name
def log_output(self, output):
"""Log output information."""
logger.info("[BaseRunner] Output: %s", output)
def validate_inputs(self):
"""Validate input parameters."""
logger.info("[BaseRunner] Validating inputs...")
return True
def prepare(self):
"""
Shared pre-execution logic.
Calls execution handler hooks and prepares processing parameters.
"""
logger.info("execution started")
self.update_status(progress=2, message="starting execution")
# Call pre-execution hook
if self.execution_handler and hasattr(
self.execution_handler, "pre_execution_hook"
):
try:
self.execution_handler.pre_execution_hook()
except Exception as e:
logger.error(f"Error in pre_execution_hook: {e}")
logger.error(traceback.format_exc())
raise
logger.info("wrap CWL workflow with stage-in/out steps")
processing_parameters = {
**self.get_processing_parameters(),
**(
self.execution_handler.get_additional_parameters()
if self.execution_handler
else {}
),
}
return types.SimpleNamespace(cwl=self.wrap(), params=processing_parameters)
def finalize(self, log, output, usage_report, tool_logs):
"""
Finalization logic after execution.
Calls execution handler post-execution and output handling hooks.
"""
logger.info("Finalization started")
# Call post-execution hook
if self.execution_handler and hasattr(
self.execution_handler, "post_execution_hook"
):
try:
self.execution_handler.post_execution_hook(
log, output, usage_report, tool_logs
)
except Exception as e:
logger.error(f"Error in post_execution_hook: {e}")
logger.error(traceback.format_exc())
raise
# Call handle_outputs hook
if self.execution_handler and hasattr(self.execution_handler, "handle_outputs"):
try:
self.execution_handler.handle_outputs(
log, output, usage_report, tool_logs
)
except Exception as e:
logger.error(f"Error in handle_outputs: {e}")
logger.error(traceback.format_exc())
raise
def get_workflow_id(self):
"""
Get the workflow identifier from configuration.
Returns:
str: The workflow identifier
"""
return self.conf.workflow_id
def get_workflow_inputs(self, mandatory=False):
"""
Get workflow input parameter names.
Args:
mandatory: If True, only return mandatory inputs (no default value)
Returns:
list: List of input parameter names
"""
return self.workflow.get_workflow_inputs(mandatory=mandatory)
def get_max_cores(self):
"""
Get the maximum number of cores from CWL ResourceRequirements.
Returns:
int: Maximum cores requested, or default from environment
"""
resources = self.workflow.eval_resource()
max_cores = max(resources["coresMax"]) if resources["coresMax"] else None
if max_cores is None:
max_cores = int(os.environ.get("DEFAULT_MAX_CORES", "2"))
return max_cores
def get_max_ram(self):
"""
Get the maximum RAM in megabytes from CWL ResourceRequirements.
Returns:
str: Maximum RAM in MB with unit (e.g., "4096Mi")
"""
resources = self.workflow.eval_resource()
max_ram = max(resources["ramMax"]) if resources["ramMax"] else None
if max_ram is None:
max_ram = int(os.environ.get("DEFAULT_MAX_RAM", "4096"))
# Return as string with Mi unit
return f"{max_ram}Mi"
def get_volume_size(self, unit="Mi"):
"""
Get the volume size for temporary and output directories.
Calculates based on tmpdir and outdir requirements from CWL.
Args:
unit: Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)
Returns:
str: Volume size with unit (e.g., "10Gi" or "10240Mi")
"""
resources = self.workflow.eval_resource()
# Get max tmpdir and outdir in MB
# Use Max if available, otherwise fall back to Min
tmpdir_max = max(resources["tmpdirMax"]) if resources["tmpdirMax"] else (max(resources["tmpdirMin"]) if resources["tmpdirMin"] else 0)
outdir_max = max(resources["outdirMax"]) if resources["outdirMax"] else (max(resources["outdirMin"]) if resources["outdirMin"] else 0)
# Total in MB
volume_size_mb = tmpdir_max + outdir_max
if volume_size_mb == 0:
# Default from environment
default = os.environ.get("DEFAULT_VOLUME_SIZE", "10Gi")
# If default doesn't match requested unit, convert
if unit not in default:
return f"10{unit}"
return default
# Convert based on requested unit
if unit == "Gi":
# Convert MB to Gi (1 Gi = 1024 Mi)
volume_size = int(volume_size_mb / 1024) + 1
else: # Mi
volume_size = volume_size_mb
return f"{volume_size}{unit}"
def assert_parameters(self, mandatory=True):
"""
Validate that required workflow inputs are provided.
Args:
mandatory: If True, check only mandatory inputs
Returns:
bool: True if all required inputs are present, False otherwise
"""
try:
required_inputs = self.get_workflow_inputs(mandatory=mandatory)
for required_input in required_inputs:
if required_input not in self.inputs.inputs:
error_msg = f"Missing required input: {required_input}"
logger.error(error_msg)
return False
logger.info("All required parameters are present")
return True
except Exception as e:
logger.error(f"Error checking parameters: {e}")
return False
def get_processing_parameters(self):
"""
Get processing parameters from inputs.
Returns:
dict: Processing parameters suitable for CWL execution
"""
return self.inputs.get_processing_parameters(workflow=self.workflow.get_workflow())
@abstractmethod
def wrap(self):
"""
Wrap the CWL workflow with stage-in/stage-out steps.
Must be implemented by subclasses.
"""
raise NotImplementedError("Subclasses must implement wrap()")
@abstractmethod
def execute(self):
"""
Execute the CWL workflow.
Must be implemented by subclasses.
"""
raise NotImplementedError("Subclasses must implement execute()")
|
__init__(cwl, inputs, conf, outputs, execution_handler=None)
Initialize the base runner.
:param cwl: CWL workflow definition (path to file or parsed CWL)
:param inputs: ZOO inputs dictionary
:param conf: ZOO configuration dictionary
:param outputs: ZOO outputs dictionary
:param execution_handler: Optional ExecutionHandler instance for hooks
Source code in zoo_runner_common/base_runner.py
| def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
"""
Initialize the base runner.
:param cwl: CWL workflow definition (path to file or parsed CWL)
:param inputs: ZOO inputs dictionary
:param conf: ZOO configuration dictionary
:param outputs: ZOO outputs dictionary
:param execution_handler: Optional ExecutionHandler instance for hooks
"""
self.cwl = cwl
self.execution_handler = execution_handler or self._create_default_handler()
# Create typed wrapper objects from ZOO dictionaries
self.conf = ZooConf(conf)
self.inputs = ZooInputs(inputs)
self.outputs = ZooOutputs(outputs)
# Parse CWL workflow
self.workflow = CWLWorkflow(self.cwl, self.conf.workflow_id)
# Legacy namespace for backward compatibility
self.zoo_conf = types.SimpleNamespace(conf=conf)
# Runner-specific state
self.namespace_name = None
self.execution = None
|
assert_parameters(mandatory=True)
Validate that required workflow inputs are provided.
Parameters:
| Name |
Type |
Description |
Default |
mandatory
|
|
If True, check only mandatory inputs
|
True
|
Returns:
| Name | Type |
Description |
bool |
|
True if all required inputs are present, False otherwise
|
Source code in zoo_runner_common/base_runner.py
| def assert_parameters(self, mandatory=True):
"""
Validate that required workflow inputs are provided.
Args:
mandatory: If True, check only mandatory inputs
Returns:
bool: True if all required inputs are present, False otherwise
"""
try:
required_inputs = self.get_workflow_inputs(mandatory=mandatory)
for required_input in required_inputs:
if required_input not in self.inputs.inputs:
error_msg = f"Missing required input: {required_input}"
logger.error(error_msg)
return False
logger.info("All required parameters are present")
return True
except Exception as e:
logger.error(f"Error checking parameters: {e}")
return False
|
execute()
abstractmethod
Execute the CWL workflow.
Must be implemented by subclasses.
Source code in zoo_runner_common/base_runner.py
| @abstractmethod
def execute(self):
"""
Execute the CWL workflow.
Must be implemented by subclasses.
"""
raise NotImplementedError("Subclasses must implement execute()")
|
finalize(log, output, usage_report, tool_logs)
Finalization logic after execution.
Calls execution handler post-execution and output handling hooks.
Source code in zoo_runner_common/base_runner.py
| def finalize(self, log, output, usage_report, tool_logs):
"""
Finalization logic after execution.
Calls execution handler post-execution and output handling hooks.
"""
logger.info("Finalization started")
# Call post-execution hook
if self.execution_handler and hasattr(
self.execution_handler, "post_execution_hook"
):
try:
self.execution_handler.post_execution_hook(
log, output, usage_report, tool_logs
)
except Exception as e:
logger.error(f"Error in post_execution_hook: {e}")
logger.error(traceback.format_exc())
raise
# Call handle_outputs hook
if self.execution_handler and hasattr(self.execution_handler, "handle_outputs"):
try:
self.execution_handler.handle_outputs(
log, output, usage_report, tool_logs
)
except Exception as e:
logger.error(f"Error in handle_outputs: {e}")
logger.error(traceback.format_exc())
raise
|
get_max_cores()
Get the maximum number of cores from CWL ResourceRequirements.
Returns:
| Name | Type |
Description |
int |
|
Maximum cores requested, or default from environment
|
Source code in zoo_runner_common/base_runner.py
| def get_max_cores(self):
"""
Get the maximum number of cores from CWL ResourceRequirements.
Returns:
int: Maximum cores requested, or default from environment
"""
resources = self.workflow.eval_resource()
max_cores = max(resources["coresMax"]) if resources["coresMax"] else None
if max_cores is None:
max_cores = int(os.environ.get("DEFAULT_MAX_CORES", "2"))
return max_cores
|
get_max_ram()
Get the maximum RAM in megabytes from CWL ResourceRequirements.
Returns:
| Name | Type |
Description |
str |
|
Maximum RAM in MB with unit (e.g., "4096Mi")
|
Source code in zoo_runner_common/base_runner.py
| def get_max_ram(self):
"""
Get the maximum RAM in megabytes from CWL ResourceRequirements.
Returns:
str: Maximum RAM in MB with unit (e.g., "4096Mi")
"""
resources = self.workflow.eval_resource()
max_ram = max(resources["ramMax"]) if resources["ramMax"] else None
if max_ram is None:
max_ram = int(os.environ.get("DEFAULT_MAX_RAM", "4096"))
# Return as string with Mi unit
return f"{max_ram}Mi"
|
get_namespace_name()
Generate a namespace name for Kubernetes resources.
Returns:
| Name | Type |
Description |
str |
|
Namespace name in format {workflow_id}-{unique_id}
|
Source code in zoo_runner_common/base_runner.py
| def get_namespace_name(self):
"""
Generate a namespace name for Kubernetes resources.
Returns:
str: Namespace name in format {workflow_id}-{unique_id}
"""
if self.namespace_name is None:
import uuid
unique_id = str(uuid.uuid4())[:8]
self.namespace_name = f"{self.get_workflow_id()}-{unique_id}".lower()
return self.namespace_name
|
get_processing_parameters()
Get processing parameters from inputs.
Returns:
| Name | Type |
Description |
dict |
|
Processing parameters suitable for CWL execution
|
Source code in zoo_runner_common/base_runner.py
| def get_processing_parameters(self):
"""
Get processing parameters from inputs.
Returns:
dict: Processing parameters suitable for CWL execution
"""
return self.inputs.get_processing_parameters(workflow=self.workflow.get_workflow())
|
get_volume_size(unit='Mi')
Get the volume size for temporary and output directories.
Calculates based on tmpdir and outdir requirements from CWL.
Parameters:
| Name |
Type |
Description |
Default |
unit
|
|
Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)
|
'Mi'
|
Returns:
| Name | Type |
Description |
str |
|
Volume size with unit (e.g., "10Gi" or "10240Mi")
|
Source code in zoo_runner_common/base_runner.py
| def get_volume_size(self, unit="Mi"):
"""
Get the volume size for temporary and output directories.
Calculates based on tmpdir and outdir requirements from CWL.
Args:
unit: Unit for volume size ('Gi' for Gigabytes or 'Mi' for Megabytes)
Returns:
str: Volume size with unit (e.g., "10Gi" or "10240Mi")
"""
resources = self.workflow.eval_resource()
# Get max tmpdir and outdir in MB
# Use Max if available, otherwise fall back to Min
tmpdir_max = max(resources["tmpdirMax"]) if resources["tmpdirMax"] else (max(resources["tmpdirMin"]) if resources["tmpdirMin"] else 0)
outdir_max = max(resources["outdirMax"]) if resources["outdirMax"] else (max(resources["outdirMin"]) if resources["outdirMin"] else 0)
# Total in MB
volume_size_mb = tmpdir_max + outdir_max
if volume_size_mb == 0:
# Default from environment
default = os.environ.get("DEFAULT_VOLUME_SIZE", "10Gi")
# If default doesn't match requested unit, convert
if unit not in default:
return f"10{unit}"
return default
# Convert based on requested unit
if unit == "Gi":
# Convert MB to Gi (1 Gi = 1024 Mi)
volume_size = int(volume_size_mb / 1024) + 1
else: # Mi
volume_size = volume_size_mb
return f"{volume_size}{unit}"
|
get_workflow_id()
Get the workflow identifier from configuration.
Returns:
| Name | Type |
Description |
str |
|
The workflow identifier
|
Source code in zoo_runner_common/base_runner.py
| def get_workflow_id(self):
"""
Get the workflow identifier from configuration.
Returns:
str: The workflow identifier
"""
return self.conf.workflow_id
|
get_workflow_inputs(mandatory=False)
Get workflow input parameter names.
Parameters:
| Name |
Type |
Description |
Default |
mandatory
|
|
If True, only return mandatory inputs (no default value)
|
False
|
Returns:
| Name | Type |
Description |
list |
|
List of input parameter names
|
Source code in zoo_runner_common/base_runner.py
| def get_workflow_inputs(self, mandatory=False):
"""
Get workflow input parameter names.
Args:
mandatory: If True, only return mandatory inputs (no default value)
Returns:
list: List of input parameter names
"""
return self.workflow.get_workflow_inputs(mandatory=mandatory)
|
log_output(output)
Log output information.
Source code in zoo_runner_common/base_runner.py
| def log_output(self, output):
"""Log output information."""
logger.info("[BaseRunner] Output: %s", output)
|
prepare()
Shared pre-execution logic.
Calls execution handler hooks and prepares processing parameters.
Source code in zoo_runner_common/base_runner.py
| def prepare(self):
"""
Shared pre-execution logic.
Calls execution handler hooks and prepares processing parameters.
"""
logger.info("execution started")
self.update_status(progress=2, message="starting execution")
# Call pre-execution hook
if self.execution_handler and hasattr(
self.execution_handler, "pre_execution_hook"
):
try:
self.execution_handler.pre_execution_hook()
except Exception as e:
logger.error(f"Error in pre_execution_hook: {e}")
logger.error(traceback.format_exc())
raise
logger.info("wrap CWL workflow with stage-in/out steps")
processing_parameters = {
**self.get_processing_parameters(),
**(
self.execution_handler.get_additional_parameters()
if self.execution_handler
else {}
),
}
return types.SimpleNamespace(cwl=self.wrap(), params=processing_parameters)
|
update_status(progress, message='')
Update execution status in ZOO.
Parameters:
| Name |
Type |
Description |
Default |
progress
|
int
|
Progress percentage (0-100)
|
required
|
message
|
str
|
Status message to display
|
''
|
Source code in zoo_runner_common/base_runner.py
| def update_status(self, progress: int, message: str = ""):
"""
Update execution status in ZOO.
Args:
progress: Progress percentage (0-100)
message: Status message to display
"""
if hasattr(self.conf, 'conf') and "lenv" in self.conf.conf:
self.conf.conf["lenv"]["message"] = message
zoo.update_status(self.conf.conf, progress)
else:
logger.warning("Cannot update status: conf structure not available")
|
validate_inputs()
Validate input parameters.
Source code in zoo_runner_common/base_runner.py
| def validate_inputs(self):
"""Validate input parameters."""
logger.info("[BaseRunner] Validating inputs...")
return True
|
wrap()
abstractmethod
Wrap the CWL workflow with stage-in/stage-out steps.
Must be implemented by subclasses.
Source code in zoo_runner_common/base_runner.py
| @abstractmethod
def wrap(self):
"""
Wrap the CWL workflow with stage-in/stage-out steps.
Must be implemented by subclasses.
"""
raise NotImplementedError("Subclasses must implement wrap()")
|
Class Overview
BaseRunner is the abstract base class for implementing CWL workflow runners in ZOO-Project.
Constructor
def __init__(self, cwl, inputs, conf, outputs, execution_handler=None)
Parameters:
cwl: CWL workflow definition (path or content)
inputs: ZOO inputs dictionary
conf: ZOO configuration dictionary
outputs: ZOO outputs dictionary
execution_handler: Optional execution handler instance (e.g., CommonExecutionHandler)
Abstract Methods
Subclasses must implement these methods:
execute()
Execute the CWL workflow.
@abstractmethod
def execute(self):
"""Execute the workflow"""
pass
wrap()
Wrap the CWL workflow with stage-in/stage-out steps.
@abstractmethod
def wrap(self):
"""Wrap workflow with staging steps"""
pass
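get_processing_parameters()
Get processing parameters for CWL execution.
Note: unlike execute() and wrap(), this method is not abstract. BaseRunner provides a default implementation that builds the parameters from the ZOO inputs and the parsed workflow; subclasses may override it to supply runner-specific parameters.
def get_processing_parameters(self):
"""Get processing parameters (default implementation, may be overridden)"""
return self.inputs.get_processing_parameters(workflow=self.workflow.get_workflow())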
Methods
prepare()
Shared pre-execution logic.
Behavior:
- Logs execution start
- Updates status to 2%
- Calls
execution_handler.pre_execution_hook()
- Wraps CWL workflow
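- Prepares processing parameters: the result of get_processing_parameters() merged with execution_handler.get_additional_parameters(), where the handler's values take precedence on duplicate keys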
Returns:
- SimpleNamespace with:
- cwl: Wrapped workflow
- params: Processing parameters
Example:
prepared = runner.prepare()
wrapped_cwl = prepared.cwl
params = prepared.params
finalize()
Finalization logic after execution.
def finalize(self, log, output, usage_report, tool_logs)
Parameters:
log: Execution log
output: Workflow outputs
usage_report: Resource usage statistics
tool_logs: Tool execution logs
Behavior:
- Calls
execution_handler.post_execution_hook()
- Calls
execution_handler.handle_outputs()
Example:
runner.finalize(log, outputs, usage, tool_logs)
update_status()
Update execution status in ZOO.
def update_status(self, progress: int, message: str = "")
Parameters:
progress (int): Progress percentage (0-100)
message (str): Status message
Example:
runner.update_status(50, "Processing data...")
validate_inputs()
Validate input parameters.
def validate_inputs(self) -> bool
Returns:
- bool: True if inputs are valid
Example:
if not runner.validate_inputs():
return zoo.SERVICE_FAILED
log_output()
Log output information.
def log_output(self, output)
Parameters:
output: Output data to log
get_namespace_name()
Get or generate namespace name for execution.
def get_namespace_name(self) -> str
Returns:
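- str: Namespace name, lowercased, in the format {workflow_id}-{unique_id}, where unique_id is the first 8 characters of a random UUID4. The name is generated on the first call and cached in namespace_name for subsequent calls.
Example:
namespace = runner.get_namespace_name()
# Returns e.g.: "process-name-1a2b3c4d"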
Attributes
cwl
CWL workflow definition.
inputs
ZooInputs wrapper around the ZOO inputs dictionary.
conf
ZooConf wrapper around the ZOO configuration dictionary.
outputs
ZooOutputs wrapper around the ZOO outputs dictionary.
execution_handler
Execution handler instance.
namespace_name
Kubernetes namespace name.
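zoo_conf
Legacy SimpleNamespace exposing the raw ZOO configuration dictionary as zoo_conf.conf (kept for backward compatibility).
workflow
CWLWorkflow instance parsed from cwl and the configured workflow identifier.
execution
Runner-specific execution state; initialized to None.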
Default Handler
If no execution handler is provided, BaseRunner creates a default handler with no-op methods:
class DefaultHandler:
def pre_execution_hook(self):
pass
def post_execution_hook(self, *args, **kwargs):
pass
def get_secrets(self):
return None
def get_additional_parameters(self):
return {}
def get_pod_env_vars(self):
return None
def get_pod_node_selector(self):
return None
def handle_outputs(self, *args, **kwargs):
pass
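def set_job_id(self, job_id):
pass
def get_namespace(self):
"""Get namespace for Calrissian execution."""
return None
def get_service_account(self):
"""Get service account for Calrissian execution."""
return None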
Usage Pattern
from zoo_runner_common.base_runner import BaseRunner
class MyRunner(BaseRunner):
def execute(self):
"""Execute workflow"""
# Prepare
prepared = self.prepare()
self.update_status(20, "Executing")
# Execute
result = self.run_workflow(prepared.cwl, prepared.params)
# Finalize
self.finalize(None, result, None, None)
return zoo.SERVICE_SUCCEEDED
def wrap(self):
"""Wrap workflow"""
return self.cwl
def get_processing_parameters(self):
"""Get parameters"""
return {"max_cores": 8}
# Use in service
def my_service(conf, inputs, outputs):
runner = MyRunner(cwl, inputs, conf, outputs, handler)
return runner.execute()
See Also