Skip to content

ZooConf API Reference

The zoo_conf module provides utilities for working with CWL workflows and ZOO-Project configuration.

Classes

ZooConf

Main configuration handler for ZOO-Project services.

ZooConf

Source code in zoo_runner_common/zoo_conf.py
class ZooConf:
    """Wrapper around the ZOO-Project configuration mapping.

    Attributes set by ``__init__``:
        conf: The configuration mapping exactly as it was passed in.
        workflow_id: The value found at ``conf["lenv"]["Identifier"]``.
    """

    def __init__(self, conf):
        """Keep ``conf`` and read the workflow identifier out of it.

        Args:
            conf: Mapping with an ``"lenv"`` section that contains an
                ``"Identifier"`` entry.

        Raises:
            KeyError: If ``"lenv"`` or ``"Identifier"`` is missing.
        """
        self.conf = conf
        lenv_section = self.conf["lenv"]
        self.workflow_id = lenv_section["Identifier"]

ZooInputs

Handler for ZOO-Project service inputs with type conversion and validation.

ZooInputs

Source code in zoo_runner_common/zoo_conf.py
class ZooInputs:
    """Wrapper around the ZOO-Project ``inputs`` mapping.

    Each entry of ``inputs`` is itself a mapping describing one input
    (``value`` plus optional keys such as ``dataType``, ``format``,
    ``maxOccurs``, ``cache_file``, ``mimeType``, ``isArray``).
    """

    def __init__(self, inputs):
        """Store ``inputs`` after normalising collapsed single-item arrays.

        ZOO converts an array of length 1 to a bare string, so every input
        that declares ``maxOccurs`` greater than 1 and whose ``value`` is not
        already a list gets its value wrapped in a one-element list.

        Note: the mapping passed in is modified in place.

        Args:
            inputs: Mapping of input key to input description.
        """
        for definition in inputs.values():
            if (
                "maxOccurs" in definition
                and int(definition["maxOccurs"]) > 1
                and not isinstance(definition["value"], list)
            ):
                definition["value"] = [definition["value"]]

        self.inputs = inputs

    def get_input_value(self, key):
        """Return the ``value`` entry of the input named ``key``.

        Args:
            key: Input key to look up.

        Returns:
            The stored value, or ``None`` when the lookup fails with a
            ``TypeError`` (for example when the inputs container or the
            entry is not subscriptable).

        Raises:
            KeyError: If ``key`` is unknown or the entry has no ``"value"``.
        """
        try:
            return self.inputs[key]["value"]
        except TypeError:
            # Best-effort lookup: a malformed container yields None
            # instead of an exception. KeyError propagates unchanged.
            return None

    def get_processing_parameters(self, workflow=None):
        """Return a dictionary mapping each input key to its converted value.

        Conversion rules, applied per input in this order:

        * ``format`` present and ``dataType`` is not one of the scalar
          numeric/boolean type names: ``{"format": ..., "value": ...}``.
        * ``dataType`` is a list: the values are converted element-wise
          according to its first item (float, int or bool); other types
          are passed through untouched.
        * ``dataType`` is a scalar: ``"NULL"`` becomes ``None``; otherwise
          the value is converted to float, int or bool, or passed through.
        * ``cache_file`` present: ``{"format": <mimeType>, "value": ...}``
          (a list of such dictionaries when ``isArray`` is ``"true"``);
          the format falls back to ``"text/plain"``.
        * ``lowerCorner`` and ``upperCorner`` present: an ``ogc-bbox``
          dictionary whose ``bbox`` is the JSON-decoded value.
        * anything else: the raw value.

        Boolean inputs given as text are parsed, so ``"false"`` and ``"0"``
        (any case) yield ``False`` rather than the truthiness of a
        non-empty string.

        Args:
            workflow: Optional CWL workflow object (currently unused, for
                future compatibility).

        Returns:
            dict: Input key mapped to the converted value.
        """
        import json

        # Kept as a tuple: ``dataType`` may be a list, and a list can be
        # tested for membership in a tuple but not in a set.
        allowed_types = ("integer", "float", "boolean", "double")

        def to_bool(raw):
            # bool("false") is True, so textual booleans need real parsing.
            if isinstance(raw, str):
                return raw.strip().lower() not in ("", "false", "0")
            return bool(raw)

        converters = {
            "double": float,
            "float": float,
            "integer": int,
            "boolean": to_bool,
        }

        def converter_for(data_type):
            # Only string type names can select a converter.
            if isinstance(data_type, str):
                return converters.get(data_type)
            return None

        res = {}
        for key, value in self.inputs.items():
            has_type = "dataType" in value

            if "format" in value and not (
                has_type and value["dataType"] in allowed_types
            ):
                res[key] = {
                    "format": value["format"],
                    "value": value["value"],
                }
            elif has_type:
                data_type = value["dataType"]
                if isinstance(data_type, list):
                    # Array input: the first entry names the element type.
                    convert = converter_for(data_type[0])
                    if convert is None:
                        res[key] = value["value"]
                    else:
                        res[key] = [convert(item) for item in value["value"]]
                elif value["value"] == "NULL":
                    res[key] = None
                else:
                    convert = converter_for(data_type)
                    if convert is None:
                        res[key] = value["value"]
                    else:
                        res[key] = convert(value["value"])
            elif "cache_file" in value:
                if value.get("isArray") == "true":
                    res[key] = [
                        {
                            "format": (
                                value["mimeType"][index]
                                if "mimeType" in value
                                else "text/plain"
                            ),
                            "value": item,
                        }
                        for index, item in enumerate(value["value"])
                    ]
                else:
                    res[key] = {
                        "format": value.get("mimeType", "text/plain"),
                        "value": value["value"],
                    }
            elif "lowerCorner" in value and "upperCorner" in value:
                res[key] = {
                    "format": "ogc-bbox",
                    "bbox": json.loads(value["value"]),
                    "crs": value["crs"].replace(
                        "http://www.opengis.net/def/crs/OGC/1.3/", ""
                    ),
                }
            else:
                res[key] = value["value"]
        return res
