Skip to content

CommonExecutionHandler API Reference

CommonExecutionHandler

Bases: ExecutionHandler

Simple execution handler for ZOO-Project CWL workflows.

This class provides basic functionality for handling CWL workflow execution with STAC catalog output processing. For more specific use cases (e.g., EOEPCA with Workspace API integration), extend this class and override the hooks.

Source code in zoo_template_common/common_execution_handler.py
class CommonExecutionHandler(ExecutionHandler):
    """Simple execution handler for ZOO-Project CWL workflows.

    This class provides basic functionality for handling CWL workflow execution
    with STAC catalog output processing. For more specific use cases (e.g., EOEPCA
    with Workspace API integration), extend this class and override the hooks.
    """

    def pre_execution_hook(self):
        """Hook to run before execution. Override in subclasses for custom behavior."""
        logger.info("Pre execution hook")

    def setOutput(self, outputName, values):
        """Process and set output values from STAC catalog."""
        output = self.outputs[outputName]
        logger.info(f"Read catalog from STAC Catalog URI: {output} -> {values}")

        if not isinstance(values[outputName], list):
            logger.info(f"values[{outputName}] is not a list, transform to an array")
            values[outputName] = [values[outputName]]

        items = []
        collection_id = self.get_additional_parameters()["sub_path"]
        logger.info(f"Create collection with ID {collection_id}")

        for i in range(len(values[outputName])):
            if values[outputName][i] is None:
                break
            value_uri = str(values[outputName][i]["value"]).strip()
            logger.info(f"setOutput: reading STAC catalog from '{value_uri}'")
            # If the URI is not a JSON file (e.g. raw staged file like .png, .parquet),
            # try to find catalog.json at the canonical path, or create a minimal item.
            if not value_uri.endswith(".json"):
                logger.warning(
                    f"Output '{outputName}' value '{value_uri}' is not a STAC catalog JSON. "
                    f"Creating a minimal STAC item directly."
                )
                basename = os.path.basename(str(value_uri).rstrip("/"))
                mime_type, _ = mimetypes.guess_type(basename)
                if mime_type is None:
                    mime_type = "application/octet-stream"
                item = Item(
                    id=outputName,
                    geometry=None,
                    bbox=None,
                    datetime=datetime.now(tz=timezone.utc),
                    properties={},
                )
                item.add_asset("data", Asset(
                    href=str(value_uri),
                    media_type=mime_type,
                    roles=["data"],
                ))
                item.collection_id = collection_id
                items.append(item)
                continue
            cat: Catalog = read_file(value_uri)

            collection = None

            try:
                logger.info(f"Catalog : {dir(cat)}")
                collection: Collection = next(cat.get_all_collections())
            except Exception:
                logger.error("No collection found in the output catalog")
                output["collection"] = json.dumps({}, indent=2)
                return

            logger.info(f"Got collection {collection.id} from processing outputs")

            for item in collection.get_all_items():
                logger.info(f"Processing item {item.id}")

                for asset_key in item.assets.keys():
                    logger.info(f"Processing asset {asset_key}")

                    temp_asset = item.assets[asset_key].to_dict()
                    temp_asset["storage:platform"] = (
                        self.get_additional_parameters().get(
                            "storage_platform", "default"
                        )
                    )
                    temp_asset["storage:requester_pays"] = False
                    temp_asset["storage:tier"] = "Standard"
                    temp_asset["storage:region"] = self.get_additional_parameters().get(
                        "region_name", "default"
                    )
                    temp_asset["storage:endpoint"] = (
                        self.get_additional_parameters().get("endpoint_url", "")
                    )
                    item.assets[asset_key] = item.assets[asset_key].from_dict(
                        temp_asset
                    )

                item.collection_id = collection_id
                items.append(item.clone())

        item_collection = ItemCollection(items=items)
        logger.info("Created feature collection from items")

        # Trap the case of no output collection
        if item_collection is None:
            logger.error("The output collection is empty")
            output["collection"] = json.dumps({}, indent=2)
            return

        # Set the feature collection to be returned
        output["collection"] = item_collection.to_dict()
        output["collection"]["id"] = collection_id

    def post_execution_hook(self, log, output, usage_report, tool_logs):
        """Hook to run after execution. Sets up S3 environment and processes outputs."""
        # Unset HTTP proxy or else the S3 client will use it and fail
        os.environ.pop("HTTP_PROXY", None)

        # Set S3 environment variables from additional parameters
        additional_params = self.get_additional_parameters()
        os.environ["AWS_S3_REGION"] = additional_params.get("region_name", "")
        os.environ["AWS_S3_ENDPOINT"] = additional_params.get("endpoint_url", "")
        os.environ["AWS_ACCESS_KEY_ID"] = additional_params.get("aws_access_key_id", "")
        os.environ["AWS_SECRET_ACCESS_KEY"] = additional_params.get(
            "aws_secret_access_key", ""
        )

        logger.info("Post execution hook")

        from zoo_template_common.custom_stac_io import CustomStacIO

        StacIO.set_default(CustomStacIO)

        for i in self.outputs:
            logger.info(f"Output {i}: {self.outputs[i]}")
            if "mimeType" in self.outputs[i]:
                self.setOutput(i, output)
            else:
                logger.warning(f"Output {i} has no mimeType, skipping...")
                self.outputs[i]["value"] = str(output[i])

    @staticmethod
    def local_get_file(fileName):
        """Read and load the contents of a yaml file."""
        try:
            with open(fileName) as file:
                data = yaml.safe_load(file)
            return data
        except (FileNotFoundError, yaml.YAMLError, yaml.scanner.ScannerError):
            return {}

    def get_pod_env_vars(self) -> dict[str, str]:
        """Get environment variables for the pod spawned by calrissian."""
        logger.info("get_pod_env_vars")
        return self.conf.get("pod_env_vars", {})

    def get_pod_node_selector(self) -> dict[str, str]:
        """Get node selector for the pod spawned by calrissian."""
        logger.info("get_pod_node_selector")
        return self.conf.get("pod_node_selector", {})

    def get_additional_parameters(self) -> dict[str, str]:
        """Get additional parameters for the execution."""
        logger.info("get_additional_parameters")
        additional_parameters = self.conf.get("additional_parameters", {})
        additional_parameters["sub_path"] = self.conf["lenv"]["usid"]
        return additional_parameters

    def get_secrets(self):
        """Get secrets for the pod spawned by calrissian."""
        logger.info("get_secrets")
        secrets = {
            "imagePullSecrets": self.local_get_file(
                "/assets/pod_imagePullSecrets.yaml"
            ),
            "additionalImagePullSecrets": self.local_get_file(
                "/assets/pod_additionalImagePullSecrets.yaml"
            ),
        }
        return secrets

    def handle_outputs(self, log, output, usage_report, tool_logs):
        """Handle the output files of the execution and register tool logs."""
        try:
            logger.info("handle_outputs")

            # Create service logs entries
            services_logs = [
                {
                    "url": os.path.join(
                        self.conf["main"]["tmpUrl"],
                        f"{self.conf['lenv']['Identifier']}-{self.conf['lenv']['usid']}",
                        os.path.basename(tool_log),
                    ),
                    "title": f"Tool log {os.path.basename(tool_log)}",
                    "rel": "related",
                }
                for tool_log in tool_logs
            ]

            cindex = 0
            if "service_logs" in self.conf:
                cindex = 1

            for i in range(len(services_logs)):
                okeys = ["url", "title", "rel"]
                keys = ["url", "title", "rel"]
                if cindex > 0:
                    for j in range(len(keys)):
                        keys[j] = keys[j] + "_" + str(cindex)
                if "service_logs" not in self.conf:
                    self.conf["service_logs"] = {}
                for j in range(len(keys)):
                    self.conf["service_logs"][keys[j]] = services_logs[i][okeys[j]]
                cindex += 1
                logger.warning(f"service_logs: {self.conf['service_logs']}")

            self.conf["service_logs"]["length"] = str(cindex)
            logger.info(f"service_logs: {self.conf['service_logs']}")

        except Exception as e:
            logger.error("ERROR in handle_outputs...")
            logger.error(traceback.format_exc())
            raise (e)

