Skip to content

Basic Usage

This guide covers the basic usage of zoo-template-common in your ZOO services.

Simple Handler

The most basic usage is to subclass CommonExecutionHandler:

from zoo_template_common import CommonExecutionHandler

class MyExecutionHandler(CommonExecutionHandler):
    """Minimal custom execution handler.

    Adds no behaviour of its own; it only forwards its constructor
    arguments to CommonExecutionHandler.
    """

    def __init__(self, conf, outputs=None):
        # Explicit two-argument form of super(); equivalent to super().
        super(MyExecutionHandler, self).__init__(conf, outputs)

Pre-Execution Hook

Override pre_execution_hook() to perform setup before workflow execution:

import os


class MyExecutionHandler(CommonExecutionHandler):
    def pre_execution_hook(self):
        """Set up the handler before the workflow runs.

        ``load_config`` and ``validate_inputs`` are placeholders for your own
        helper methods; they are not shown in this example.

        Raises:
            ValueError: if ``validate_inputs()`` reports invalid inputs.
        """
        # Load configuration
        self.config = self.load_config()

        # Validate inputs; abort before the workflow starts if they are bad
        if not self.validate_inputs():
            raise ValueError("Invalid inputs")

        # Set up environment (requires the ``os`` import above)
        os.environ["CUSTOM_VAR"] = "value"

Post-Execution Hook

Override post_execution_hook() to process results after workflow execution:

class MyExecutionHandler(CommonExecutionHandler):
    def post_execution_hook(self, log, output, usage_report, tool_logs):
        """Post-process the workflow results.

        Delegates to the base-class hook first (which sets up the S3
        environment), then runs the custom output processing and sends a
        notification.
        """
        parent = super()
        # Base hook first so the S3 environment is ready for processing
        parent.post_execution_hook(log, output, usage_report, tool_logs)

        # Custom steps, in order: process outputs, then notify
        self.process_outputs(output)
        self.send_notification()

Output Processing

Use setOutput() to process STAC catalogs:

class MyExecutionHandler(CommonExecutionHandler):
    def process_results(self, results_path):
        """Register the workflow's STAC catalog as the ``result`` output.

        Args:
            results_path: location of the STAC catalog produced by the workflow.
        """
        stac_output = {
            "stac": results_path,
            "type": "application/json",
        }
        # setOutput takes care of the STAC catalog processing
        self.setOutput("result", stac_output)

Pod Configuration

Customize pod environment and node selection:

class MyExecutionHandler(CommonExecutionHandler):
    def get_pod_env_vars(self):
        """Return the pod environment variables.

        Extends the base-class environment with a static ``CUSTOM_ENV`` value
        and the ``api_key`` entry from the processing secrets.

        Returns:
            dict: the base environment plus the custom entries.
        """
        env_vars = super().get_pod_env_vars()
        # get_secrets() returns the loaded secrets mapping (see "Secrets
        # Management"). Default to "" so every env value stays a string
        # even when the secret is not defined.
        secrets = self.get_secrets()
        env_vars.update({
            "CUSTOM_ENV": "value",
            "API_KEY": secrets.get("api_key", ""),
        })
        return env_vars

    def get_pod_node_selector(self):
        """Return the node labels used to select where pods run.

        NOTE(review): presumably applied as the pod's Kubernetes nodeSelector,
        i.e. only nodes labelled workload=processing and gpu=true qualify.
        """
        return {
            "workload": "processing",
            "gpu": "true"
        }

Additional Parameters

Add custom parameters to workflow execution:

class MyExecutionHandler(CommonExecutionHandler):
    def get_additional_parameters(self):
        """Return the workflow parameters.

        Starts from the base-class parameters and sets the core count and
        RAM limit on top of them.
        """
        params = super().get_additional_parameters()
        # In-place update of the dict returned by the base class
        params.update({"max_cores": 8, "max_ram": "16G"})
        return params

Secrets Management

Load secrets from YAML files:

class MyExecutionHandler(CommonExecutionHandler):
    def setup_credentials(self):
        """Read credentials from the secrets files and apply them.

        Missing secrets come through as None (``dict.get`` without default).
        """
        secrets = self.get_secrets()
        api_key, db_password = (
            secrets.get(name) for name in ("api_key", "db_password")
        )

        # Apply the credentials
        self.configure_api(api_key)
        self.connect_database(db_password)

Secrets are loaded from:

- /var/etc/secrets/processing_secrets.yaml
- /var/etc/zoo-services-user/processing_secrets.yaml

Complete Example

from zoo_template_common import CommonExecutionHandler
import os

class ProcessingHandler(CommonExecutionHandler):
    """Handler for data processing workflows.

    - Before execution: exports S3 credentials from the secrets and checks
      required parameters.
    - After execution: logs usage statistics and handles outputs.
    - Also configures pod environment, node selection and workflow limits.
    """

    def pre_execution_hook(self):
        """Set up the processing environment.

        Raises:
            ValueError: if a required parameter is missing from
                ``self.conf["lenv"]``.
        """
        # Load secrets; missing keys become empty strings so os.environ
        # only ever receives str values.
        secrets = self.get_secrets()
        os.environ["S3_KEY"] = secrets.get("s3_access_key", "")
        os.environ["S3_SECRET"] = secrets.get("s3_secret_key", "")

        # Validate configuration: fail fast before the workflow starts
        lenv = self.conf["lenv"]
        for key in ("input_data", "output_bucket"):
            if key not in lenv:
                raise ValueError(f"Missing required parameter: {key}")

    def post_execution_hook(self, log, output, usage_report, tool_logs):
        """Process results after the workflow has run."""
        # Base hook sets up the S3 environment
        super().post_execution_hook(log, output, usage_report, tool_logs)

        # Log execution stats
        self.log_stats(usage_report)

        # Handle outputs and register tool logs
        self.handle_outputs(log, output, usage_report, tool_logs)

    def get_pod_env_vars(self):
        """Configure the pod environment on top of the base variables."""
        env_vars = super().get_pod_env_vars()
        env_vars["PROCESSING_MODE"] = "batch"
        env_vars["LOG_LEVEL"] = "INFO"
        return env_vars

    def get_pod_node_selector(self):
        """Select compute-optimized nodes."""
        return {"node-type": "compute-optimized"}

    def get_additional_parameters(self):
        """Return the workflow parameters.

        Starts from the base-class parameters so nothing provided by
        CommonExecutionHandler is dropped, then overrides the resource
        limits and timeout for this workflow.
        """
        params = super().get_additional_parameters()
        params.update({
            "max_cores": 16,
            "max_ram": "32G",
            "timeout": 3600,
        })
        return params

    def log_stats(self, usage_report):
        """Print execution statistics from ``usage_report``, if one is given.

        Missing ``duration``/``memory`` entries are shown as ``N/A``.
        """
        if usage_report:
            print(f"Execution time: {usage_report.get('duration', 'N/A')}")
            print(f"Memory used: {usage_report.get('memory', 'N/A')}")

Error Handling

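Handle errors gracefully by recording a message in conf["lenv"]["message"] and returning a status flag instead of letting the exception propagate: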

class MyExecutionHandler(CommonExecutionHandler):
    def safe_execute(self):
        """Run the pre-hook, the workflow and the post-hook, reporting failures.

        Returns:
            bool: True when every step succeeds. On failure a human-readable
            reason is stored in ``self.conf["lenv"]["message"]`` and False
            is returned instead of raising.
        """
        try:
            self.pre_execution_hook()
            workflow_result = self.run_workflow()
            # Only the workflow result is available here; log, usage report
            # and tool logs are passed as None.
            self.post_execution_hook(None, workflow_result, None, None)
        except ValueError as err:
            # A ValueError from any of the steps above is reported as a
            # validation error.
            self.conf["lenv"]["message"] = f"Validation error: {err}"
            return False
        except Exception as err:
            # Top-level boundary: report any other failure instead of raising
            self.conf["lenv"]["message"] = f"Execution failed: {err}"
            return False
        else:
            return True

Next Steps