get_processing_parameters(workflow=None)

Returns a dictionary mapping each input parameter key to its converted value

Parameters:

Name Type Description Default
workflow

Optional CWL workflow object (currently unused, for future compatibility)

None
Source code in zoo_runner_common/zoo_conf.py
def get_processing_parameters(self, workflow=None):
    """Return a dictionary mapping each input key to its converted value.

    Conversion rules, applied per input in this order:

    * ``format`` present and ``dataType`` is not one of the scalar
      numeric/boolean type names: ``{"format": ..., "value": ...}``.
    * ``dataType`` is a list: the values are converted element-wise
      according to its first item (float, int or bool); other types are
      passed through untouched.
    * ``dataType`` is a scalar: ``"NULL"`` becomes ``None``; otherwise the
      value is converted to float, int or bool, or passed through.
    * ``cache_file`` present: ``{"format": <mimeType>, "value": ...}``
      (a list of such dictionaries when ``isArray`` is ``"true"``); the
      format falls back to ``"text/plain"``.
    * ``lowerCorner`` and ``upperCorner`` present: an ``ogc-bbox``
      dictionary whose ``bbox`` is the JSON-decoded value.
    * anything else: the raw value.

    Boolean inputs given as text are parsed, so ``"false"`` and ``"0"``
    (any case) yield ``False`` rather than the truthiness of a non-empty
    string.

    Args:
        workflow: Optional CWL workflow object (currently unused, for future
            compatibility).

    Returns:
        dict: Input key mapped to the converted value.
    """
    import json

    # Kept as a tuple: ``dataType`` may be a list, and a list can be tested
    # for membership in a tuple but not in a set.
    allowed_types = ("integer", "float", "boolean", "double")

    def to_bool(raw):
        # bool("false") is True, so textual booleans need real parsing.
        if isinstance(raw, str):
            return raw.strip().lower() not in ("", "false", "0")
        return bool(raw)

    converters = {
        "double": float,
        "float": float,
        "integer": int,
        "boolean": to_bool,
    }

    def converter_for(data_type):
        # Only string type names can select a converter.
        if isinstance(data_type, str):
            return converters.get(data_type)
        return None

    res = {}
    for key, value in self.inputs.items():
        has_type = "dataType" in value

        if "format" in value and not (
            has_type and value["dataType"] in allowed_types
        ):
            res[key] = {
                "format": value["format"],
                "value": value["value"],
            }
        elif has_type:
            data_type = value["dataType"]
            if isinstance(data_type, list):
                # Array input: the first entry names the element type.
                convert = converter_for(data_type[0])
                if convert is None:
                    res[key] = value["value"]
                else:
                    res[key] = [convert(item) for item in value["value"]]
            elif value["value"] == "NULL":
                res[key] = None
            else:
                convert = converter_for(data_type)
                if convert is None:
                    res[key] = value["value"]
                else:
                    res[key] = convert(value["value"])
        elif "cache_file" in value:
            if value.get("isArray") == "true":
                res[key] = [
                    {
                        "format": (
                            value["mimeType"][index]
                            if "mimeType" in value
                            else "text/plain"
                        ),
                        "value": item,
                    }
                    for index, item in enumerate(value["value"])
                ]
            else:
                res[key] = {
                    "format": value.get("mimeType", "text/plain"),
                    "value": value["value"],
                }
        elif "lowerCorner" in value and "upperCorner" in value:
            res[key] = {
                "format": "ogc-bbox",
                "bbox": json.loads(value["value"]),
                "crs": value["crs"].replace(
                    "http://www.opengis.net/def/crs/OGC/1.3/", ""
                ),
            }
        else:
            res[key] = value["value"]
    return res