get_additional_parameters()

Get additional parameters for the execution.

Source code in zoo_template_common/common_execution_handler.py
def get_additional_parameters(self) -> dict[str, str]:
    """Get additional parameters for the execution."""
    logger.info("get_additional_parameters")
    additional_parameters = self.conf.get("additional_parameters", {})
    additional_parameters["sub_path"] = self.conf["lenv"]["usid"]
    return additional_parameters

get_pod_env_vars()

Get environment variables for the pod spawned by calrissian.

Source code in zoo_template_common/common_execution_handler.py
def get_pod_env_vars(self) -> dict[str, str]:
    """Get environment variables for the pod spawned by calrissian."""
    logger.info("get_pod_env_vars")
    return self.conf.get("pod_env_vars", {})

get_pod_node_selector()

Get node selector for the pod spawned by calrissian.

Source code in zoo_template_common/common_execution_handler.py
def get_pod_node_selector(self) -> dict[str, str]:
    """Get node selector for the pod spawned by calrissian."""
    logger.info("get_pod_node_selector")
    return self.conf.get("pod_node_selector", {})

get_secrets()

Get secrets for the pod spawned by calrissian.

Source code in zoo_template_common/common_execution_handler.py
def get_secrets(self):
    """Get secrets for the pod spawned by calrissian."""
    logger.info("get_secrets")
    secrets = {
        "imagePullSecrets": self.local_get_file(
            "/assets/pod_imagePullSecrets.yaml"
        ),
        "additionalImagePullSecrets": self.local_get_file(
            "/assets/pod_additionalImagePullSecrets.yaml"
        ),
    }
    return secrets

