from rdflib import Graph, Namespace, URIRef, RDF, OWL
from typing import Dict, Set, List, Tuple, Union
from collections import defaultdict
from FAIRLinked.QBWorkflow.utility import NAMESPACE_MAP, LIGHT_COLORS, CATEGORY_COLORS, ONTO_CORE_CATEGORIES
###############################################################################
# Global Namespaces
###############################################################################
# Namespace objects for the vocabularies used throughout this module, built
# from the URIs that NAMESPACE_MAP registers under the 'mds', 'rdfs' and
# 'skos' prefixes.
MDS, RDFS, SKOS = (
    Namespace(NAMESPACE_MAP[prefix]) for prefix in ('mds', 'rdfs', 'skos')
)
###############################################################################
# Helper Functions
###############################################################################
def update_category_colors(categories: Set[str]) -> None:
    """
    Rebuild the shared category-to-color mapping for the given categories.

    Each category URI is converted to its prefixed form with
    get_prefixed_name (e.g. 'http://...#tool' -> 'mds:tool') and paired with
    a color taken from the values of LIGHT_COLORS. When there are more
    categories than colors, the palette is reused from the start.

    The prefixed names are processed in sorted order so that the same set of
    categories always receives the same colors. Iterating the raw set would
    tie the assignment to set iteration order, which for strings can differ
    from one interpreter run to the next.

    Args:
        categories (Set[str]): Category URIs to assign colors to
            (e.g. {'http://...#tool', 'http://...#recipe'}).

    Global Effects:
        - CATEGORY_COLORS is emptied and refilled with one entry per
          prefixed category (e.g. {'mds:recipe': ..., 'mds:tool': ...}).
        - ONTO_CORE_CATEGORIES is emptied and refilled with the prefixed
          category names.
        Both objects are mutated in place, so no rebinding (and therefore no
        `global` declaration) is needed.

    Raises:
        ZeroDivisionError: If LIGHT_COLORS is empty while at least one
            category is supplied.
    """
    palette = list(LIGHT_COLORS.values())

    # The set comprehension removes duplicates (several URIs may map to the
    # same prefixed name); sorting makes the color assignment reproducible.
    prefixed_categories = sorted(
        {get_prefixed_name(category) for category in categories}
    )

    CATEGORY_COLORS.clear()
    for index, category in enumerate(prefixed_categories):
        # Wrap around the palette once every color has been handed out.
        CATEGORY_COLORS[category] = palette[index % len(palette)]

    ONTO_CORE_CATEGORIES.clear()
    ONTO_CORE_CATEGORIES.update(prefixed_categories)
def get_prefixed_name(uri: Union[str, URIRef]) -> str:
    """
    Convert a full URI to its prefixed form using NAMESPACE_MAP.

    Algorithm:
        1. Convert the URI to a plain string once.
        2. Among all NAMESPACE_MAP entries whose namespace is a leading
           substring of the URI, pick the one with the longest namespace.
           This keeps the result independent of dictionary order when one
           registered namespace is itself a prefix of another. If several
           prefixes share the same namespace, the first one in the map wins.
        3. Return 'prefix:LocalName' for the chosen entry, or the unchanged
           URI string when no namespace matches.

    An empty namespace string is never selected: it would otherwise "match"
    every URI.

    Args:
        uri (Union[str, URIRef]): The URI to convert.

    Returns:
        str: Prefixed form of the URI (e.g. 'mds:SampleSize'), or the
        original URI string if no registered namespace applies.
    """
    uri_str = str(uri)
    best_prefix = None
    best_ns = ""
    for prefix, ns in NAMESPACE_MAP.items():
        ns_str = str(ns)
        # Strict '>' keeps the first entry on ties and rejects empty
        # namespaces, because best_ns starts out as the empty string.
        if len(ns_str) > len(best_ns) and uri_str.startswith(ns_str):
            best_prefix, best_ns = prefix, ns_str

    if best_prefix is None:
        return uri_str
    return f"{best_prefix}:{uri_str[len(best_ns):]}"
