ZooConf API Reference
The zoo_conf module provides utilities for working with CWL workflows and ZOO-Project configuration.
Classes
ZooConf
Main configuration handler for ZOO-Project services.
ZooConf
Source code in zoo_runner_common/zoo_conf.py
class ZooConf:
    """Thin wrapper around the ZOO-Project configuration mapping.

    The mapping is kept as ``conf`` and the value found at
    ``conf["lenv"]["Identifier"]`` is exposed as ``workflow_id``.
    """

    def __init__(self, conf):
        self.conf = conf
        lenv_section = self.conf["lenv"]
        self.workflow_id = lenv_section["Identifier"]
|
ZooInputs
Handler for ZOO-Project service inputs with type conversion and validation.
Source code in zoo_runner_common/zoo_conf.py
class ZooInputs:
    """Wrapper around the ZOO-Project ``inputs`` mapping.

    On construction, every input declaring ``maxOccurs`` greater than 1 whose
    ``value`` is not already a list is wrapped in a one-element list.  The
    mapping passed in is modified in place and kept as ``self.inputs``.
    """

    def __init__(self, inputs):
        # this conversion is necessary
        # because zoo converts array of length 1 to a string
        for spec in inputs.values():
            if (
                "maxOccurs" in spec
                and int(spec["maxOccurs"]) > 1
                and not isinstance(spec["value"], list)
            ):
                spec["value"] = [spec["value"]]
        self.inputs = inputs

    def get_input_value(self, key):
        """Return the ``value`` entry of the input named ``key``.

        Raises:
            KeyError: if ``key`` is not an input, or the input has no ``value``.

        Returns ``None`` when the lookup raises ``TypeError`` (for example when
        the stored input is not subscriptable by string).
        """
        try:
            return self.inputs[key]["value"]
        except TypeError:
            # Deliberate best effort kept from the original implementation.
            return None

    def get_processing_parameters(self, workflow=None):
        """Return a dictionary of input keys mapped to converted values.

        Each input is handled by the first rule that matches:

        * it has a ``format`` and its ``dataType`` is not one of
          integer/float/boolean/double: ``{"format": ..., "value": ...}``;
        * it has a ``dataType``: the value (or each item, when ``dataType`` is
          a list) is cast to ``float``/``int``/``bool``; a scalar ``"NULL"``
          becomes ``None``; other data types pass through unchanged;
        * it has ``cache_file``: ``{"format": <mimeType or "text/plain">,
          "value": ...}``, or a list of such dicts when ``isArray`` is
          ``"true"``;
        * it has both ``lowerCorner`` and ``upperCorner``: an ``ogc-bbox``
          dict with the JSON-decoded value and the shortened CRS;
        * otherwise the raw ``value``.

        Args:
            workflow: Optional CWL workflow object (currently unused, for future compatibility)

        Returns:
            dict: converted parameters keyed by input name.
        """
        import json

        # Kept as a tuple: ``dataType`` may be a list, which is unhashable.
        allowed_types = ("integer", "float", "boolean", "double")

        def to_bool(raw):
            # bool("false") is True, so textual booleans need explicit parsing.
            if isinstance(raw, str):
                return raw.strip().lower() not in ("", "false", "0")
            return bool(raw)

        def caster_for(type_name):
            if type_name in ("double", "float"):
                return float
            if type_name == "integer":
                return int
            if type_name == "boolean":
                return to_bool
            return None

        res = {}
        for key, value in self.inputs.items():
            has_data_type = "dataType" in value
            is_literal = has_data_type and value["dataType"] in allowed_types

            if "format" in value and not is_literal:
                res[key] = {
                    "format": value["format"],
                    "value": value["value"],
                }
            elif has_data_type:
                data_type = value["dataType"]
                if isinstance(data_type, list):
                    cast = caster_for(data_type[0])
                    if cast is None:
                        res[key] = value["value"]
                    else:
                        res[key] = [cast(item) for item in value["value"]]
                elif value["value"] == "NULL":
                    res[key] = None
                else:
                    cast = caster_for(data_type)
                    res[key] = value["value"] if cast is None else cast(value["value"])
            elif "cache_file" in value:
                if "isArray" in value and value["isArray"] == "true":
                    has_mime = "mimeType" in value
                    # Index into mimeType so a too-short list still raises.
                    res[key] = [
                        {
                            "format": value["mimeType"][i] if has_mime else "text/plain",
                            "value": item,
                        }
                        for i, item in enumerate(value["value"])
                    ]
                else:
                    res[key] = {
                        "format": value.get("mimeType", "text/plain"),
                        "value": value["value"],
                    }
            elif "lowerCorner" in value and "upperCorner" in value:
                res[key] = {
                    "format": "ogc-bbox",
                    "bbox": json.loads(value["value"]),
                    "crs": value["crs"].replace("http://www.opengis.net/def/crs/OGC/1.3/", ""),
                }
            else:
                res[key] = value["value"]
        return res
