Implementing a New Runner¶
This guide walks through implementing a new CWL runner for ZOO-Project.
Overview¶
To create a new runner, you need to:
- Extend `BaseRunner`
- Implement the `execute()` method
- Handle workflow execution logic
- Process outputs
Basic Structure¶
from base_runner import BaseRunner
import logging
logger = logging.getLogger(__name__)
class MyRunner(BaseRunner):
    """
    Custom CWL runner implementation.

    This runner executes CWL workflows using [your backend].

    Subclass skeleton: ``execute()`` drives the overall sequence, while
    ``_run_workflow()`` and ``_process_outputs()`` are the two hooks you
    must fill in for your backend.
    """

    def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
        """Initialize runner.

        All arguments are forwarded unchanged to ``BaseRunner.__init__``.

        Args:
            cwl: CWL workflow handed to the base class.
            inputs: ZOO inputs dictionary.
            conf: ZOO configuration dictionary.
            outputs: ZOO outputs dictionary.
            execution_handler: Optional execution handler (defaults to None).
        """
        super().__init__(cwl, inputs, conf, outputs, execution_handler)
        # Add custom initialization
        self.client = None

    def execute(self):
        """Execute the workflow.

        Returns:
            ``self.SERVICE_SUCCEEDED`` when every step completes, otherwise
            ``self.SERVICE_FAILED``. On failure the error text is stored in
            ``self.conf["lenv"]["message"]``.
        """
        try:
            # 1. Prepare
            self.prepare()
            self.update_status(10, "Initializing")

            # 2. Validate
            # Raise instead of returning directly so the failure is logged
            # and its message recorded by the handler below.
            if not self.validate_inputs():
                raise ValueError("Input validation failed")

            # 3. Execute workflow
            self.update_status(30, "Executing workflow")
            result = self._run_workflow()

            # 4. Process outputs
            self.update_status(80, "Processing outputs")
            self._process_outputs(result)

            # 5. Finish
            self.update_status(100, "Complete")
            return self.SERVICE_SUCCEEDED
        except Exception as exc:
            logger.error("Execution failed: %s", exc, exc_info=True)
            # setdefault: do not raise a second error from the error handler
            # when the configuration has no "lenv" section yet.
            self.conf.setdefault("lenv", {})["message"] = str(exc)
            return self.SERVICE_FAILED

    def _run_workflow(self):
        """Execute workflow on backend.

        Returns:
            Whatever result object ``_process_outputs()`` expects.

        Raises:
            NotImplementedError: Until you implement the backend logic, so an
                unfinished runner fails instead of reporting success.
        """
        # Implement your workflow execution logic
        raise NotImplementedError("_run_workflow() must be implemented")

    def _process_outputs(self, result):
        """Process workflow outputs.

        Args:
            result: The value returned by ``_run_workflow()``.

        Raises:
            NotImplementedError: Until you implement output handling.
        """
        # Process and populate self.outputs
        raise NotImplementedError("_process_outputs() must be implemented")
Step-by-Step Implementation¶
Step 1: Setup Project¶
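Create your runner package. The steps below assume the following files:
- setup.py
- README.md
- zoo_myrunner/__init__.py
- zoo_myrunner/runner.py
- zoo_myrunner/service.py
- tests/test_myrunner.py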
Create setup.py:
from setuptools import setup, find_packages
# Package metadata for the runner distribution.
setup(
    name="zoo-myrunner",
    version="0.1.0",
    description="My CWL runner for ZOO-Project",
    author="Your Name",
    packages=find_packages(),
    install_requires=[
        "zoo-runner-common>=0.1.0",
        # Add your specific dependencies
    ],
)
Step 2: Implement Runner¶
Create zoo_myrunner/runner.py:
from base_runner import BaseRunner
import logging
import subprocess
import json
import os
logger = logging.getLogger(__name__)
class MyRunner(BaseRunner):
    """My CWL runner implementation.

    Drives a workflow through the ``my-workflow-tool`` command line:
    submit, poll for status, download the outputs, then hand the outputs to
    the execution handler. The execution handler is optional in the
    constructor signature, so every use of it is guarded against ``None``.
    """

    def __init__(self, cwl, inputs, conf, outputs, execution_handler=None):
        """Initialize the runner.

        Args:
            cwl: CWL workflow, passed to ``my-workflow-tool --cwl``.
            inputs: ZOO inputs dictionary; each entry is read via ``.get("value")``.
            conf: ZOO configuration dictionary; ``conf["main"]["tmpPath"]``
                selects the working directory (default ``/tmp``).
            outputs: ZOO outputs dictionary, populated by ``_download_outputs()``.
            execution_handler: Optional execution handler (defaults to None).
        """
        super().__init__(cwl, inputs, conf, outputs, execution_handler)
        self.work_dir = conf.get("main", {}).get("tmpPath", "/tmp")
        self.job_id = None
        # Resource defaults, so _submit_workflow() also works when
        # _setup_environment() has not been called first.
        self.max_cores = 4
        self.max_ram = "8G"

    def execute(self):
        """Execute CWL workflow.

        Returns:
            ``self.SERVICE_SUCCEEDED`` when every step completes, otherwise
            ``self.SERVICE_FAILED``. On failure the error text is stored in
            ``self.conf["lenv"]["message"]``.
        """
        try:
            # Prepare execution
            self.prepare()
            self.update_status(10, "Preparing workflow")

            # Validate inputs
            if not self.validate_inputs():
                raise ValueError("Input validation failed")

            # Setup execution environment
            self.update_status(20, "Setting up environment")
            self._setup_environment()

            # Submit workflow
            self.update_status(30, "Submitting workflow")
            self.job_id = self._submit_workflow()

            # Notify handler of job ID
            if self.execution_handler is not None:
                self.execution_handler.set_job_id(self.job_id)

            # Monitor execution
            self.update_status(40, "Executing workflow")
            result = self._monitor_execution()

            # Download outputs
            self.update_status(80, "Downloading outputs")
            self._download_outputs(result)

            # Post-process
            self.update_status(90, "Post-processing")
            self._post_process()

            # Complete
            self.update_status(100, "Workflow complete")
            return self.SERVICE_SUCCEEDED
        except Exception as exc:
            logger.error("Execution failed: %s", exc, exc_info=True)
            # setdefault: do not raise a second error from the error handler
            # when the configuration has no "lenv" section.
            self.conf.setdefault("lenv", {})["message"] = str(exc)
            return self.SERVICE_FAILED

    def _setup_environment(self):
        """Setup execution environment.

        Copies the handler's environment variables into ``os.environ`` and
        reads ``max_cores`` / ``max_ram`` from the handler's additional
        parameters. Without a handler, the defaults (4 cores, "8G") apply.
        """
        handler = self.execution_handler
        params = {}
        if handler is not None:
            # Get environment variables from handler
            env_vars = handler.get_pod_env_vars()
            if env_vars:
                os.environ.update(env_vars)
            # Get additional parameters (tolerate a handler returning None)
            params = handler.get_additional_parameters() or {}

        self.max_cores = params.get("max_cores", 4)
        self.max_ram = params.get("max_ram", "8G")
        logger.info(
            "Environment configured: cores=%s, ram=%s", self.max_cores, self.max_ram
        )

    def _submit_workflow(self):
        """Submit workflow to backend.

        Returns:
            str: The job ID printed by ``my-workflow-tool submit``.

        Raises:
            subprocess.CalledProcessError: If the tool exits non-zero.
            RuntimeError: If the tool prints no job ID.
        """
        # Example: submit to a workflow engine
        cmd = [
            "my-workflow-tool", "submit",
            "--cwl", self.cwl,
            "--inputs", self._prepare_inputs_file(),
            "--cores", str(self.max_cores),
            # str(): the handler may supply a non-string value, and
            # subprocess only accepts string arguments.
            "--memory", str(self.max_ram),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        job_id = result.stdout.strip()
        if not job_id:
            raise RuntimeError("Workflow submission returned no job ID")
        logger.info("Workflow submitted: %s", job_id)
        return job_id

    def _prepare_inputs_file(self):
        """Prepare inputs file for workflow.

        Writes ``inputs.json`` into the working directory, mapping each ZOO
        input name to that input's ``"value"`` entry.

        Returns:
            str: Path of the written file.
        """
        inputs_file = os.path.join(self.work_dir, "inputs.json")
        # Convert ZOO inputs to CWL inputs format
        cwl_inputs = {key: value.get("value") for key, value in self.inputs.items()}
        with open(inputs_file, "w", encoding="utf-8") as f:
            json.dump(cwl_inputs, f)
        return inputs_file

    def _monitor_execution(self, poll_interval=10, timeout=None):
        """Monitor workflow execution.

        Polls ``_get_job_status()`` until the job completes or fails.

        Args:
            poll_interval: Seconds to sleep between polls (default 10).
            timeout: Optional maximum number of seconds to wait; ``None``
                (the default) waits indefinitely.

        Returns:
            dict: The final status once its state is ``"completed"``.

        Raises:
            RuntimeError: If the backend reports state ``"failed"``.
            TimeoutError: If ``timeout`` elapses first.
        """
        import time

        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            status = self._get_job_status()
            state = status["state"]
            if state == "completed":
                logger.info("Workflow completed successfully")
                return status
            if state == "failed":
                raise RuntimeError(f"Workflow failed: {status.get('error')}")
            if state == "running":
                # Map the reported 0-100 progress onto the 40-80 band that
                # execute() reserves for this phase; clamp so an
                # out-of-range or missing value cannot leave that band.
                reported = status.get("progress") or 0
                reported = min(max(reported, 0), 100)
                progress = 40 + int(reported * 0.4)
                self.update_status(progress, f"Running: {status.get('step', 'processing')}")
            # Any other state: keep polling.
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Workflow {self.job_id} did not finish within {timeout} seconds"
                )
            time.sleep(poll_interval)

    def _get_job_status(self):
        """Get job status from backend.

        Returns:
            dict: The JSON document printed by ``my-workflow-tool status``.

        Raises:
            subprocess.CalledProcessError: If the tool exits non-zero.
        """
        cmd = ["my-workflow-tool", "status", self.job_id]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)

    def _download_outputs(self, result):
        """Download workflow outputs.

        Fetches the job's outputs into ``<work_dir>/outputs`` and, for each
        ZOO output that has a matching ``<name>.json`` file there, stores the
        file's text in that output's ``"value"`` entry.

        Args:
            result: Final job status from ``_monitor_execution()``; not used
                by this implementation.
        """
        output_dir = os.path.join(self.work_dir, "outputs")
        os.makedirs(output_dir, exist_ok=True)

        # Download outputs from backend
        cmd = ["my-workflow-tool", "outputs", self.job_id, "--dest", output_dir]
        subprocess.run(cmd, capture_output=True, text=True, check=True)

        # Populate ZOO outputs
        for output_name, output in self.outputs.items():
            output_path = os.path.join(output_dir, f"{output_name}.json")
            if os.path.exists(output_path):
                with open(output_path, encoding="utf-8") as f:
                    output["value"] = f.read()
        logger.info("Outputs downloaded")

    def _post_process(self):
        """Post-process outputs by notifying the execution handler, if any."""
        handler = self.execution_handler
        if handler is None:
            return

        # Call handler post-execution hook
        handler.post_execution_hook(
            log=None,
            output=self.outputs,
            usage_report=None,
            tool_logs=None,
        )
        # Additional post-processing
        handler.handle_outputs(None, self.outputs, None, None)
Step 3: Create ZOO Service¶
Create zoo_myrunner/service.py:
from zoo_myrunner.runner import MyRunner
from zoo_template_common import CommonExecutionHandler
def execute_workflow(conf, inputs, outputs):
    """
    ZOO Service for executing CWL workflows with MyRunner.

    Args:
        conf: ZOO configuration dictionary
        inputs: ZOO inputs dictionary
        outputs: ZOO outputs dictionary

    Returns:
        int: ZOO service status code (3=success, 4=failure)
    """
    import logging

    logger = logging.getLogger(__name__)
    service_failed = 4

    # Ensure the "lenv" section exists before anything can fail, so the error
    # paths below can always record a message instead of raising KeyError.
    lenv = conf.setdefault("lenv", {})

    try:
        # Get CWL document
        cwl = lenv.get("cwl_document")
        if not cwl:
            lenv["message"] = "No CWL document provided"
            return service_failed

        # Create execution handler
        handler = CommonExecutionHandler(conf, outputs)

        # Create and execute runner
        runner = MyRunner(cwl, inputs, conf, outputs, handler)
        return runner.execute()
    except Exception as exc:
        logger.error("Service failed: %s", exc, exc_info=True)
        lenv["message"] = str(exc)
        return service_failed
Step 4: Add Configuration¶
Create zoo_myrunner/__init__.py:
"""My CWL Runner for ZOO-Project"""
__version__ = "0.1.0"
from .runner import MyRunner
from .service import execute_workflow
__all__ = ["MyRunner", "execute_workflow"]
Step 5: Write Tests¶
Create tests/test_myrunner.py:
import pytest
from zoo_myrunner import MyRunner
from unittest.mock import Mock, patch
def test_runner_initialization():
    """Check that the constructor exposes its four arguments as attributes."""
    workflow_path = "/path/to/workflow.cwl"
    zoo_inputs = {"input1": {"value": "test"}}
    zoo_conf = {"lenv": {"message": ""}, "main": {"tmpPath": "/tmp"}}
    zoo_outputs = {}

    runner = MyRunner(workflow_path, zoo_inputs, zoo_conf, zoo_outputs)

    expected_attributes = (
        ("cwl", workflow_path),
        ("inputs", zoo_inputs),
        ("conf", zoo_conf),
        ("outputs", zoo_outputs),
    )
    for attribute, expected in expected_attributes:
        assert getattr(runner, attribute) == expected
@patch('subprocess.run')
def test_workflow_submission(mock_run):
    """Test workflow submission"""
    mock_run.return_value = Mock(stdout="job-12345", returncode=0)

    # _setup_environment() queries the execution handler, so supply a stub
    # instead of relying on the constructor's default of None.
    handler = Mock()
    handler.get_pod_env_vars.return_value = {}
    handler.get_additional_parameters.return_value = {}

    runner = MyRunner("/workflow.cwl", {}, {"lenv": {}, "main": {}}, {}, handler)
    runner._setup_environment()
    job_id = runner._submit_workflow()

    assert job_id == "job-12345"
    assert mock_run.called
    # Check the command itself, not just that something was run.
    submitted_cmd = mock_run.call_args[0][0]
    assert submitted_cmd[:2] == ["my-workflow-tool", "submit"]
    assert "/workflow.cwl" in submitted_cmd
def test_input_preparation():
    """Check that the inputs file maps each input name to its plain value."""
    import json

    zoo_inputs = {
        "input1": {"value": "value1"},
        "input2": {"value": "value2"},
    }
    runner = MyRunner("/workflow.cwl", zoo_inputs, {"main": {"tmpPath": "/tmp"}}, {})

    written_path = runner._prepare_inputs_file()
    with open(written_path) as handle:
        loaded = json.load(handle)

    assert loaded["input1"] == "value1"
    assert loaded["input2"] == "value2"
Step 6: Add Documentation¶
Create README.md:
# My CWL Runner for ZOO-Project
CWL runner implementation using [your backend].
## Installation
```bash
pip install zoo-myrunner
```
Usage¶
from zoo_myrunner import MyRunner
from zoo_template_common import CommonExecutionHandler
handler = CommonExecutionHandler(conf, outputs)
runner = MyRunner(cwl, inputs, conf, outputs, handler)
status = runner.execute()
Configuration¶
Set environment variables:
- MY_BACKEND_URL: Backend API URL
- MY_BACKEND_TOKEN: Authentication token
Features¶
- Automatic workflow submission
- Progress monitoring
- Output retrieval
- Error handling
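Publish to PyPI, for example by building the package with `python -m build` and uploading it with `twine upload dist/*`.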
Integration with zoo-cwl-runners¶
To integrate with the main runners package:
- Add to `zoo-cwl-runners/main_runner.py`:
def select_runner():
    """Return the runner class named by the ``ZOO_RUNNER`` environment variable.

    The variable defaults to ``"calrissian"``. Only the ``"myrunner"`` branch
    is shown here; add the other runners where indicated.
    """
    requested = os.environ.get("ZOO_RUNNER", "calrissian")
    if requested == "myrunner":
        # The import runs only when this runner is requested.
        from zoo_myrunner import MyRunner
        return MyRunner
    # ... other runners
- Add `zoo-myrunner` to the dependencies of the zoo-cwl-runners package (for example, in its `install_requires` list).
Best Practices¶
- Error Handling: Catch and log all exceptions
- Status Updates: Keep users informed of progress
- Cleanup: Clean up temporary files and resources
- Testing: Write comprehensive unit and integration tests
- Documentation: Document configuration and usage
- Logging: Use structured logging for debugging
See Also¶
- BaseRunner API
- BaseRunner User Guide
- ZooConf User Guide
- zoo-cwl-runners Documentation - Real implementation examples
- ZOO-Project Documentation