Methods

get_input_value(key)

Get the value of a specific input parameter.

Parameters:

  • key (str): The input parameter key

Returns:

  • The input value

Raises:

  • KeyError: If the input key doesn't exist

Example:

zoo_inputs = ZooInputs(inputs)
url = zoo_inputs.get_input_value("data_url")
get_processing_parameters()

Returns a dictionary of all input parameters with proper type conversion.

Handles:

  • Numeric types (int, float, double)
  • Booleans
  • Files with MIME types
  • Arrays

Returns:

  • dict: Dictionary of input parameters ready for CWL execution

Example:

zoo_inputs = ZooInputs(inputs)
params = zoo_inputs.get_processing_parameters()
# {'threshold': 0.5, 'input_file': {'format': 'image/tiff', 'value': '/tmp/data.tif'}}

ZooOutputs

Handler for ZOO-Project service outputs.

ZooOutputs

Source code in zoo_runner_common/zoo_conf.py
class ZooOutputs:
    """Wrapper around the ZOO-Project ``outputs`` mapping.

    Attributes set by ``__init__``:
        outputs: The outputs mapping exactly as it was passed in.
        output_key: Key of the entry that ``set_output`` writes to.
    """

    def __init__(self, outputs):
        """Store ``outputs`` and deduce the key used by ``set_output``.

        The first key of ``outputs`` is used. When the mapping has no keys,
        the key ``"stac"`` is used and an empty ``"stac"`` entry is created
        in the mapping (in place).
        """
        self.outputs = outputs

        declared_keys = list(self.outputs.keys())
        if declared_keys:
            self.output_key = declared_keys[0]
            return

        # No declared output: fall back to a "stac" entry.
        self.output_key = "stac"
        if "stac" not in self.outputs.keys():
            self.outputs["stac"] = {}

    def get_output_parameters(self):
        """Return a dictionary mapping each output key to its ``value``.

        Raises:
            KeyError: If an output entry has no ``"value"`` key.
        """
        parameters = {}
        for name, definition in self.outputs.items():
            parameters[name] = definition["value"]
        return parameters

    def set_output(self, value):
        """Store ``value`` as the ``"value"`` of the ``output_key`` entry."""
        target = self.outputs[self.output_key]
        target["value"] = value
get_output_parameters()

Returns a dictionary mapping each output parameter key to its value