###############################################################################
# Core Ontology Analysis Functions
###############################################################################
def find_leaf_nodes(lowest_level_ontology_path: str) -> Set[URIRef]:
    """
    Return the leaf classes of the lowest-level ontology.

    A leaf is a term in the MDS namespace that occurs as the subject of at
    least one rdfs:subClassOf triple and never occurs as the object of one,
    i.e. it has a superclass but no subclasses of its own.

    Algorithm:
        1. Parse the ontology file (Turtle) into an RDF graph.
        2. Collect the MDS terms used as objects of rdfs:subClassOf
           (the superclasses).
        3. Collect the MDS terms used as subjects of rdfs:subClassOf.
        4. The leaves are the subjects that are not also superclasses.

    Args:
        lowest_level_ontology_path (str): Path to the low-level ontology
            (.ttl file).

    Returns:
        Set[URIRef]: The URIs of the leaf classes.
    """
    ontology = Graph()
    ontology.parse(lowest_level_ontology_path, format='ttl')

    mds_base = str(MDS)

    def in_mds(term) -> bool:
        # Only terms whose URI lives under the MDS namespace are considered.
        return str(term).startswith(mds_base)

    parents = {
        parent
        for parent in ontology.objects(None, RDFS.subClassOf)
        if in_mds(parent)
    }
    children = {
        child
        for child in ontology.subjects(RDFS.subClassOf, None)
        if in_mds(child)
    }

    # Anything that is somebody's superclass cannot be a leaf.
    return children - parents
def get_top_level_terms_from_combined(combined_ontology_path: str) -> Set[str]:
    """
    Derive the top-level categories directly from the combined ontology, so
    that no separate top-level ontology file is required.

    A top-level category is an MDS term that appears as the broader side of
    a skos:broader triple but never as the narrower side. Only triples whose
    subject and object are both in the MDS namespace are taken into account.

    If no such term exists, the function falls back to the MDS terms that
    are declared as owl:Class and never appear as a narrower term.

    Algorithm:
        1. Parse the combined ontology (Turtle).
        2. Walk every (narrower, skos:broader, broader) triple once and
           record both sides when both are MDS terms.
        3. Return the broader terms that were never recorded as narrower.
        4. If that set is empty, return the declared MDS owl:Class terms
           that were never recorded as narrower.

    Args:
        combined_ontology_path (str): Path to the combined MDS ontology
            (.ttl file).

    Returns:
        Set[str]: URIs (as plain strings) of the top-level category classes.
    """
    graph = Graph()
    graph.parse(combined_ontology_path, format='ttl')

    mds_base = str(MDS)
    narrower_classes = set()
    broader_classes = set()

    # One pass over the skos:broader triples. Iterating subjects first and
    # then re-querying their objects would revisit the same pairs once per
    # triple of a subject that has several broader terms.
    for narrower, broader in graph.subject_objects(SKOS.broader):
        narrower_str = str(narrower)
        broader_str = str(broader)
        if narrower_str.startswith(mds_base) and broader_str.startswith(mds_base):
            narrower_classes.add(narrower_str)
            broader_classes.add(broader_str)

    # Top-level: appears as broader but never as narrower.
    top_level_categories = broader_classes - narrower_classes
    if top_level_categories:
        return top_level_categories

    # Fallback: declared MDS classes that never appear as a narrower term.
    declared_mds_classes = {
        str(cls)
        for cls in graph.subjects(RDF.type, OWL.Class)
        if str(cls).startswith(mds_base)
    }
    return declared_mds_classes - narrower_classes