|
Returns a dictionary mapping each input parameter key to its converted value
Parameters:
| Name |
Type |
Description |
Default |
workflow
|
|
Optional CWL workflow object (currently unused, for future compatibility)
|
None
|
Source code in zoo_runner_common/zoo_conf.py
def get_processing_parameters(self, workflow=None):
    """Return a dictionary of input keys mapped to converted values.

    Each input is handled by the first rule that matches:

    * it has a ``format`` and its ``dataType`` is not one of
      integer/float/boolean/double: ``{"format": ..., "value": ...}``;
    * it has a ``dataType``: the value (or each item, when ``dataType`` is
      a list) is cast to ``float``/``int``/``bool``; a scalar ``"NULL"``
      becomes ``None``; other data types pass through unchanged;
    * it has ``cache_file``: ``{"format": <mimeType or "text/plain">,
      "value": ...}``, or a list of such dicts when ``isArray`` is ``"true"``;
    * it has both ``lowerCorner`` and ``upperCorner``: an ``ogc-bbox`` dict
      with the JSON-decoded value and the shortened CRS;
    * otherwise the raw ``value``.

    Args:
        workflow: Optional CWL workflow object (currently unused, for future compatibility)

    Returns:
        dict: converted parameters keyed by input name.
    """
    import json

    # Kept as a tuple: ``dataType`` may be a list, which is unhashable.
    allowed_types = ("integer", "float", "boolean", "double")

    def to_bool(raw):
        # bool("false") is True, so textual booleans need explicit parsing.
        if isinstance(raw, str):
            return raw.strip().lower() not in ("", "false", "0")
        return bool(raw)

    def caster_for(type_name):
        if type_name in ("double", "float"):
            return float
        if type_name == "integer":
            return int
        if type_name == "boolean":
            return to_bool
        return None

    res = {}
    for key, value in self.inputs.items():
        has_data_type = "dataType" in value
        is_literal = has_data_type and value["dataType"] in allowed_types

        if "format" in value and not is_literal:
            res[key] = {
                "format": value["format"],
                "value": value["value"],
            }
        elif has_data_type:
            data_type = value["dataType"]
            if isinstance(data_type, list):
                cast = caster_for(data_type[0])
                if cast is None:
                    res[key] = value["value"]
                else:
                    res[key] = [cast(item) for item in value["value"]]
            elif value["value"] == "NULL":
                res[key] = None
            else:
                cast = caster_for(data_type)
                res[key] = value["value"] if cast is None else cast(value["value"])
        elif "cache_file" in value:
            if "isArray" in value and value["isArray"] == "true":
                has_mime = "mimeType" in value
                # Index into mimeType so a too-short list still raises.
                res[key] = [
                    {
                        "format": value["mimeType"][i] if has_mime else "text/plain",
                        "value": item,
                    }
                    for i, item in enumerate(value["value"])
                ]
            else:
                res[key] = {
                    "format": value.get("mimeType", "text/plain"),
                    "value": value["value"],
                }
        elif "lowerCorner" in value and "upperCorner" in value:
            res[key] = {
                "format": "ogc-bbox",
                "bbox": json.loads(value["value"]),
                "crs": value["crs"].replace("http://www.opengis.net/def/crs/OGC/1.3/", ""),
            }
        else:
            res[key] = value["value"]
    return res
