CommonExecutionHandler API Reference
CommonExecutionHandler
Bases: ExecutionHandler
Simple execution handler for ZOO-Project CWL workflows.
This class provides basic functionality for handling CWL workflow execution
with STAC catalog output processing. For more specific use cases (e.g., EOEPCA
with Workspace API integration), extend this class and override the hooks.
Source code in zoo_template_common/common_execution_handler.py
class CommonExecutionHandler(ExecutionHandler):
    """Simple execution handler for ZOO-Project CWL workflows.

    This class provides basic functionality for handling CWL workflow execution
    with STAC catalog output processing. For more specific use cases (e.g., EOEPCA
    with Workspace API integration), extend this class and override the hooks.
    """

    def pre_execution_hook(self):
        """Hook to run before execution. Override in subclasses for custom behavior."""
        logger.info("Pre execution hook")

    def setOutput(self, outputName, values):
        """Build a STAC feature collection for one output and store it.

        Each entry of ``values[outputName]`` is read as a mapping whose
        ``"value"`` is a URI. A URI ending in ``.json`` is read as a STAC
        catalog: the items of its first collection are gathered and every
        asset receives ``storage:*`` fields taken from the additional
        parameters. Any other URI is wrapped in a minimal STAC item with a
        single ``data`` asset. The result is written to
        ``self.outputs[outputName]["collection"]``.

        Args:
            outputName: Key of the output in both ``self.outputs`` and ``values``.
            values: Mapping of output names to one entry or a list of entries.
                A non-list entry is replaced in place by a one-element list.

        Returns:
            None. On success the ``"collection"`` entry is the item collection
            as a dictionary whose ``"id"`` is the ``sub_path`` parameter. When
            a catalog yields no collection, or no item was gathered at all, it
            is the JSON string of an empty object instead.
        """
        output = self.outputs[outputName]
        logger.info(f"Read catalog from STAC Catalog URI: {output} -> {values}")

        if not isinstance(values[outputName], list):
            logger.info(f"values[{outputName}] is not a list, transform to an array")
            values[outputName] = [values[outputName]]

        # Resolved once: the parameters do not change while items are processed.
        params = self.get_additional_parameters()
        collection_id = params["sub_path"]
        storage_fields = {
            "storage:platform": params.get("storage_platform", "default"),
            "storage:requester_pays": False,
            "storage:tier": "Standard",
            "storage:region": params.get("region_name", "default"),
            "storage:endpoint": params.get("endpoint_url", ""),
        }
        logger.info(f"Create collection with ID {collection_id}")

        items = []
        for entry in values[outputName]:
            if entry is None:
                # A missing entry ends processing; later entries are ignored.
                break

            value_uri = str(entry["value"]).strip()
            logger.info(f"setOutput: reading STAC catalog from '{value_uri}'")

            # A URI that is not a JSON file (e.g. a raw staged .png or .parquet)
            # cannot be read as a catalog, so it is wrapped in a minimal item.
            if not value_uri.endswith(".json"):
                logger.warning(
                    f"Output '{outputName}' value '{value_uri}' is not a STAC catalog JSON. "
                    f"Creating a minimal STAC item directly."
                )
                basename = os.path.basename(value_uri.rstrip("/"))
                mime_type, _ = mimetypes.guess_type(basename)
                if mime_type is None:
                    mime_type = "application/octet-stream"
                # NOTE(review): every minimal item uses outputName as its id, so
                # several non-JSON values share one id - confirm this is intended.
                item = Item(
                    id=outputName,
                    geometry=None,
                    bbox=None,
                    datetime=datetime.now(tz=timezone.utc),
                    properties={},
                )
                item.add_asset(
                    "data",
                    Asset(href=value_uri, media_type=mime_type, roles=["data"]),
                )
                item.collection_id = collection_id
                items.append(item)
                continue

            cat: Catalog = read_file(value_uri)
            try:
                logger.info(f"Catalog : {dir(cat)}")
                collection: Collection = next(cat.get_all_collections())
            except Exception:
                logger.error("No collection found in the output catalog")
                output["collection"] = json.dumps({}, indent=2)
                return

            logger.info(f"Got collection {collection.id} from processing outputs")

            for item in collection.get_all_items():
                logger.info(f"Processing item {item.id}")
                for asset_key, asset in list(item.assets.items()):
                    logger.info(f"Processing asset {asset_key}")
                    temp_asset = asset.to_dict()
                    temp_asset.update(storage_fields)
                    item.assets[asset_key] = asset.from_dict(temp_asset)
                item.collection_id = collection_id
                items.append(item.clone())

        # Trap the case of no output items. The constructed collection object is
        # never None, so emptiness has to be checked on the gathered items.
        if not items:
            logger.error("The output collection is empty")
            output["collection"] = json.dumps({}, indent=2)
            return

        item_collection = ItemCollection(items=items)
        logger.info("Created feature collection from items")

        # Set the feature collection to be returned
        output["collection"] = item_collection.to_dict()
        output["collection"]["id"] = collection_id

    def post_execution_hook(self, log, output, usage_report, tool_logs):
        """Hook to run after execution. Sets up S3 environment and processes outputs.

        Args:
            log: Not used by this implementation.
            output: Mapping of output names to the values produced by the run.
            usage_report: Not used by this implementation.
            tool_logs: Not used by this implementation.
        """
        # Unset HTTP proxy or else the S3 client will use it and fail
        os.environ.pop("HTTP_PROXY", None)

        # Export the S3 settings found in the additional parameters.
        params = self.get_additional_parameters()
        env_from_param = (
            ("AWS_S3_REGION", "region_name"),
            ("AWS_S3_ENDPOINT", "endpoint_url"),
            ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
            ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
        )
        for env_name, param_key in env_from_param:
            os.environ[env_name] = params.get(param_key, "")

        logger.info("Post execution hook")

        from zoo_template_common.custom_stac_io import CustomStacIO

        StacIO.set_default(CustomStacIO)

        for name, spec in self.outputs.items():
            logger.info(f"Output {name}: {spec}")
            if "mimeType" in spec:
                self.setOutput(name, output)
            else:
                # Without a mimeType the raw value is stored as a string.
                logger.warning(f"Output {name} has no mimeType, skipping...")
                spec["value"] = str(output[name])

    @staticmethod
    def local_get_file(fileName):
        """Read and load the contents of a yaml file.

        Args:
            fileName: Path of the YAML file to read.

        Returns:
            The loaded content, or an empty dict when the file does not exist,
            cannot be parsed as YAML, or holds no document.
        """
        try:
            with open(fileName) as file:
                data = yaml.safe_load(file)
        except (FileNotFoundError, yaml.YAMLError):
            # ScannerError is a YAMLError subclass, so one entry covers both.
            return {}
        # An empty document loads as None; keep the empty-dict fallback uniform.
        return {} if data is None else data

    def get_pod_env_vars(self) -> dict[str, str]:
        """Get environment variables for the pod spawned by calrissian."""
        logger.info("get_pod_env_vars")
        env_vars = self.conf.get("pod_env_vars", {})
        return env_vars

    def get_pod_node_selector(self) -> dict[str, str]:
        """Get node selector for the pod spawned by calrissian."""
        logger.info("get_pod_node_selector")
        node_selector = self.conf.get("pod_node_selector", {})
        return node_selector

    def get_additional_parameters(self) -> dict[str, str]:
        """Get additional parameters for the execution.

        Returns:
            The ``additional_parameters`` section of ``self.conf`` (a new dict
            when the section is absent) with ``sub_path`` set to
            ``self.conf["lenv"]["usid"]``. An existing section is updated in place.
        """
        logger.info("get_additional_parameters")
        params = self.conf.get("additional_parameters", {})
        params["sub_path"] = self.conf["lenv"]["usid"]
        return params

    def get_secrets(self):
        """Get secrets for the pod spawned by calrissian.

        Returns:
            A dict with the keys ``imagePullSecrets`` and
            ``additionalImagePullSecrets``, each holding the content loaded
            from the matching YAML file under ``/assets``.
        """
        logger.info("get_secrets")
        secret_files = {
            "imagePullSecrets": "/assets/pod_imagePullSecrets.yaml",
            "additionalImagePullSecrets": "/assets/pod_additionalImagePullSecrets.yaml",
        }
        return {key: self.local_get_file(path) for key, path in secret_files.items()}

    def handle_outputs(self, log, output, usage_report, tool_logs):
        """Handle the output files of the execution and register tool logs.

        Every tool log becomes a ``url``/``title``/``rel`` triple in
        ``self.conf["service_logs"]``. The first triple of a new section uses
        the bare key names; later ones get an ``_<index>`` suffix. The section's
        ``length`` entry holds the resulting count as a string.

        Args:
            log: Not used by this implementation.
            output: Not used by this implementation.
            usage_report: Not used by this implementation.
            tool_logs: Paths of the tool log files; ``None`` is treated as empty.

        Raises:
            Exception: Any error raised while registering the logs is logged
                and re-raised unchanged.
        """
        try:
            logger.info("handle_outputs")

            # Create service logs entries
            services_logs = [
                {
                    "url": os.path.join(
                        self.conf["main"]["tmpUrl"],
                        f"{self.conf['lenv']['Identifier']}-{self.conf['lenv']['usid']}",
                        os.path.basename(tool_log),
                    ),
                    "title": f"Tool log {os.path.basename(tool_log)}",
                    "rel": "related",
                }
                for tool_log in (tool_logs or [])
            ]

            if "service_logs" in self.conf:
                section = self.conf["service_logs"]
                # Continue after the entries already registered so none of them
                # is overwritten; fall back to 1 when no usable length is stored.
                try:
                    cindex = int(section.get("length", 1))
                except (TypeError, ValueError):
                    cindex = 1
            elif services_logs:
                section = self.conf["service_logs"] = {}
                cindex = 0
            else:
                # Nothing to register and no section to update.
                logger.info("No tool logs to register")
                return

            for entry in services_logs:
                suffix = f"_{cindex}" if cindex > 0 else ""
                for key in ("url", "title", "rel"):
                    section[f"{key}{suffix}"] = entry[key]
                cindex += 1
                logger.warning(f"service_logs: {self.conf['service_logs']}")

            section["length"] = str(cindex)
            logger.info(f"service_logs: {self.conf['service_logs']}")
        except Exception:
            logger.error("ERROR in handle_outputs...")
            logger.error(traceback.format_exc())
            # Bare raise keeps the original traceback intact.
            raise
|
get_additional_parameters()
Get additional parameters for the execution.
Source code in zoo_template_common/common_execution_handler.py
def get_additional_parameters(self) -> dict[str, str]:
    """Return the execution's additional parameters, including ``sub_path``.

    The ``additional_parameters`` section of ``self.conf`` is used when
    present (and updated in place); otherwise a new dict is returned.
    ``sub_path`` is always set to ``self.conf["lenv"]["usid"]``.
    """
    logger.info("get_additional_parameters")
    params = self.conf.get("additional_parameters", {})
    params["sub_path"] = self.conf["lenv"]["usid"]
    return params
|
get_pod_env_vars()
Get environment variables for the pod spawned by calrissian.
Source code in zoo_template_common/common_execution_handler.py
def get_pod_env_vars(self) -> dict[str, str]:
    """Return the ``pod_env_vars`` section of ``self.conf`` (empty dict if absent).

    These are the environment variables for the pod spawned by calrissian.
    """
    logger.info("get_pod_env_vars")
    env_vars = self.conf.get("pod_env_vars", {})
    return env_vars
|
get_pod_node_selector()
Get node selector for the pod spawned by calrissian.
Source code in zoo_template_common/common_execution_handler.py
def get_pod_node_selector(self) -> dict[str, str]:
    """Return the ``pod_node_selector`` section of ``self.conf`` (empty dict if absent).

    This is the node selector for the pod spawned by calrissian.
    """
    logger.info("get_pod_node_selector")
    node_selector = self.conf.get("pod_node_selector", {})
    return node_selector
|
get_secrets()
Get secrets for the pod spawned by calrissian.
Source code in zoo_template_common/common_execution_handler.py
def get_secrets(self):
    """Get secrets for the pod spawned by calrissian.

    Returns:
        A dict with the keys ``imagePullSecrets`` and
        ``additionalImagePullSecrets``, each holding whatever
        ``local_get_file`` loads from the matching YAML file under ``/assets``.
    """
    logger.info("get_secrets")
    secret_files = {
        "imagePullSecrets": "/assets/pod_imagePullSecrets.yaml",
        "additionalImagePullSecrets": "/assets/pod_additionalImagePullSecrets.yaml",
    }
    return {key: self.local_get_file(path) for key, path in secret_files.items()}
|
handle_outputs(log, output, usage_report, tool_logs)
Handle the output files of the execution and register tool logs.
Source code in zoo_template_common/common_execution_handler.py
def handle_outputs(self, log, output, usage_report, tool_logs):
    """Handle the output files of the execution and register tool logs.

    Every tool log becomes a ``url``/``title``/``rel`` triple in
    ``self.conf["service_logs"]``. The first triple of a new section uses
    the bare key names; later ones get an ``_<index>`` suffix. The section's
    ``length`` entry holds the resulting count as a string.

    Args:
        log: Not used by this implementation.
        output: Not used by this implementation.
        usage_report: Not used by this implementation.
        tool_logs: Paths of the tool log files; ``None`` is treated as empty.

    Raises:
        Exception: Any error raised while registering the logs is logged
            and re-raised unchanged.
    """
    try:
        logger.info("handle_outputs")

        # Create service logs entries
        services_logs = [
            {
                "url": os.path.join(
                    self.conf["main"]["tmpUrl"],
                    f"{self.conf['lenv']['Identifier']}-{self.conf['lenv']['usid']}",
                    os.path.basename(tool_log),
                ),
                "title": f"Tool log {os.path.basename(tool_log)}",
                "rel": "related",
            }
            for tool_log in (tool_logs or [])
        ]

        if "service_logs" in self.conf:
            section = self.conf["service_logs"]
            # Continue after the entries already registered so none of them
            # is overwritten; fall back to 1 when no usable length is stored.
            try:
                cindex = int(section.get("length", 1))
            except (TypeError, ValueError):
                cindex = 1
        elif services_logs:
            section = self.conf["service_logs"] = {}
            cindex = 0
        else:
            # Nothing to register and no section to update.
            logger.info("No tool logs to register")
            return

        for entry in services_logs:
            suffix = f"_{cindex}" if cindex > 0 else ""
            for key in ("url", "title", "rel"):
                section[f"{key}{suffix}"] = entry[key]
            cindex += 1
            logger.warning(f"service_logs: {self.conf['service_logs']}")

        section["length"] = str(cindex)
        logger.info(f"service_logs: {self.conf['service_logs']}")
    except Exception:
        logger.error("ERROR in handle_outputs...")
        logger.error(traceback.format_exc())
        # Bare raise keeps the original traceback intact.
        raise
|
local_get_file(fileName)
staticmethod
Read and load the contents of a yaml file.
Source code in zoo_template_common/common_execution_handler.py
@staticmethod
def local_get_file(fileName):
    """Read and load the contents of a yaml file.

    Args:
        fileName: Path of the YAML file to read.

    Returns:
        The loaded content, or an empty dict when the file does not exist,
        cannot be parsed as YAML, or holds no document.
    """
    try:
        with open(fileName) as file:
            data = yaml.safe_load(file)
    except (FileNotFoundError, yaml.YAMLError):
        # ScannerError is a YAMLError subclass, so one entry covers both.
        return {}
    # An empty document loads as None; keep the empty-dict fallback uniform.
    return {} if data is None else data
|
post_execution_hook(log, output, usage_report, tool_logs)
Hook to run after execution. Sets up S3 environment and processes outputs.
Source code in zoo_template_common/common_execution_handler.py
def post_execution_hook(self, log, output, usage_report, tool_logs):
    """Hook to run after execution. Sets up S3 environment and processes outputs.

    Args:
        log: Not used by this implementation.
        output: Mapping of output names to the values produced by the run.
        usage_report: Not used by this implementation.
        tool_logs: Not used by this implementation.
    """
    # Unset HTTP proxy or else the S3 client will use it and fail
    os.environ.pop("HTTP_PROXY", None)

    # Export the S3 settings found in the additional parameters.
    params = self.get_additional_parameters()
    env_from_param = (
        ("AWS_S3_REGION", "region_name"),
        ("AWS_S3_ENDPOINT", "endpoint_url"),
        ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
        ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
    )
    for env_name, param_key in env_from_param:
        os.environ[env_name] = params.get(param_key, "")

    logger.info("Post execution hook")

    from zoo_template_common.custom_stac_io import CustomStacIO

    StacIO.set_default(CustomStacIO)

    for name, spec in self.outputs.items():
        logger.info(f"Output {name}: {spec}")
        if "mimeType" in spec:
            self.setOutput(name, output)
        else:
            # Without a mimeType the raw value is stored as a string.
            logger.warning(f"Output {name} has no mimeType, skipping...")
            spec["value"] = str(output[name])
|
pre_execution_hook()
Hook to run before execution. Override in subclasses for custom behavior.
Source code in zoo_template_common/common_execution_handler.py
def pre_execution_hook(self):
    """Run before the workflow executes.

    The base implementation only writes a log entry; subclasses override
    this hook to add custom behavior.
    """
    logger.info("Pre execution hook")
|
setOutput(outputName, values)
Process and set output values from STAC catalog.
Source code in zoo_template_common/common_execution_handler.py
def setOutput(self, outputName, values):
    """Build a STAC feature collection for one output and store it.

    Each entry of ``values[outputName]`` is read as a mapping whose
    ``"value"`` is a URI. A URI ending in ``.json`` is read as a STAC
    catalog: the items of its first collection are gathered and every
    asset receives ``storage:*`` fields taken from the additional
    parameters. Any other URI is wrapped in a minimal STAC item with a
    single ``data`` asset. The result is written to
    ``self.outputs[outputName]["collection"]``.

    Args:
        outputName: Key of the output in both ``self.outputs`` and ``values``.
        values: Mapping of output names to one entry or a list of entries.
            A non-list entry is replaced in place by a one-element list.

    Returns:
        None. On success the ``"collection"`` entry is the item collection
        as a dictionary whose ``"id"`` is the ``sub_path`` parameter. When
        a catalog yields no collection, or no item was gathered at all, it
        is the JSON string of an empty object instead.
    """
    output = self.outputs[outputName]
    logger.info(f"Read catalog from STAC Catalog URI: {output} -> {values}")

    if not isinstance(values[outputName], list):
        logger.info(f"values[{outputName}] is not a list, transform to an array")
        values[outputName] = [values[outputName]]

    # Resolved once: the parameters do not change while items are processed.
    params = self.get_additional_parameters()
    collection_id = params["sub_path"]
    storage_fields = {
        "storage:platform": params.get("storage_platform", "default"),
        "storage:requester_pays": False,
        "storage:tier": "Standard",
        "storage:region": params.get("region_name", "default"),
        "storage:endpoint": params.get("endpoint_url", ""),
    }
    logger.info(f"Create collection with ID {collection_id}")

    items = []
    for entry in values[outputName]:
        if entry is None:
            # A missing entry ends processing; later entries are ignored.
            break

        value_uri = str(entry["value"]).strip()
        logger.info(f"setOutput: reading STAC catalog from '{value_uri}'")

        # A URI that is not a JSON file (e.g. a raw staged .png or .parquet)
        # cannot be read as a catalog, so it is wrapped in a minimal item.
        if not value_uri.endswith(".json"):
            logger.warning(
                f"Output '{outputName}' value '{value_uri}' is not a STAC catalog JSON. "
                f"Creating a minimal STAC item directly."
            )
            basename = os.path.basename(value_uri.rstrip("/"))
            mime_type, _ = mimetypes.guess_type(basename)
            if mime_type is None:
                mime_type = "application/octet-stream"
            # NOTE(review): every minimal item uses outputName as its id, so
            # several non-JSON values share one id - confirm this is intended.
            item = Item(
                id=outputName,
                geometry=None,
                bbox=None,
                datetime=datetime.now(tz=timezone.utc),
                properties={},
            )
            item.add_asset(
                "data",
                Asset(href=value_uri, media_type=mime_type, roles=["data"]),
            )
            item.collection_id = collection_id
            items.append(item)
            continue

        cat: Catalog = read_file(value_uri)
        try:
            logger.info(f"Catalog : {dir(cat)}")
            collection: Collection = next(cat.get_all_collections())
        except Exception:
            logger.error("No collection found in the output catalog")
            output["collection"] = json.dumps({}, indent=2)
            return

        logger.info(f"Got collection {collection.id} from processing outputs")

        for item in collection.get_all_items():
            logger.info(f"Processing item {item.id}")
            for asset_key, asset in list(item.assets.items()):
                logger.info(f"Processing asset {asset_key}")
                temp_asset = asset.to_dict()
                temp_asset.update(storage_fields)
                item.assets[asset_key] = asset.from_dict(temp_asset)
            item.collection_id = collection_id
            items.append(item.clone())

    # Trap the case of no output items. The constructed collection object is
    # never None, so emptiness has to be checked on the gathered items.
    if not items:
        logger.error("The output collection is empty")
        output["collection"] = json.dumps({}, indent=2)
        return

    item_collection = ItemCollection(items=items)
    logger.info("Created feature collection from items")

    # Set the feature collection to be returned
    output["collection"] = item_collection.to_dict()
    output["collection"]["id"] = collection_id
|
Class Overview
CommonExecutionHandler is the base class for all CWL workflow execution handlers in ZOO-Project services.
Constructor
def __init__(self, conf, outputs=None)
Parameters:
conf (dict): ZOO configuration dictionary containing service configuration and environment
outputs (dict, optional): ZOO outputs dictionary for storing results
Example:
handler = CommonExecutionHandler(conf, outputs)
Methods
pre_execution_hook()
Called before workflow execution. Override to perform setup tasks.
def pre_execution_hook(self):
"""Hook called before execution"""
pass
Use cases:
- Load configuration
- Validate inputs
- Setup environment variables
- Initialize connections
Example:
def pre_execution_hook(self):
secrets = self.get_secrets()
os.environ["API_KEY"] = secrets.get("api_key")
post_execution_hook()
Called after workflow execution. Override to process results.
def post_execution_hook(self, log, output, usage_report, tool_logs):
"""Hook called after execution"""
# Setup S3 environment
os.environ["S3_ENDPOINT"] = self.conf.get("eoepca", {}).get("S3", {}).get("url", "")
Parameters:
log: Execution log
output: Workflow output data
usage_report: Resource usage statistics
tool_logs: Tool-specific logs
Use cases:
- Process workflow outputs
- Save results
- Send notifications
- Cleanup resources
Example:
def post_execution_hook(self, log, output, usage_report, tool_logs):
super().post_execution_hook(log, output, usage_report, tool_logs)
self.save_results(output)
self.send_notification("completed")
setOutput()
Process and set output values, handles STAC catalog processing.
def setOutput(self, outputName, values)
Parameters:
outputName (str): Name of the output
values (dict): Output values containing STAC catalog or data
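Behavior:
- Reads values[outputName]; a single entry is wrapped into a list, and each entry's "value" is the URI to process
- If the URI does not end with ".json", creates a minimal STAC Item with a single "data" asset pointing to it
- Otherwise reads the STAC catalog, takes its first collection and adds storage:* fields to every item asset
- Stores the resulting ItemCollection, as a dictionary, in outputs[outputName]["collection"]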
Example:
handler.setOutput("result", {
"result": {"value": "/tmp/catalog.json"}
})
get_pod_env_vars()
Get environment variables for pod/container execution.
def get_pod_env_vars(self) -> dict
Returns:
- dict: Environment variables to set in execution pod
Default:
- Empty dictionary
Example:
def get_pod_env_vars(self):
env_vars = super().get_pod_env_vars()
env_vars["CUSTOM_VAR"] = "value"
env_vars["LOG_LEVEL"] = "DEBUG"
return env_vars
get_pod_node_selector()
Get node selector for pod scheduling.
def get_pod_node_selector(self) -> dict
Returns:
- dict: Node selector labels for Kubernetes scheduling
Default:
- Empty dictionary
Example:
def get_pod_node_selector(self):
return {
"workload": "processing",
"gpu": "true"
}
get_additional_parameters()
Get additional parameters for workflow execution.
def get_additional_parameters(self) -> dict
Returns:
- dict: Additional parameters for the workflow
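Default:
- The additional_parameters section of conf (an empty dictionary if absent), with sub_path always set to conf["lenv"]["usid"]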
Example:
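def get_additional_parameters(self):
# Start from the base parameters so that sub_path, which setOutput() requires, is kept
params = super().get_additional_parameters()
params["max_cores"] = 8
params["max_ram"] = "16G"
params["timeout"] = 3600
return params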
get_secrets()
Load secrets from YAML configuration files.
def get_secrets(self) -> dict
Returns:
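- dict: Dictionary with the keys imagePullSecrets and additionalImagePullSecrets, each holding the content loaded from its YAML file
Files read:
1. /assets/pod_imagePullSecrets.yaml (returned under imagePullSecrets)
2. /assets/pod_additionalImagePullSecrets.yaml (returned under additionalImagePullSecrets)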
Example:
secrets = handler.get_secrets()
image_pull_secrets = secrets["imagePullSecrets"]
additional_image_pull_secrets = secrets["additionalImagePullSecrets"]
handle_outputs()
Register tool logs as link entries (url, title, rel) in conf["service_logs"].
def handle_outputs(self, log, output, usage_report, tool_logs)
Parameters:
log: Execution log
output: Workflow outputs
usage_report: Resource usage
tool_logs: Tool execution logs
Example:
handler.handle_outputs(log, output, usage_report, tool_logs)
local_get_file()
Static method to load YAML file.
@staticmethod
def local_get_file(fileName: str) -> dict
Parameters:
fileName (str): Path to YAML file
Returns:
- dict: Parsed YAML content, or an empty dictionary if the file is missing or is not valid YAML
Example:
config = CommonExecutionHandler.local_get_file("/path/to/config.yaml")
Attributes
conf
ZOO configuration dictionary.
Structure:
{
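"lenv": {
"message": "", # Error/status messages
"Identifier": "service-id",
"usid": "..." # Execution identifier; used as sub_path and in tool log URLs
},
"main": {
"tmpPath": "/tmp",
"tmpUrl": "..." # Base URL used to build tool log links
},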
"inputs": {...}, # Service inputs
"eoepca": { # EOEPCA configuration
"S3": {
"url": "https://s3.example.com"
}
}
}
outputs
ZOO outputs dictionary.
handler.outputs # dict or None
Structure:
{
"result": {
"value": "...",
"mimeType": "application/json"
}
}
Usage Pattern
Typical usage pattern:
class MyHandler(CommonExecutionHandler):
def pre_execution_hook(self):
# Setup
super().pre_execution_hook()
self.setup_environment()
def post_execution_hook(self, log, output, usage_report, tool_logs):
# Process results
super().post_execution_hook(log, output, usage_report, tool_logs)
self.process_results(output)
def get_pod_env_vars(self):
env = super().get_pod_env_vars()
env["CUSTOM"] = "value"
return env
# Use in ZOO service
def my_service(conf, inputs, outputs):
handler = MyHandler(conf, outputs)
handler.pre_execution_hook()
# Execute workflow...
handler.post_execution_hook(None, outputs, None, None)
return 3 # SUCCESS
See Also