Source code in zoo_runner_common/zoo_conf.py
def get_output_parameters(self):
    """Return a dictionary mapping each output key to its ``value``.

    Raises:
        KeyError: If an output entry has no ``"value"`` key.
    """
    parameters = {}
    for name, definition in self.outputs.items():
        parameters[name] = definition["value"]
    return parameters
set_output(value)

set the output result value

Source code in zoo_runner_common/zoo_conf.py
def set_output(self, value):
    """Store ``value`` as the ``"value"`` of the ``output_key`` entry."""
    target = self.outputs[self.output_key]
    target["value"] = value

Methods

get_output_parameters()

Returns a dictionary of all output parameters.

Returns:

  • dict: Dictionary mapping output keys to their values

Example:

zoo_outputs = ZooOutputs(outputs)
output_params = zoo_outputs.get_output_parameters()
set_output(value)

Set the output result value for the primary output.

Parameters:

  • value: The output value to set

Example:

zoo_outputs = ZooOutputs(outputs)
zoo_outputs.set_output("/tmp/results/catalog.json")

CWL Workflow Classes

CWLWorkflow

Parser and utility class for CWL workflows.

CWLWorkflow

Source code in zoo_runner_common/zoo_conf.py
class CWLWorkflow:
    """Parsed CWL document plus helpers for the workflow named ``workflow_id``.

    Attributes set by ``__init__``:
        raw_cwl: The CWL document exactly as it was passed in.
        workflow_id: Identifier of the workflow the helpers operate on.
        cwl: List of the elements returned by ``load_cwl_from_yaml``.
    """

    def __init__(self, cwl, workflow_id):
        """Load ``cwl`` through ``cwl_loader`` requesting CWL v1.2.

        Args:
            cwl: CWL document to load; it is also kept as ``raw_cwl``.
            workflow_id: Identifier of the workflow inside the document.
        """
        self.raw_cwl = cwl
        self.workflow_id = workflow_id

        # load_cwl_from_yaml is used instead of load_document_by_yaml so
        # that the whole document is converted to the requested version.
        from cwl_loader import load_cwl_from_yaml

        loaded = load_cwl_from_yaml(cwl, uri="io://", cwl_version='v1.2', sort=True)

        # self.cwl is always a list, even when the loader returns one element.
        self.cwl = loaded if isinstance(loaded, list) else [loaded]

    def get_version(self):
        """Return ``s:softwareVersion`` from the raw document, or ``""``."""
        return self.raw_cwl.get("s:softwareVersion", "")

    def get_label(self):
        """Return the ``label`` of the selected workflow."""
        workflow = self.get_workflow()
        return workflow.label

    def get_doc(self):
        """Return the ``doc`` of the selected workflow."""
        workflow = self.get_workflow()
        return workflow.doc

    def get_workflow(self) -> cwl_utils.parser.cwl_v1_0.Workflow:
        """Return the element whose id fragment equals ``workflow_id``.

        Raises:
            ValueError: If no element carries that identifier.
        """
        fragment_ids = [elem.id.split("#")[-1] for elem in self.cwl]
        position = fragment_ids.index(self.workflow_id)
        return self.cwl[position]

    def get_object_by_id(self, id):
        """Return the element whose id fragment matches ``id``.

        Leading ``#`` characters and any ``io://`` occurrence are removed
        from ``id`` before the lookup.

        Raises:
            ValueError: If no element carries that identifier.
        """
        fragment_ids = [elem.id.split("#")[-1] for elem in self.cwl]
        wanted = id.lstrip("#").replace("io://", "")
        position = fragment_ids.index(wanted)
        return self.cwl[position]

    def get_workflow_inputs(self, mandatory=False):
        """Return the names of the workflow inputs.

        Args:
            mandatory: When true, leave out inputs that have a default
                value or whose type is exactly ``["null", "string"]``.

        Returns:
            list: Last ``/``-separated segment of each selected input id.
        """
        names = []
        for inp in self.get_workflow().inputs:
            if mandatory:
                # Newer cwl-utils exposes ``type_``; fall back to ``type``.
                inp_type = getattr(inp, 'type_', getattr(inp, 'type', None))
                if inp.default is not None or inp_type == ["null", "string"]:
                    continue
            names.append(inp.id.split("/")[-1])
        return names

    @staticmethod
    def has_scatter_requirement(workflow):
        """Tell whether ``workflow.requirements`` holds a ScatterFeatureRequirement."""
        scatter_classes = (
            cwl_utils.parser.cwl_v1_0.ScatterFeatureRequirement,
            cwl_utils.parser.cwl_v1_1.ScatterFeatureRequirement,
            cwl_utils.parser.cwl_v1_2.ScatterFeatureRequirement,
        )
        for requirement in workflow.requirements:
            if isinstance(requirement, scatter_classes):
                return True
        return False

    @staticmethod
    def get_resource_requirement(elem):
        """Find the single ResourceRequirement of a CommandLineTool or Workflow.

        ``elem.requirements`` is inspected first, then ``elem.hints``. A
        source is only used when it holds exactly one ResourceRequirement.

        Args:
            elem (CommandLineTool or Workflow): Element to inspect.

        Returns:
            The cwl_utils ResourceRequirement found in the requirements, or
            the hint (a ``ResourceRequirement`` built from a dict hint, or
            the hint object itself), or ``None`` when neither source holds
            exactly one.
        """
        # Requirements take precedence over hints.
        declared = elem.requirements
        if declared is not None:
            resource_classes = (
                cwl_utils.parser.cwl_v1_0.ResourceRequirement,
                cwl_utils.parser.cwl_v1_1.ResourceRequirement,
                cwl_utils.parser.cwl_v1_2.ResourceRequirement,
            )
            found = [
                requirement
                for requirement in declared
                if isinstance(requirement, resource_classes)
            ]
            if len(found) == 1:
                return found[0]

        hinted = elem.hints
        if hinted is not None:
            found = []
            for hint in hinted:
                # Hints may be plain dicts or parsed objects.
                if isinstance(hint, dict):
                    if hint.get("class") == "ResourceRequirement":
                        found.append(ResourceRequirement.from_dict(hint))
                elif hasattr(hint, 'class_'):
                    if hint.class_ == "ResourceRequirement":
                        found.append(hint)
            if len(found) == 1:
                return found[0]

        return None

    def eval_resource(self):
        """Collect the resource figures of every workflow and its steps.

        For each Workflow element in ``self.cwl`` the workflow-level
        requirement is recorded as is. Each step's requirement (looked up
        on the object the step runs) is multiplied by the
        ``SCATTER_MULTIPLIER`` environment variable (default 2) when the
        step scatters, and by 1 otherwise. Falsy figures are skipped.

        Returns:
            dict: Resource name mapped to the list of collected figures.
        """
        resource_types = (
            "coresMin",
            "coresMax",
            "ramMin",
            "ramMax",
            "tmpdirMin",
            "tmpdirMax",
            "outdirMin",
            "outdirMax",
        )
        resources = {resource_type: [] for resource_type in resource_types}
        workflow_classes = (
            cwl_utils.parser.cwl_v1_0.Workflow,
            cwl_utils.parser.cwl_v1_1.Workflow,
            cwl_utils.parser.cwl_v1_2.Workflow,
        )

        for elem in self.cwl:
            if not isinstance(elem, workflow_classes):
                continue

            workflow_requirement = self.get_resource_requirement(elem)
            if workflow_requirement:
                for resource_type in resource_types:
                    amount = getattr(workflow_requirement, resource_type)
                    if amount:
                        resources[resource_type].append(amount)

            for step in elem.steps:
                # step.run[1:] drops the first character of the reference.
                step_target = self.get_object_by_id(step.run[1:])
                step_requirement = self.get_resource_requirement(step_target)
                if not step_requirement:
                    continue

                if step.scatter:
                    multiplier = int(os.getenv("SCATTER_MULTIPLIER", 2))
                else:
                    multiplier = 1

                for resource_type in resource_types:
                    amount = getattr(step_requirement, resource_type)
                    if amount:
                        resources[resource_type].append(amount * multiplier)
        return resources