def classify_leaf_nodes(combined_ontology_path: str,
                        leaf_nodes: Set[URIRef],
                        top_level_terms: Set[str]) -> Tuple[Dict[str, List[str]], List[str]]:
    """
    Classify each leaf node under a top-level category by walking upward
    along `rdfs:subClassOf` and `skos:broader` until a known top-level
    category is reached.

    Algorithm:
        1. Parse the combined ontology (Turtle) into a graph.
        2. For each leaf, search depth-first through its `rdfs:subClassOf`
           parents first and its `skos:broader` parents second.
        3. The first top-level category reached becomes the leaf's category.
        4. A leaf from which no top-level category can be reached is
           reported as missing.

    Caching:
        Results are shared between leaves so common ancestors are not
        re-walked for every leaf.
        - A term that reached a top-level category is cached immediately.
        - A negative result is cached only when an entire traversal from a
          leaf fails: at that point everything reachable from the leaf has
          been explored, so every visited term is known to be a dead end.
          A `None` returned in the middle of a traversal may merely mean
          "ran into a term already being explored" (a cycle) and is
          therefore not cached.

    Args:
        combined_ontology_path (str): Path to the combined ontology
            (.ttl file).
        leaf_nodes (Set[URIRef]): Leaf node URIs identified from the
            low-level ontology.
        top_level_terms (Set[str]): URIs (as strings) of the top-level
            categories.

    Returns:
        Tuple[Dict[str, List[str]], List[str]]:
            - Dictionary mapping top-level category URIs to the list of
              leaf node URIs classified under them.
            - List of URIs of leaf nodes that could not be mapped.
    """
    graph = Graph()
    graph.parse(combined_ontology_path, format='ttl')

    classification = {}
    missing_top_terms = []
    # term -> top-level term reachable from it, or None once the term is
    # proven to have no top-level ancestor.
    resolved = {}

    def trace_to_top(term_uri, visited):
        # Reuse an answer established by an earlier traversal.
        if term_uri in resolved:
            return resolved[term_uri]

        # A top-level term classifies as itself.
        if str(term_uri) in top_level_terms:
            resolved[term_uri] = term_uri
            return term_uri

        # Already explored (or currently being explored) in this traversal:
        # stop here to avoid looping on cycles.
        if term_uri in visited:
            return None
        visited.add(term_uri)

        # rdfs:subClassOf parents are tried before skos:broader parents.
        for predicate in (RDFS.subClassOf, SKOS.broader):
            for parent in graph.objects(term_uri, predicate):
                top_term = trace_to_top(parent, visited)
                if top_term:
                    resolved[term_uri] = top_term
                    return top_term

        # Not cached here: this may be a cycle cut-off, not a real dead end.
        return None

    for leaf in leaf_nodes:
        visited = set()
        top_term = trace_to_top(leaf, visited)
        if top_term:
            classification.setdefault(str(top_term), []).append(str(leaf))
        else:
            # The whole reachable set was searched without success, so every
            # term visited on the way is a confirmed dead end.
            resolved.update(dict.fromkeys(visited))
            missing_top_terms.append(str(leaf))

    return classification, missing_top_terms
def _strip_namespace_prefix(uri: str) -> str:
    """Return the local name of `uri`, or `uri` itself if no namespace matched."""
    uri_str = str(uri)
    prefixed = get_prefixed_name(uri_str)
    if prefixed == uri_str:
        # get_prefixed_name found no namespace; splitting on ':' here would
        # cut the URI right after its scheme ('http://x' -> '//x').
        return uri_str
    # Split once only, so a ':' inside the local name is preserved.
    return prefixed.split(':', 1)[1]


def get_classification(lowest_level_ontology_path: str,
                       combined_ontology_path: str) -> Tuple[Dict[str, List[str]], List[str]]:
    """
    High-level function that coordinates:
        1. Finding leaf nodes in the low-level ontology.
        2. Identifying top-level categories directly from the combined
           ontology.
        3. Classifying the leaf nodes under these top-level categories.
        4. Updating the category colors and converting URIs to prefixed /
           local forms for the result.

    Args:
        lowest_level_ontology_path (str): Path to the lowest-level MDS
            ontology (.ttl file).
        combined_ontology_path (str): Path to the combined MDS ontology
            (.ttl file).

    Returns:
        Tuple[Dict[str, List[str]], List[str]]:
            - classification_prefixed: Dictionary keyed by prefixed category
              name (e.g. 'mds:tool'). Each value is the list of leaf terms
              in that category with the namespace prefix stripped (local
              names only). A leaf whose URI matches no registered namespace
              is kept as its full URI.
            - missing_top_terms_prefixed: Prefixed names of the leaf terms
              that could not be mapped to any category.

    Side Effects:
        Calls update_category_colors with the discovered category URIs and
        prints a warning for any category that ends up without a color.
    """
    # Step 1: Identify leaf nodes
    leaf_nodes = find_leaf_nodes(lowest_level_ontology_path)

    # Step 2: Identify top-level categories from the combined ontology
    top_level_terms = get_top_level_terms_from_combined(combined_ontology_path)

    # Step 3: Classify leaf nodes
    classification, missing_top_terms = classify_leaf_nodes(
        combined_ontology_path, leaf_nodes, top_level_terms
    )

    # Step 4: Update category colors based on the discovered categories
    update_category_colors(set(classification))

    # Convert category URIs to prefixed names and leaf URIs to local names
    classification_prefixed = {}
    for category_uri, leaf_uris in classification.items():
        category_prefixed = get_prefixed_name(category_uri)
        classification_prefixed[category_prefixed] = [
            _strip_namespace_prefix(uri) for uri in leaf_uris
        ]
        # Ensure category color exists
        if category_prefixed not in CATEGORY_COLORS:
            print(f"Warning: No color assigned for category {category_prefixed}")

    missing_top_terms_prefixed = [get_prefixed_name(uri) for uri in missing_top_terms]
    return classification_prefixed, missing_top_terms_prefixed