|
Methods
get_input_value(key)
Get the value of a specific input parameter.
Parameters:
key (str): The input parameter key
Returns:
The "value" entry of the input (None if the lookup raises TypeError)
Raises:
KeyError: If the input key doesn't exist
Example:
zoo_inputs = ZooInputs(inputs)
url = zoo_inputs.get_input_value("data_url")
get_processing_parameters()
Returns a dictionary of all input parameters with proper type conversion.
Handles:
- Numeric types (int, float, double)
- Booleans
- Files with MIME types
- Arrays
Returns:
dict: Dictionary of input parameters ready for CWL execution
Example:
zoo_inputs = ZooInputs(inputs)
params = zoo_inputs.get_processing_parameters()
# {'threshold': 0.5, 'input_file': {'format': 'image/tiff', 'value': '/tmp/data.tif'}}
ZooOutputs
Handler for ZOO-Project service outputs.
ZooOutputs
Source code in zoo_runner_common/zoo_conf.py
class ZooOutputs:
    """Wrapper around the ZOO-Project ``outputs`` mapping.

    The first key of ``outputs`` is used as the primary output key.  When
    ``outputs`` is empty, the primary key is ``"stac"`` and an empty entry is
    created for it in the mapping (which is modified in place).
    """

    def __init__(self, outputs):
        self.outputs = outputs
        # deduce the output key
        output_keys = list(self.outputs.keys())
        if output_keys:
            self.output_key = output_keys[0]
        else:
            self.output_key = "stac"
            if "stac" not in self.outputs:
                self.outputs["stac"] = {}

    def get_output_parameters(self):
        """Return a dictionary mapping each output key to its ``value``.

        Outputs that have no ``value`` yet (such as the ``"stac"`` entry
        created by ``__init__`` before ``set_output`` is called) map to
        ``None`` instead of raising ``KeyError``.
        """
        return {key: value.get("value") for key, value in self.outputs.items()}

    def set_output(self, value):
        """Set the result value of the primary output."""
        self.outputs[self.output_key]["value"] = value
|
get_output_parameters()
Returns a dictionary mapping each output parameter key to its value
Source code in zoo_runner_common/zoo_conf.py
def get_output_parameters(self):
    """Return a dictionary mapping each output key to its ``value``.

    Outputs that have no ``value`` entry yet map to ``None`` instead of
    raising ``KeyError``.
    """
    return {key: value.get("value") for key, value in self.outputs.items()}
|
set_output(value)
set the output result value
Source code in zoo_runner_common/zoo_conf.py
def set_output(self, value):
    """Store ``value`` as the result of the primary output."""
    primary_entry = self.outputs[self.output_key]
    primary_entry["value"] = value