handle_outputs(log, output, usage_report, tool_logs)

Handle the output files of the execution and register tool logs.

Source code in zoo_template_common/common_execution_handler.py
def handle_outputs(self, log, output, usage_report, tool_logs):
    """Handle the output files of the execution and register tool logs."""
    try:
        logger.info("handle_outputs")

        # Create service logs entries
        services_logs = [
            {
                "url": os.path.join(
                    self.conf["main"]["tmpUrl"],
                    f"{self.conf['lenv']['Identifier']}-{self.conf['lenv']['usid']}",
                    os.path.basename(tool_log),
                ),
                "title": f"Tool log {os.path.basename(tool_log)}",
                "rel": "related",
            }
            for tool_log in tool_logs
        ]

        cindex = 0
        if "service_logs" in self.conf:
            cindex = 1

        for i in range(len(services_logs)):
            okeys = ["url", "title", "rel"]
            keys = ["url", "title", "rel"]
            if cindex > 0:
                for j in range(len(keys)):
                    keys[j] = keys[j] + "_" + str(cindex)
            if "service_logs" not in self.conf:
                self.conf["service_logs"] = {}
            for j in range(len(keys)):
                self.conf["service_logs"][keys[j]] = services_logs[i][okeys[j]]
            cindex += 1
            logger.warning(f"service_logs: {self.conf['service_logs']}")

        self.conf["service_logs"]["length"] = str(cindex)
        logger.info(f"service_logs: {self.conf['service_logs']}")

    except Exception as e:
        logger.error("ERROR in handle_outputs...")
        logger.error(traceback.format_exc())
        raise (e)

local_get_file(fileName) staticmethod

Read and load the contents of a yaml file.

Source code in zoo_template_common/common_execution_handler.py
@staticmethod
def local_get_file(fileName):
    """Read and load the contents of a yaml file."""
    try:
        with open(fileName) as file:
            data = yaml.safe_load(file)
        return data
    except (FileNotFoundError, yaml.YAMLError, yaml.scanner.ScannerError):
        return {}

post_execution_hook(log, output, usage_report, tool_logs)

Hook to run after execution. Sets up S3 environment and processes outputs.

Source code in zoo_template_common/common_execution_handler.py
def post_execution_hook(self, log, output, usage_report, tool_logs):
    """Hook to run after execution. Sets up S3 environment and processes outputs."""
    # Unset HTTP proxy or else the S3 client will use it and fail
    os.environ.pop("HTTP_PROXY", None)

    # Set S3 environment variables from additional parameters
    additional_params = self.get_additional_parameters()
    os.environ["AWS_S3_REGION"] = additional_params.get("region_name", "")
    os.environ["AWS_S3_ENDPOINT"] = additional_params.get("endpoint_url", "")
    os.environ["AWS_ACCESS_KEY_ID"] = additional_params.get("aws_access_key_id", "")
    os.environ["AWS_SECRET_ACCESS_KEY"] = additional_params.get(
        "aws_secret_access_key", ""
    )

    logger.info("Post execution hook")

    from zoo_template_common.custom_stac_io import CustomStacIO

    StacIO.set_default(CustomStacIO)

    for i in self.outputs:
        logger.info(f"Output {i}: {self.outputs[i]}")
        if "mimeType" in self.outputs[i]:
            self.setOutput(i, output)
        else:
            logger.warning(f"Output {i} has no mimeType, skipping...")
            self.outputs[i]["value"] = str(output[i])

pre_execution_hook()

Hook to run before execution. Override in subclasses for custom behavior.