get_resource_requirement(elem) staticmethod

Gets the ResourceRequirement out of a CommandLineTool or Workflow

Parameters:

Name Type Description Default
elem CommandLineTool or Workflow

CommandLineTool or Workflow

required

Returns:

Type Description

cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement

Source code in zoo_runner_common/zoo_conf.py
@staticmethod
def get_resource_requirement(elem):
    """Gets the ResourceRequirement out of a CommandLineTool or Workflow

    Args:
        elem (CommandLineTool or Workflow): CommandLineTool or Workflow

    Returns:
        cwl_utils.parser.cwl_v1_2.ResourceRequirement or ResourceRequirement
    """
    resource_requirement = []

    # look for requirements
    if elem.requirements is not None:
        resource_requirement = [
            requirement
            for requirement in elem.requirements
            if isinstance(
                requirement,
                (
                    cwl_utils.parser.cwl_v1_0.ResourceRequirement,
                    cwl_utils.parser.cwl_v1_1.ResourceRequirement,
                    cwl_utils.parser.cwl_v1_2.ResourceRequirement,
                ),
            )
        ]

        if len(resource_requirement) == 1:
            return resource_requirement[0]

    # look for hints
    if elem.hints is not None:
        resource_requirement = []
        for hint in elem.hints:
            # Handle both dict and object types
            if isinstance(hint, dict):
                if hint.get("class") == "ResourceRequirement":
                    resource_requirement.append(ResourceRequirement.from_dict(hint))
            elif hasattr(hint, 'class_'):
                if hint.class_ == "ResourceRequirement":
                    resource_requirement.append(hint)

        if len(resource_requirement) == 1:
            return resource_requirement[0]