|
Methods
get_output_parameters()
Returns a dictionary of all output parameters.
Returns:
dict: Dictionary mapping output keys to their values
Example:
zoo_outputs = ZooOutputs(outputs)
output_params = zoo_outputs.get_output_parameters()
set_output(value)
Set the output result value for the primary output.
Parameters:
value: The output value to set
Example:
zoo_outputs = ZooOutputs(outputs)
zoo_outputs.set_output("/tmp/results/catalog.json")
CWL Workflow Classes
CWLWorkflow
Parser and utility class for CWL workflows.
CWLWorkflow
Source code in zoo_runner_common/zoo_conf.py
class CWLWorkflow:
    """Parsed CWL document with helpers to inspect one workflow in it.

    The raw document is kept as ``raw_cwl``.  The elements returned by
    ``cwl_loader.load_cwl_from_yaml`` are kept as the list ``cwl``, and
    ``workflow_id`` names the element treated as the main workflow.
    """

    # Resource fields collected by eval_resource().
    _RESOURCE_FIELDS = (
        "coresMin",
        "coresMax",
        "ramMin",
        "ramMax",
        "tmpdirMin",
        "tmpdirMax",
        "outdirMin",
        "outdirMax",
    )

    def __init__(self, cwl, workflow_id):
        self.raw_cwl = cwl
        self.workflow_id = workflow_id
        # Load the entire CWL document and convert to v1.2
        # Use load_cwl_from_yaml instead of load_document_by_yaml for proper version conversion
        from cwl_loader import load_cwl_from_yaml

        parsed_cwl = load_cwl_from_yaml(cwl, uri="io://", cwl_version='v1.2', sort=True)
        # Ensure self.cwl is always a list containing all CWL elements
        if not isinstance(parsed_cwl, list):
            parsed_cwl = [parsed_cwl]
        self.cwl = parsed_cwl

    def _fragments(self):
        """Return, for each element of ``self.cwl``, the text after the last '#' of its id."""
        return [elem.id.split("#")[-1] for elem in self.cwl]

    def get_version(self):
        """Return the raw document's ``s:softwareVersion`` entry, or ``""`` when absent."""
        return self.raw_cwl.get("s:softwareVersion", "")

    def get_label(self):
        """Return the ``label`` attribute of the main workflow."""
        return self.get_workflow().label

    def get_doc(self):
        """Return the ``doc`` attribute of the main workflow."""
        return self.get_workflow().doc

    def get_workflow(self) -> "cwl_utils.parser.cwl_v1_0.Workflow":
        """Return the element whose id fragment equals ``workflow_id``.

        Raises:
            ValueError: if no element of the document has that id.
        """
        # NOTE(review): the annotation names the v1.0 class although the
        # document is loaded with cwl_version='v1.2' - confirm the real type.
        fragments = self._fragments()
        if self.workflow_id not in fragments:
            raise ValueError(
                f"workflow {self.workflow_id!r} not found in CWL document "
                f"(available ids: {fragments})"
            )
        return self.cwl[fragments.index(self.workflow_id)]

    def get_object_by_id(self, id):
        """Return the element identified by ``id``.

        Leading ``#`` characters and any ``io://`` prefix are removed first.
        If that does not match, the text after the last ``#`` is tried, which
        is how element ids are indexed.

        Raises:
            ValueError: if no element matches.
        """
        fragments = self._fragments()
        # Remove leading '#' if present, and also remove 'io://' prefix if present
        search_id = id.lstrip("#").replace("io://", "")
        if search_id not in fragments:
            # e.g. "io://#tool" normalises to "#tool"; fall back to the fragment
            search_id = id.split("#")[-1]
        if search_id not in fragments:
            raise ValueError(
                f"no CWL element with id {id!r} (available ids: {fragments})"
            )
        return self.cwl[fragments.index(search_id)]

    def get_workflow_inputs(self, mandatory=False):
        """Return the names of the main workflow's inputs.

        Args:
            mandatory: when true, skip inputs that have a default value or
                whose type is a list containing ``"null"`` (optional inputs).

        Returns:
            list: the last ``/``-separated segment of each input id.
        """
        names = []
        for inp in self.get_workflow().inputs:
            if mandatory:
                # Use type_ instead of type (cwl-utils API change)
                inp_type = getattr(inp, 'type_', getattr(inp, 'type', None))
                is_nullable = isinstance(inp_type, list) and "null" in inp_type
                if inp.default is not None or is_nullable:
                    continue
            names.append(inp.id.split("/")[-1])
        return names

    @staticmethod
    def has_scatter_requirement(workflow):
        """Return True if ``workflow.requirements`` holds a ScatterFeatureRequirement.

        A workflow whose ``requirements`` is ``None`` or empty yields False.
        """
        return any(
            isinstance(
                requirement,
                (
                    cwl_utils.parser.cwl_v1_0.ScatterFeatureRequirement,
                    cwl_utils.parser.cwl_v1_1.ScatterFeatureRequirement,
                    cwl_utils.parser.cwl_v1_2.ScatterFeatureRequirement,
                ),
            )
            for requirement in (workflow.requirements or [])
        )

    @staticmethod
    def get_resource_requirement(elem):
        """Gets the ResourceRequirement out of a CommandLineTool or Workflow

        ``elem.requirements`` is searched first, then ``elem.hints``; a source
        is used only when it holds exactly one ResourceRequirement.

        Args:
            elem (CommandLineTool or Workflow): CommandLineTool or Workflow

        Returns:
            cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement,
            or None when neither source yields exactly one.
        """
        # look for requirements
        if elem.requirements is not None:
            from_requirements = [
                requirement
                for requirement in elem.requirements
                if isinstance(
                    requirement,
                    (
                        cwl_utils.parser.cwl_v1_0.ResourceRequirement,
                        cwl_utils.parser.cwl_v1_1.ResourceRequirement,
                        cwl_utils.parser.cwl_v1_2.ResourceRequirement,
                    ),
                )
            ]
            if len(from_requirements) == 1:
                return from_requirements[0]

        # look for hints
        if elem.hints is not None:
            from_hints = []
            for hint in elem.hints:
                # Handle both dict and object types
                if isinstance(hint, dict):
                    if hint.get("class") == "ResourceRequirement":
                        from_hints.append(ResourceRequirement.from_dict(hint))
                elif hasattr(hint, 'class_'):
                    if hint.class_ == "ResourceRequirement":
                        from_hints.append(hint)
            if len(from_hints) == 1:
                return from_hints[0]
        return None

    def _resolve_step_run(self, step):
        """Return the process a workflow step runs."""
        run = step.run
        if not isinstance(run, str):
            # presumably an inline process object - use it directly
            return run
        try:
            return self.get_object_by_id(run)
        except ValueError:
            # Historical behaviour: drop the first character and retry.
            return self.get_object_by_id(run[1:])

    def eval_resource(self):
        """Collect the resource figures declared by workflows and their steps.

        For every Workflow element of the document, the workflow's own
        ResourceRequirement values are recorded, followed by those of each
        step's process.  Values of scattered steps are multiplied by the
        ``SCATTER_MULTIPLIER`` environment variable (default 2).

        Returns:
            dict: one list of collected values per resource field
            (``coresMin`` ... ``outdirMax``); falsy values are skipped.
        """
        resources = {field: [] for field in self._RESOURCE_FIELDS}

        def collect(requirement, multiplier=None):
            for field in self._RESOURCE_FIELDS:
                amount = getattr(requirement, field)
                if amount:
                    resources[field].append(
                        amount if multiplier is None else amount * multiplier
                    )

        for elem in self.cwl:
            if not isinstance(
                elem,
                (
                    cwl_utils.parser.cwl_v1_0.Workflow,
                    cwl_utils.parser.cwl_v1_1.Workflow,
                    cwl_utils.parser.cwl_v1_2.Workflow,
                ),
            ):
                continue
            if resource_requirement := self.get_resource_requirement(elem):
                collect(resource_requirement)
            for step in elem.steps:
                if resource_requirement := self.get_resource_requirement(
                    self._resolve_step_run(step)
                ):
                    multiplier = (
                        int(os.getenv("SCATTER_MULTIPLIER", 2))
                        if step.scatter
                        else 1
                    )
                    collect(resource_requirement, multiplier)
        return resources