Source code in zoo_template_common/common_execution_handler.py
def pre_execution_hook(self):
    """Hook to run before execution. Override in subclasses for custom behavior."""
    logger.info("Pre execution hook")

setOutput(outputName, values)

Process and set output values from STAC catalog.

Source code in zoo_template_common/common_execution_handler.py
def setOutput(self, outputName, values):
    """Process and set output values from STAC catalog."""
    output = self.outputs[outputName]
    logger.info(f"Read catalog from STAC Catalog URI: {output} -> {values}")

    if not isinstance(values[outputName], list):
        logger.info(f"values[{outputName}] is not a list, transform to an array")
        values[outputName] = [values[outputName]]

    items = []
    collection_id = self.get_additional_parameters()["sub_path"]
    logger.info(f"Create collection with ID {collection_id}")

    for i in range(len(values[outputName])):
        if values[outputName][i] is None:
            break
        value_uri = str(values[outputName][i]["value"]).strip()
        logger.info(f"setOutput: reading STAC catalog from '{value_uri}'")
        # If the URI is not a JSON file (e.g. raw staged file like .png, .parquet),
        # try to find catalog.json at the canonical path, or create a minimal item.
        if not value_uri.endswith(".json"):
            logger.warning(
                f"Output '{outputName}' value '{value_uri}' is not a STAC catalog JSON. "
                f"Creating a minimal STAC item directly."
            )
            basename = os.path.basename(str(value_uri).rstrip("/"))
            mime_type, _ = mimetypes.guess_type(basename)
            if mime_type is None:
                mime_type = "application/octet-stream"
            item = Item(
                id=outputName,
                geometry=None,
                bbox=None,
                datetime=datetime.now(tz=timezone.utc),
                properties={},
            )
            item.add_asset("data", Asset(
                href=str(value_uri),
                media_type=mime_type,
                roles=["data"],
            ))
            item.collection_id = collection_id
            items.append(item)
            continue
        cat: Catalog = read_file(value_uri)

        collection = None

        try:
            logger.info(f"Catalog : {dir(cat)}")
            collection: Collection = next(cat.get_all_collections())
        except Exception:
            logger.error("No collection found in the output catalog")
            output["collection"] = json.dumps({}, indent=2)
            return

        logger.info(f"Got collection {collection.id} from processing outputs")

        for item in collection.get_all_items():
            logger.info(f"Processing item {item.id}")

            for asset_key in item.assets.keys():
                logger.info(f"Processing asset {asset_key}")

                temp_asset = item.assets[asset_key].to_dict()
                temp_asset["storage:platform"] = (
                    self.get_additional_parameters().get(
                        "storage_platform", "default"
                    )
                )
                temp_asset["storage:requester_pays"] = False
                temp_asset["storage:tier"] = "Standard"
                temp_asset["storage:region"] = self.get_additional_parameters().get(
                    "region_name", "default"
                )
                temp_asset["storage:endpoint"] = (
                    self.get_additional_parameters().get("endpoint_url", "")
                )
                item.assets[asset_key] = item.assets[asset_key].from_dict(
                    temp_asset
                )

            item.collection_id = collection_id
            items.append(item.clone())

    item_collection = ItemCollection(items=items)
    logger.info("Created feature collection from items")

    # Trap the case of no output collection
    if item_collection is None:
        logger.error("The output collection is empty")
        output["collection"] = json.dumps({}, indent=2)
        return

    # Set the feature collection to be returned
    output["collection"] = item_collection.to_dict()
    output["collection"]["id"] = collection_id

Class Overview

CommonExecutionHandler is the base class for all CWL workflow execution handlers in ZOO-Project services.

Constructor

def __init__(self, conf, outputs=None)

Parameters:

  • conf (dict): ZOO configuration dictionary containing service configuration and environment
  • outputs (dict, optional): ZOO outputs dictionary for storing results

Example:

handler = CommonExecutionHandler(conf, outputs)

Methods

pre_execution_hook()

Called before workflow execution. Override to perform setup tasks.

def pre_execution_hook(self):
    """Hook called before execution"""
    pass

Use cases: - Load configuration - Validate inputs - Setup environment variables - Initialize connections

Example:

def pre_execution_hook(self):
    secrets = self.get_secrets()
    os.environ["API_KEY"] = secrets.get("api_key")

post_execution_hook()

Called after workflow execution. Override to process results.

def post_execution_hook(self, log, output, usage_report, tool_logs):
    """Hook called after execution"""
    # Setup S3 environment
    os.environ["S3_ENDPOINT"] = self.conf.get("eoepca", {}).get("S3", {}).get("url", "")