Methods

get_version()

Get the workflow version from CWL metadata.

Returns:

  • str: Software version from s:softwareVersion field
get_label()

Get the workflow label.

Returns:

  • str: Workflow label
get_doc()

Get the workflow documentation.

Returns:

  • str: Workflow documentation string
get_workflow()

Get the parsed CWL workflow object.

Returns:

  • cwl_utils.parser.cwl_v1_0.Workflow: The parsed workflow object
get_workflow_inputs(mandatory=False)

Get list of workflow input parameter names.

Parameters:

  • mandatory (bool): If True, only return mandatory inputs, i.e. skip inputs that have a default value or whose type is exactly ["null", "string"]

Returns:

  • list: List of input parameter names

Example:

cwl_workflow = CWLWorkflow(cwl_dict, "main-workflow")
all_inputs = cwl_workflow.get_workflow_inputs()
# ['input_data', 'threshold', 'output_format']

required_inputs = cwl_workflow.get_workflow_inputs(mandatory=True)
# ['input_data']
eval_resource()

Evaluate and aggregate resource requirements from the workflow and all steps.

Considers:

  • Workflow-level resource requirements
  • Step-level resource requirements
  • Scatter operations (multiplied by SCATTER_MULTIPLIER env var, default: 2)

Returns:

  • dict: Dictionary with aggregated resource requirements:
    • coresMin, coresMax: CPU cores
    • ramMin, ramMax: RAM in MB
    • tmpdirMin, tmpdirMax: Temporary directory space in MB
    • outdirMin, outdirMax: Output directory space in MB

Example:

cwl_workflow = CWLWorkflow(cwl_dict, "main-workflow")
resources = cwl_workflow.eval_resource()
# {
#     'coresMin': [2, 4],
#     'ramMin': [2048, 4096],
#     ...
# }
get_resource_requirement(elem)

Static method to extract ResourceRequirement from a CWL element.

Parameters:

  • elem: CWL CommandLineTool or Workflow object

Returns:

  • ResourceRequirement object or None

ResourceRequirement

Data class for CWL resource requirements (used for hints).

ResourceRequirement