|
get_resource_requirement(elem)
staticmethod
Gets the ResourceRequirement out of a CommandLineTool or Workflow
Parameters:
| Name |
Type |
Description |
Default |
elem
|
CommandLineTool or Workflow
|
CommandLineTool or Workflow
|
required
|
Returns:
| Type |
Description |
|
|
cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement
|
Source code in zoo_runner_common/zoo_conf.py
| @staticmethod
def get_resource_requirement(elem):
"""Gets the ResourceRequirement out of a CommandLineTool or Workflow
Args:
elem (CommandLineTool or Workflow): CommandLineTool or Workflow
Returns:
cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement
"""
resource_requirement = []
# look for requirements
if elem.requirements is not None:
resource_requirement = [
requirement
for requirement in elem.requirements
if isinstance(
requirement,
(
cwl_utils.parser.cwl_v1_0.ResourceRequirement,
cwl_utils.parser.cwl_v1_1.ResourceRequirement,
cwl_utils.parser.cwl_v1_2.ResourceRequirement,
),
)
]
if len(resource_requirement) == 1:
return resource_requirement[0]
# look for hints
if elem.hints is not None:
resource_requirement = []
for hint in elem.hints:
# Handle both dict and object types
if isinstance(hint, dict):
if hint.get("class") == "ResourceRequirement":
resource_requirement.append(ResourceRequirement.from_dict(hint))
elif hasattr(hint, 'class_'):
if hint.class_ == "ResourceRequirement":
resource_requirement.append(hint)
if len(resource_requirement) == 1:
return resource_requirement[0]
|
Methods
get_version()
Get the workflow version from CWL metadata.
Returns:
str: Software version from s:softwareVersion field
get_label()
Get the workflow label.
Returns:
The label attribute of the workflow
get_doc()
Get the workflow documentation.
Returns:
str: Workflow documentation string
get_workflow()
Get the parsed CWL workflow object.
Returns:
cwl_utils.parser.cwl_v1_0.Workflow: The parsed workflow object
get_workflow_inputs(mandatory=False)
Get list of workflow input parameter names.
Parameters:
mandatory (bool): If True, only return mandatory inputs (no defaults)
Returns:
list: List of input parameter names
Example:
cwl_workflow = CWLWorkflow(cwl_dict, "main-workflow")
all_inputs = cwl_workflow.get_workflow_inputs()
# ['input_data', 'threshold', 'output_format']
required_inputs = cwl_workflow.get_workflow_inputs(mandatory=True)
# ['input_data']
eval_resource()
Evaluate and aggregate resource requirements from the workflow and all steps.
Considers:
- Workflow-level resource requirements
- Step-level resource requirements
- Scatter operations (multiplied by the SCATTER_MULTIPLIER env var, default: 2)
Returns:
dict: Dictionary with aggregated resource requirements:
coresMin, coresMax: CPU cores
ramMin, ramMax: RAM in MB
tmpdirMin, tmpdirMax: Temporary directory space in MB
outdirMin, outdirMax: Output directory space in MB
Example:
cwl_workflow = CWLWorkflow(cwl_dict, "main-workflow")
resources = cwl_workflow.eval_resource()
# {
# 'coresMin': [2, 4],
# 'ramMin': [2048, 4096],
# ...
# }
get_resource_requirement(elem)
Static method to extract ResourceRequirement from a CWL element.
Parameters:
elem: CWL CommandLineTool or Workflow object
Returns:
- ResourceRequirement object or None
ResourceRequirement
Data class for CWL resource requirements (used for hints).
ResourceRequirement
Source code in zoo_runner_common/zoo_conf.py
@attr.s
class ResourceRequirement:
    """Plain holder for the eight CWL resource fields; each defaults to None."""

    coresMin = attr.ib(default=None)
    coresMax = attr.ib(default=None)
    ramMin = attr.ib(default=None)
    ramMax = attr.ib(default=None)
    tmpdirMin = attr.ib(default=None)
    tmpdirMax = attr.ib(default=None)
    outdirMin = attr.ib(default=None)
    outdirMax = attr.ib(default=None)

    @classmethod
    def from_dict(cls, env):
        """Build an instance from ``env``, ignoring keys the constructor does not accept."""
        accepted = inspect.signature(cls).parameters
        known = {}
        for name, setting in env.items():
            if name in accepted:
                known[name] = setting
        return cls(**known)