Parameters:

  • log: Execution log
  • output: Workflow output data
  • usage_report: Resource usage statistics
  • tool_logs: Tool-specific logs

Use cases: - Process workflow outputs - Save results - Send notifications - Cleanup resources

Example:

def post_execution_hook(self, log, output, usage_report, tool_logs):
    super().post_execution_hook(log, output, usage_report, tool_logs)
    self.save_results(output)
    self.send_notification("completed")

setOutput()

Process and set output values, handles STAC catalog processing.

def setOutput(self, outputName, values)

Parameters:

  • outputName (str): Name of the output
  • values (dict): Output values containing STAC catalog or data

Behavior:

  • If values contains "stac" key, processes as STAC catalog
  • Reads STAC catalog file
  • Converts ItemCollections to Catalogs
  • Updates output dictionary

Example:

handler.setOutput("result", {
    "stac": "/tmp/catalog.json",
    "type": "application/json"
})

get_pod_env_vars()

Get environment variables for pod/container execution.

def get_pod_env_vars(self) -> dict

Returns: - dict: Environment variables to set in execution pod

Default: - Empty dictionary

Example:

def get_pod_env_vars(self):
    env_vars = super().get_pod_env_vars()
    env_vars["CUSTOM_VAR"] = "value"
    env_vars["LOG_LEVEL"] = "DEBUG"
    return env_vars

get_pod_node_selector()

Get node selector for pod scheduling.

def get_pod_node_selector(self) -> dict

Returns: - dict: Node selector labels for Kubernetes scheduling

Default: - Empty dictionary

Example:

def get_pod_node_selector(self):
    return {
        "workload": "processing",
        "gpu": "true"
    }

get_additional_parameters()

Get additional parameters for workflow execution.

def get_additional_parameters(self) -> dict

Returns: - dict: Additional parameters for the workflow

Default: - Empty dictionary

Example:

def get_additional_parameters(self):
    return {
        "max_cores": 8,
        "max_ram": "16G",
        "timeout": 3600
    }

get_secrets()

Load secrets from YAML configuration files.

def get_secrets(self) -> dict

Returns: - dict: Secrets loaded from configuration files

Searched paths: 1. /var/etc/secrets/processing_secrets.yaml 2. /var/etc/zoo-services-user/processing_secrets.yaml

Example:

secrets = handler.get_secrets()
api_key = secrets.get("api_key")
db_password = secrets.get("db_password")

handle_outputs()

Register tool logs as workflow outputs.

def handle_outputs(self, log, output, usage_report, tool_logs)

Parameters:

  • log: Execution log
  • output: Workflow outputs
  • usage_report: Resource usage
  • tool_logs: Tool execution logs

Example:

handler.handle_outputs(log, output, usage_report, tool_logs)

local_get_file()

Static method to load YAML file.

@staticmethod
def local_get_file(fileName: str) -> dict

Parameters:

  • fileName (str): Path to YAML file

Returns: - dict: Parsed YAML content

Example:

config = CommonExecutionHandler.local_get_file("/path/to/config.yaml")

Attributes

conf

ZOO configuration dictionary.

handler.conf  # dict

Structure:

{
    "lenv": {
        "message": "",  # Error/status messages
        "Identifier": "service-id"
    },
    "main": {
        "tmpPath": "/tmp"
    },
    "inputs": {...},  # Service inputs
    "eoepca": {  # EOEPCA configuration
        "S3": {
            "url": "https://s3.example.com"
        }
    }
}

outputs

ZOO outputs dictionary.

handler.outputs  # dict or None

Structure:

{
    "result": {
        "value": "...",
        "mimeType": "application/json"
    }
}

Usage Pattern

Typical usage pattern:

class MyHandler(CommonExecutionHandler):
    def pre_execution_hook(self):
        # Setup
        super().pre_execution_hook()
        self.setup_environment()

    def post_execution_hook(self, log, output, usage_report, tool_logs):
        # Process results
        super().post_execution_hook(log, output, usage_report, tool_logs)
        self.process_results(output)

    def get_pod_env_vars(self):
        env = super().get_pod_env_vars()
        env["CUSTOM"] = "value"
        return env

# Use in ZOO service
def my_service(conf, inputs, outputs):
    handler = MyHandler(conf, outputs)
    handler.pre_execution_hook()
    # Execute workflow...
    handler.post_execution_hook(None, outputs, None, None)
    return 3  # SUCCESS

See Also