Source code in zoo_runner_common/zoo_conf.py
@attr.s
class ResourceRequirement:
    """Container for the eight resource bounds; each one defaults to ``None``."""

    # CPU core bounds
    coresMin = attr.ib(default=None)
    coresMax = attr.ib(default=None)
    # RAM bounds
    ramMin = attr.ib(default=None)
    ramMax = attr.ib(default=None)
    # Temporary directory bounds
    tmpdirMin = attr.ib(default=None)
    tmpdirMax = attr.ib(default=None)
    # Output directory bounds
    outdirMin = attr.ib(default=None)
    outdirMax = attr.ib(default=None)

    @classmethod
    def from_dict(cls, env):
        """Build an instance from ``env``, ignoring keys the class does not accept.

        Args:
            env: Mapping such as a CWL hint; entries whose key is not a
                constructor parameter (for example ``"class"``) are dropped.

        Returns:
            A new instance populated from the accepted entries.
        """
        accepted = inspect.signature(cls).parameters
        kwargs = {}
        for name, setting in env.items():
            if name in accepted:
                kwargs[name] = setting
        return cls(**kwargs)

Attributes:

  • coresMin (int): Minimum CPU cores
  • coresMax (int): Maximum CPU cores
  • ramMin (int): Minimum RAM in MB
  • ramMax (int): Maximum RAM in MB
  • tmpdirMin (int): Minimum temporary directory space in MB
  • tmpdirMax (int): Maximum temporary directory space in MB
  • outdirMin (int): Minimum output directory space in MB
  • outdirMax (int): Maximum output directory space in MB

Usage Examples

Basic Configuration Handling

from zoo_conf import ZooConf, ZooInputs, ZooOutputs

def my_service(conf, inputs, outputs):
    """Example ZOO service function.

    Wraps the three arguments in ZooConf, ZooInputs and ZooOutputs, reads
    the workflow identifier and the converted input parameters, stores a
    result path on the outputs and returns 3 (SERVICE_SUCCEEDED).
    """
    # Parse configuration: workflow_id comes from the conf mapping
    zoo_conf = ZooConf(conf)
    workflow_id = zoo_conf.workflow_id

    # Handle inputs: params is a dictionary of converted input values
    zoo_inputs = ZooInputs(inputs)
    params = zoo_inputs.get_processing_parameters()

    # Handle outputs: the value is written to the outputs mapping
    zoo_outputs = ZooOutputs(outputs)
    zoo_outputs.set_output("/path/to/result")

    return 3  # SERVICE_SUCCEEDED

CWL Workflow Analysis

from zoo_conf import CWLWorkflow

# Load CWL workflow
cwl_dict = {
    "cwlVersion": "v1.0",
    "$graph": [...],
    "s:softwareVersion": "1.0.0"
}

workflow = CWLWorkflow(cwl_dict, "main-workflow")

# Get metadata
print(f"Version: {workflow.get_version()}")
print(f"Label: {workflow.get_label()}")

# Get inputs
mandatory_inputs = workflow.get_workflow_inputs(mandatory=True)
print(f"Required inputs: {mandatory_inputs}")

# Evaluate resources
resources = workflow.eval_resource()
total_cores = sum(resources['coresMin'])
total_ram = sum(resources['ramMin'])
print(f"Requires {total_cores} cores and {total_ram} MB RAM")

Type Conversion Example

# ZOO inputs with various types
inputs = {
    "threshold": {
        "dataType": "float",
        "value": "0.75"
    },
    "iterations": {
        "dataType": "integer",
        "value": "10"
    },
    "input_file": {
        "cache_file": "/tmp/data.tif",
        "mimeType": "image/tiff",
        # get_processing_parameters() reads "value" for every input;
        # the content shown here is illustrative.
        "value": "/tmp/data.tif"
    },
    "bands": {
        "dataType": "string",
        "maxOccurs": "3",
        "value": ["B02", "B03", "B04"]
    }
}

zoo_inputs = ZooInputs(inputs)
params = zoo_inputs.get_processing_parameters()

# Automatic type conversion:
# {
#     'threshold': 0.75,           # float
#     'iterations': 10,             # int
#     'input_file': {               # cached file: MIME type + value
#         'format': 'image/tiff',
#         'value': '/tmp/data.tif'
#     },
#     'bands': ['B02', 'B03', 'B04']  # array
# }

See Also