|
Attributes:
coresMin (int): Minimum CPU cores
coresMax (int): Maximum CPU cores
ramMin (int): Minimum RAM in MB
ramMax (int): Maximum RAM in MB
tmpdirMin (int): Minimum temporary directory space in MB
tmpdirMax (int): Maximum temporary directory space in MB
outdirMin (int): Minimum output directory space in MB
outdirMax (int): Maximum output directory space in MB
Usage Examples
Basic Configuration Handling
from zoo_conf import ZooConf, ZooInputs, ZooOutputs
def my_service(conf, inputs, outputs):
    """Example ZOO service: wrap the ZOO structures and publish a result."""
    SERVICE_SUCCEEDED = 3

    # Configuration: the workflow identifier comes from conf["lenv"]
    service_conf = ZooConf(conf)
    workflow_id = service_conf.workflow_id

    # Inputs: convert the ZOO values into processing parameters
    service_inputs = ZooInputs(inputs)
    params = service_inputs.get_processing_parameters()

    # Outputs: store the result on the primary output
    service_outputs = ZooOutputs(outputs)
    service_outputs.set_output("/path/to/result")

    return SERVICE_SUCCEEDED
CWL Workflow Analysis
from zoo_conf import CWLWorkflow
# Load CWL workflow
cwl_dict = {
"cwlVersion": "v1.0",
"$graph": [...],
"s:softwareVersion": "1.0.0"
}
workflow = CWLWorkflow(cwl_dict, "main-workflow")
# Get metadata
print(f"Version: {workflow.get_version()}")
print(f"Label: {workflow.get_label()}")
# Get inputs
mandatory_inputs = workflow.get_workflow_inputs(mandatory=True)
print(f"Required inputs: {mandatory_inputs}")
# Evaluate resources
resources = workflow.eval_resource()
total_cores = sum(resources['coresMin'])
total_ram = sum(resources['ramMin'])
print(f"Requires {total_cores} cores and {total_ram} MB RAM")
Type Conversion Example
# ZOO inputs with various types
inputs = {
    "threshold": {
        "dataType": "float",
        "value": "0.75"
    },
    "iterations": {
        "dataType": "integer",
        "value": "10"
    },
    "input_file": {
        # "value" is required: get_processing_parameters() reads it for
        # every input, including cached files
        "cache_file": "/tmp/data.tif",
        "mimeType": "image/tiff",
        "value": "/tmp/data.tif"
    },
    "bands": {
        "dataType": "string",
        "maxOccurs": "3",
        "value": ["B02", "B03", "B04"]
    }
}
zoo_inputs = ZooInputs(inputs)
params = zoo_inputs.get_processing_parameters()
# Automatic type conversion:
# {
#     'threshold': 0.75,               # float
#     'iterations': 10,                # int
#     'input_file': {                  # file reference
#         'format': 'image/tiff',
#         'value': '/tmp/data.tif'
#     },
#     'bands': ['B02', 'B03', 'B04']   # array (string items are passed through)
# }
See Also