import json
import inspect
import pandas as pd
import numpy as np
from functools import wraps
from datetime import datetime
from uuid import uuid4
from typing import Optional, cast
import psutil
import os
from .main import MatDatSciDf
import sys
from rdflib import Graph, Namespace
from ...InterfaceMDS.load_mds_ontology import load_mds_ontology_graph
from .metadata_manager import Metadata
import warnings
import requests
from ... import __version__
from .utility import normalize_iri,load_licenses
from IPython.core.getipython import get_ipython
import types
##### ANALYSIS TRACKER ######
[docs]
class AnalysisTracker:
"""
A system for auditing scientific analysis, capturing data provenance,
and generating semantic JSON-LD metadata.
"""
mds_graph = load_mds_ontology_graph()
def __init__(self,
             proj_name: str,
             home_path: str,
             orcid: Optional[str] = "0000-0000-0000-0000",
             metadata_template: Optional[dict] = None,
             base_uri: Optional[str] = "https://cwrusdle.bitbucket.io/mds/",
             ontology_graph: Optional[Graph] = None,
             prefix: Optional[str] = "mds",
             file_events: Optional[bool] = False) -> None:
    """
    Initializes the tracker with project metadata and researcher identity.

    Args:
        proj_name: Human-readable name of the research project.
        home_path: Root directory for storing all analysis artifacts.
        orcid: Researcher's ORCID iD, either bare or as a URL
            (only the last path segment is kept). A lookup against the
            ORCID public API is attempted; failure never raises, it only
            marks the iD as unverified.
        metadata_template: Metadata information about analysis parameters.
        base_uri: The base URI for semantic namespace generation.
        ontology_graph: A custom RDFLib Graph. Defaults to the class-level
            MDS ontology graph, or an empty Graph if that is unavailable.
        prefix: The prefix used for the base_uri in JSON-LD.
        file_events: Option to save file events. Default to False.
    """
    placeholder_orcid = "0000-0000-0000-0000"

    self.home_path = home_path
    self.file_events_store = file_events

    # Reduce the iD to its bare form *before* any network call so that every
    # branch below (including the connection-error one) stores the same value.
    # Later code builds "https://orcid.org/{self.orcid}" and would otherwise
    # duplicate the URL prefix.
    if orcid:
        clean_orcid = orcid.strip().rstrip("/").split("/")[-1].strip()
    else:
        clean_orcid = placeholder_orcid

    self.orcid = clean_orcid
    self.orcid_verified = False
    if clean_orcid == placeholder_orcid:
        print("⚠️ Using Placeholder ORCID. This is not recommended for data publication.")
    else:
        try:
            response = requests.get(f"https://pub.orcid.org/v3.0/{clean_orcid}",
                                    headers={'Accept': 'application/json'},
                                    timeout=5)
        except requests.exceptions.RequestException:
            warnings.warn("🌐 Connection Error: Could not verify ORCID. Tagging as UNVERIFIED.")
        else:
            if response.status_code == 200:
                self.orcid_verified = True
            else:
                # Instead of crashing, we warn and mark as unverified
                warnings.warn(f"❌ ORCID '{orcid}' not found. Analysis will be marked as UNVERIFIED.")

    self.base_uri = base_uri
    self.prefix = prefix
    # Last 15 decimal digits of a random UUID, left-padded with zeros.
    self.analysis_id = f"run{str(uuid4().int)[-15:].zfill(15)}"
    self.sources = []
    self.proj_name = proj_name
    self.file_events = []
    self.imports = []
    self.activity_log = []

    # Ontology precedence: caller-supplied graph, then the class-level MDS
    # graph, then an empty graph the user is asked to populate.
    if ontology_graph is not None:
        self.ontology = ontology_graph
    elif AnalysisTracker.mds_graph is not None:
        self.ontology = AnalysisTracker.mds_graph
    else:
        print("""
MDS-Onto from source is not available, please parse ontology from a local file.
Run Analysis_instance.ontology.parse('path/to/ontology')
""")
        self.ontology = Graph()

    self.MDS = Namespace("https://cwrusdle.bitbucket.io/mds/")
    self.QUDT = Namespace("http://qudt.org/schema/qudt/")
    self.ontology.bind("mds", self.MDS)
    self.metadata_template = metadata_template if metadata_template else {}
    self.metadata_obj = Metadata(self.metadata_template)
[docs]
def get_context(self) -> dict:
    """
    Builds the JSON-LD ``@context`` used by this tracker's output.

    The project prefix is mapped to ``base_uri`` first; the fixed
    vocabulary prefixes are applied afterwards, so a fixed prefix wins
    when the project prefix has the same name.

    Returns:
        dict: Prefix-to-namespace-URI mappings (e.g., prov, mds, qudt).
    """
    fixed_vocabularies = (
        ("qudt", "http://qudt.org/schema/qudt/"),
        ("mds", "https://cwrusdle.bitbucket.io/mds/"),
        ("skos", "http://www.w3.org/2004/02/skos/core#"),
        ("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"),
        ("rdfs", "http://www.w3.org/2000/01/rdf-schema#"),
        ("owl", "http://www.w3.org/2002/07/owl#"),
        ("xsd", "http://www.w3.org/2001/XMLSchema#"),
        ("prov", "http://www.w3.org/ns/prov#"),
        ("dcterms", "http://purl.org/dc/terms/"),
        ("cco", "https://www.commoncoreontologies.org/"),
        ("unit", "https://qudt.org/vocab/unit/"),
        ("obo", "http://purl.obolibrary.org/obo/"),
    )
    context = {self.prefix: self.base_uri}
    context.update(fixed_vocabularies)
    return context
# --- IMPORT DETECTION ---
@staticmethod
def _get_module_info(module_name: str):
"""
Pure utility to extract metadata from a loaded module.
Type-safe for Pyright/Pylance.
"""
root_name = module_name.split('.')[0]
mod = sys.modules.get(root_name)
info = {
'skos:prefLabel': root_name,
'dcterms:type': 'Software',
'dcterms:hasVersion': 'Unknown',
'dcterms:identifier': 'Unknown',
'dcterms:publisher': 'Unknown'
}
# 1. Check Standard Library
if root_name in sys.builtin_module_names:
info['dcterms:publisher'] = 'Python Standard Library'
if root_name == 'sys':
info['dcterms:hasVersion'] = sys.version.split()[0]
# 2. Check Filesystem Modules
elif mod:
# Safely get the file path to satisfy the type checker
f_path = getattr(mod, '__file__', None)
if f_path: # This check fixes the Pyright error
info['dcterms:identifier'] = f_path
info['dcterms:hasVersion'] = getattr(mod, '__version__',
getattr(mod, 'version', 'Unknown'))
# Now Pyright knows f_path is a string
if 'site-packages' in f_path or 'dist-packages' in f_path:
info['dcterms:publisher'] = 'Third Party Package'
else:
# Use os.getcwd() to identify local project modules
if os.getcwd() in f_path:
info['dcterms:publisher'] = 'User Module'
else:
info['dcterms:publisher'] = 'System/Local Environment'
else:
# Handle modules that exist in memory but have no file (e.g., dynamically created)
info['dcterms:publisher'] = 'In-Memory Module'
return info
@staticmethod
def _categorize_imports(software_list: list):
"""
Groups the flat software list into meaningful categories for the report.
"""
categorized = {
'third_party': [],
'standard_library': [],
'user_modules': [],
'other': []
}
for sw in software_list:
pub = sw.get('dcterms:publisher', 'Unknown')
if pub == 'Third Party Package':
categorized['third_party'].append(sw)
elif pub == 'Python Standard Library':
categorized['standard_library'].append(sw)
elif pub == 'User Module':
categorized['user_modules'].append(sw)
else:
categorized['other'].append(sw)
return categorized
[docs]
def detect_all_imports(self):
    """
    Scans the live namespace for the top-level packages behind its objects.

    Uses the IPython user namespace when a shell is active, otherwise this
    module's ``globals()``. The result replaces ``self.imports`` with one
    metadata record per distinct root package; nothing is returned.
    """
    # 1. Access the current live namespace
    try:
        shell = get_ipython()
        # Use Jupyter's user namespace if available, otherwise fallback to globals
        namespace = shell.user_ns if shell else globals()
    except ImportError:
        namespace = globals()

    # Root package of the tracker class. Root names never contain dots, so
    # the comparison below must be made against the root, not the full
    # dotted module path.
    own_root = self.__class__.__module__.split('.')[0]

    found_software = []
    seen_packages = set()
    # 2. Iterate over a snapshot so a namespace change cannot break the loop
    for name, obj in list(namespace.items()):
        # Ignore private variables and internal Jupyter tools
        if name.startswith('_') or name == 'get_ipython':
            continue

        if isinstance(obj, types.ModuleType):
            # import pandas as pd
            source_module = obj.__name__
        else:
            # from pandas import DataFrame
            source_module = getattr(obj, '__module__', None)
        # __module__ may be missing, None, or a non-string
        if not isinstance(source_module, str) or not source_module:
            continue

        # Extract the root package name (e.g., 'pandas' from 'pandas.core.frame')
        root_pkg = source_module.split('.')[0]
        # 3. Only log unique, valid packages present in sys.modules
        if root_pkg in seen_packages or root_pkg not in sys.modules:
            continue
        # Skip the tracker's own package and built-ins to reduce noise
        if root_pkg in (own_root, 'builtins'):
            continue

        software_info = self._get_module_info(root_pkg)
        # Identifier depends only on prefix and package name
        software_info['@id'] = f'{self.prefix}:Software_{root_pkg}'
        found_software.append(software_info)
        seen_packages.add(root_pkg)
    self.imports = found_software
# --- WRAPPERS ---
[docs]
def track(self, func):
    """
    Decorator form of ``run_and_track``.

    Args:
        func: The function to be decorated.

    Returns:
        function: A wrapper that forwards every call, with its positional
        and keyword arguments, to ``self.run_and_track(func, ...)``.
    """
    def tracked_call(*call_args, **call_kwargs):
        return self.run_and_track(func, *call_args, **call_kwargs)

    # Copy name, docstring, etc. from the wrapped function
    return wraps(func)(tracked_call)
[docs]
def run_and_track(self, func, *args, **kwargs):
    """
    Executes a function while auditing arguments, results, and environment.

    Only the "top-most" (direct) input IRIs from the function signature and
    the direct output IRIs from the return value are linked to the activity
    record. Nested values are still routed to the tracking methods.

    Audit steps:
        1. Builds an activity ID from the function name, 15 random digits
           and the analysis ID.
        2. Binds the call arguments to the signature and routes each one
           (except ``self``) to capture input IRIs.
        3. Triggers a live environment scan (``detect_all_imports``).
        4. Executes the function.
        5. Routes the return value (each item separately for a tuple).
        6. Appends the activity record and, when file-event capture is
           enabled, one record per file the process currently has open.
        7. Refreshes the metadata template and remaps semantic types.

    Args:
        func (callable): The function or method to be executed.
        *args: Positional arguments to be passed to the target function.
        **kwargs: Keyword arguments to be passed to the target function.

    Returns:
        Any: The original return value of the wrapped function. If an
        exception occurs inside the guarded section, the error is printed,
        logged as a failed activity, and None is returned.

    Raises:
        TypeError: If the arguments do not match ``func``'s signature
            (raised by ``Signature.bind`` before the guarded section).
    """
    # 1. Setup Activity Identity
    activity_num = str(uuid4().int)[-15:]
    run_id = f"{func.__name__}_activity{activity_num}_{self.analysis_id}"
    activity_iri = f"{self.prefix}:{run_id}"
    start_time = datetime.now().isoformat()
    # Trackers for direct IRIs only
    direct_input_iris = []
    direct_output_iris = []

    # 2. Capture Direct Input IRIs from Signature
    sig = inspect.signature(func)
    bound_args = sig.bind(*args, **kwargs)
    bound_args.apply_defaults()
    for name, val in bound_args.arguments.items():
        if name == 'self':
            continue
        # Capture the IRI string returned by the routing logic
        iri = self._route_data(name, val, parent_id=run_id)
        if iri:
            direct_input_iris.append(iri)

    # 3. Environment Audit
    self.detect_all_imports()

    # NOTE(review): the guarded section also covers the bookkeeping after
    # the call. A failure there is reported as an error in `func`, and a
    # second record with the same @id is appended. Confirm this is intended.
    try:
        # 4. Execute
        result = func(*args, **kwargs)
        end_time = datetime.now().isoformat()

        # 5. Capture Direct Output IRIs
        if isinstance(result, tuple):
            for i, item in enumerate(result):
                out_iri = self._route_data(f"{func.__name__}_output_{i}", item, parent_id=run_id)
                if out_iri:
                    direct_output_iris.append(out_iri)
        else:
            out_iri = self._route_data(f"{func.__name__}_output", result, parent_id=run_id)
            if out_iri:
                direct_output_iris.append(out_iri)

        # 6. Finalize Activity with Direct Links
        self.activity_log.append({
            "@id": activity_iri,
            "@type": "cco:ont00000366",  # Act of Information Processing
            "rdfs:label": "Act of Information Processing",
            "skos:altLabel": f"Execution of function {func.__name__}",
            "prov:startedAtTime": start_time,
            "prov:endedAtTime": end_time,
            "cco:ont00001921": direct_input_iris,  # Direct IRIs list
            "cco:ont00001986": direct_output_iris  # Direct IRIs list
        })

        # Capture File Events linked to this Activity. The process handle is
        # only needed (and only created) when file events are requested.
        if self.file_events_store:
            process = psutil.Process(os.getpid())
            for file in process.open_files():
                mode = getattr(file, 'mode', 'r')
                event_type = "read/import" if 'r' in mode else "write/modification"
                self.file_events.append({
                    "@id": f"{self.prefix}:fileEvent{str(uuid4().int)[-15:]}_{self.analysis_id}",
                    "@type": "cco:ont00000958",
                    "mds:fileName": os.path.basename(file.path),
                    "mds:fileLocation": file.path,
                    "mds:fileEvent": event_type,
                    "prov:wasInformedBy": activity_iri,
                    "prov:generatedAtTime": datetime.now().isoformat()
                })

        # 7. Refresh metadata and semantic types
        metadata_template, matched_log, unmatched_log = self.create_metadata_template()
        self.metadata_obj.update_bulk(metadata_template)
        self.semantic_remapping(unmatched_log)
        return result
    except Exception as e:
        error_msg = f"Error in {func.__name__}: {str(e)}"
        print(f"⚠️ {error_msg}")
        err_iri = self._route_data(f"{func.__name__}_ERROR", error_msg, parent_id=run_id)
        # The failure record carries the same descriptive fields as a
        # successful one so reports and JSON-LD consumers can render it.
        self.activity_log.append({
            "@id": activity_iri,
            "@type": "cco:ont00000366",
            "rdfs:label": "Act of Information Processing",
            "skos:altLabel": f"Failed execution of function {func.__name__}",
            "prov:startedAtTime": start_time,
            "prov:endedAtTime": datetime.now().isoformat(),
            "cco:ont00001921": direct_input_iris,
            "cco:ont00001986": [err_iri] if err_iri else []
        })
        return None
[docs]
def semantic_remapping(self, unmatched_log):
    """
    Re-types tracked primitive values using the current metadata template.

    For every source whose 'mds:argumentType' is int, float, str or bool:
    a label found in ``unmatched_log`` gets the generic type
    'cco:ont00000958'; otherwise the '@type' of the template node with the
    same 'skos:altLabel' is applied, if there is one. Other sources are
    left untouched. ``self.sources`` is rebound to a new list.

    Args:
        unmatched_log: Container of labels without an ontology match.
    """
    primitive_type_names = ('int', 'float', 'str', 'bool')

    # Label -> semantic type lookup, built once from the template graph
    type_by_label = {}
    for node in self.metadata_obj.metadata_temp.get("@graph", []):
        label = node.get('skos:altLabel')
        if label:
            type_by_label[label] = node.get('@type')

    for source in self.sources:
        if source.get("mds:argumentType") not in primitive_type_names:
            continue
        label = source.get("skos:altLabel")
        if label in unmatched_log:
            resolved_type = "cco:ont00000958"
        else:
            resolved_type = type_by_label.get(label)
        if resolved_type:
            source['@type'] = resolved_type

    # Every entry is kept; only the list object itself is replaced
    self.sources = list(self.sources)
    print(f"✅ Semantic remapping complete. Checked {len(self.sources)} entries.")
def _route_data(self, name, val, parent_id=None):
    """
    Dispatches a value to the tracking method that matches its type.

    The name is passed through ``normalize_iri`` first. Type checks run in
    a fixed order (DataFrame, dict, list/ndarray, primitive); anything else
    goes to ``track_other``.

    Args:
        name: The variable name or identifier.
        val: The data object to be tracked.
        parent_id: Optional identifier of the parent container for nesting.

    Returns:
        Whatever the selected tracking method returns.
    """
    name = normalize_iri(name)
    dispatch_order = (
        (pd.DataFrame, self.track_dataframe),
        (dict, self.track_dict),
        ((list, np.ndarray), self.track_list_array),
        ((str, int, float, bool), self.track_simple_datatype),
    )
    for accepted_types, handler in dispatch_order:
        if isinstance(val, accepted_types):
            return handler(name, val, parent_id)
    return self.track_other(name, val, parent_id)
# --- TRACKING METHODS ---
[docs]
def track_simple_datatype(self, name, val, parent_id=None):
    """
    Records a primitive value (str, int, float, bool) as a source entry.

    The entry stores the value itself and its Python type name, and is
    typed with the generic 'cco:ont00000958'.

    Args:
        name: Variable name.
        val: The primitive value.
        parent_id: ID of the containing process or object.

    Returns:
        str: The IRI assigned to the entry.
    """
    identifier = f"{name}.{self.analysis_id}"
    iri = f"{self.prefix}:{identifier}"
    container = {"@id": f"{self.prefix}:{parent_id}"}
    entry = {
        "@id": iri,
        "@type": "cco:ont00000958",
        "mds:argumentIdentifier": identifier,
        "skos:altLabel": name,
        "mds:argumentType": type(val).__name__,
        "qudt:value": val,
        "mds:containerIdentifier": container,
    }
    self.sources.append(entry)
    return iri
[docs]
def track_dict(self, name, val, parent_id=None):
    """
    Records a dictionary and routes each of its values for tracking.

    The dictionary's entry lists its keys. Every value is routed under the
    name ``"<name>/<key>"``, with this dictionary as its container.

    Args:
        name: Dictionary name.
        val: The dictionary object.
        parent_id: ID of the containing process or object.

    Returns:
        str: The IRI assigned to the dictionary entry.
    """
    current_id = f"{name}.{self.analysis_id}"
    iri = f"{self.prefix}:{current_id}"
    entry = {
        "@id": iri,
        "@type": "cco:ont00000958",
        "mds:argumentIdentifier": current_id,
        "skos:altLabel": name,
        "mds:argumentType": "dictionary",
        "mds:keys": list(val.keys()),
        # Links to its container
        "mds:containerIdentifier": {"@id": f"{self.prefix}:{parent_id}"},
    }
    self.sources.append(entry)
    # The dictionary becomes the parent of each nested value
    for key, nested in val.items():
        self._route_data(f"{name}/{key}", nested, parent_id=current_id)
    return iri
[docs]
def track_dataframe(self, name, df, parent_id=None):
    """
    Records structural metadata of a Pandas DataFrame: its column labels
    and its number of rows. Cell values are not stored.

    Args:
        name: DataFrame name.
        df: The pandas DataFrame object.
        parent_id: ID of the containing process or object.

    Returns:
        str: The IRI assigned to the entry.
    """
    identifier = f"{name}.{self.analysis_id}"
    iri = f"{self.prefix}:{identifier}"
    column_labels = list(df.columns)
    row_count = len(df)
    self.sources.append({
        "@id": iri,
        "@type": "cco:ont00000958",
        "mds:argumentIdentifier": identifier,
        "skos:altLabel": name,
        "mds:argumentType": "dataframe",
        "mds:columnsList": column_labels,
        "mds:numberOfRows": row_count,
        "mds:containerIdentifier": {"@id": f"{self.prefix}:{parent_id}"},
    })
    return iri
[docs]
def track_list_array(self, name, data, parent_id = None):
    """
    Records the dimensions and size of a list or NumPy array.

    For objects with a ``shape`` attribute the shape is used directly. For
    lists, the shape is estimated by following the first element at each
    nesting level, so ragged lists are described by their first branch.

    Args:
        name: Array or list name.
        data: The sequence or array-like object.
        parent_id: ID of the containing process or object.

    Returns:
        str: The IRI assigned to the entry.
    """
    # 1. Handle NumPy Arrays
    if hasattr(data, 'shape'):
        dimensions = list(data.shape)
    # 2. Handle Nested Lists by walking down the first element
    elif isinstance(data, list):
        dimensions = []
        level = data
        while isinstance(level, list):
            # Record empty levels too, so [] is reported as "0" rather than ""
            dimensions.append(len(level))
            if not level:
                break
            level = level[0]
    else:
        dimensions = [len(data)]
    shape_str = "x".join(map(str, dimensions))

    try:
        list_size = len(data)
    except TypeError:
        # 0-d NumPy arrays have a shape but no len(); use the element count
        list_size = int(getattr(data, 'size', 1))

    identifier = f"{name}.{self.analysis_id}"
    self.sources.append({
        "@id": f"{self.prefix}:{identifier}",
        "@type": "cco:ont00000958",
        "mds:argumentIdentifier": identifier,
        "skos:altLabel": name,
        "mds:argumentType": type(data).__name__,
        "mds:listSize": list_size,
        "mds:arrayShape": shape_str,
        "mds:containerIdentifier": {
            "@id": f"{self.prefix}:{parent_id}"
        }
    })
    return f"{self.prefix}:{identifier}"
[docs]
def track_other(self, name, obj, parent_id=None):
    """
    Fallback tracker: records the object and routes its public attributes.

    An attribute is followed when it is an int, float, str, bool, dict,
    list or DataFrame, or when it is another object that carries its own
    ``__dict__``. Modules, classes and functions/methods are not followed.
    An object that is already being inspected further up the call chain is
    recorded but not expanded again, so reference cycles terminate.

    Args:
        name: Object name.
        obj: The Python object to inspect.
        parent_id: ID of the containing process or object.

    Returns:
        str: The IRI assigned to the object's entry.
    """
    current_id = f"{name}.{self.analysis_id}"
    iri = f"{self.prefix}:{current_id}"
    # Log the object...
    self.sources.append({
        "@id": iri,
        "@type": "cco:ont00000958",
        "mds:argumentIdentifier": current_id,
        "skos:altLabel": name,
        "mds:argumentType": type(obj).__name__,
        "mds:containerIdentifier": {
            "@id": f"{self.prefix}:{parent_id}"
        }
    })

    # ids of the objects currently being expanded (cycle guard)
    in_progress = getattr(self, '_objects_being_inspected', None)
    if in_progress is None:
        in_progress = set()
        self._objects_being_inspected = in_progress
    if id(obj) in in_progress:
        return iri

    plain_types = (int, float, str, bool, dict, list, pd.DataFrame)
    in_progress.add(id(obj))
    # Inspect the object...
    try:
        for attr_name, attr_val in list(vars(obj).items()):
            if attr_name.startswith('_'):
                continue
            is_plain = isinstance(attr_val, plain_types)
            # Following modules, classes or routines would pull whole
            # namespaces into the log, so only data-carrying objects count.
            is_nested_object = (
                hasattr(attr_val, '__dict__')
                and not isinstance(attr_val, (type, types.ModuleType))
                and not inspect.isroutine(attr_val)
            )
            if is_plain or is_nested_object:
                self._route_data(f"{name}/{attr_name}", attr_val, parent_id=current_id)
    except TypeError:
        # vars() rejects objects without a __dict__ (None, tuples, ...);
        # such objects are logged without attributes.
        pass
    finally:
        in_progress.discard(id(obj))
    return iri
#### METADATA OBJECT WRAPPERS ####
[docs]
def create_analysis_jsonld(self, license: Optional[str] = None):
    """
    Assembles the tracked sources, file events, imports and activities
    into a JSON-LD document.

    Args:
        license: An SPDX short identifier or a full URI starting with
            "http". A short identifier is checked against the list
            returned by ``load_licenses``; a URI is used unchecked. When
            omitted, CC0-1.0 is used and a notice is printed.

    Returns:
        str: The JSON-LD document, indented with two spaces.

    Raises:
        ValueError: If a short identifier is not in the SPDX list.
    """
    if self.orcid_verified:
        orcid_verification = "ORCID iD verified."
    else:
        orcid_verification = "ORCID iD not verified."

    if not license:
        license_uri = "https://spdx.org/licenses/CC0-1.0.html"
        print("No license provided. Default to CC0-1.0 (Public Domain)")
    elif license.startswith("http"):
        # Full URI provided; assume it's valid
        license_uri = license
    else:
        # Short identifier: validate against the SPDX license list
        known_ids = {item["licenseId"] for item in load_licenses()["licenses"]}
        if license not in known_ids:
            raise ValueError(
                f"Invalid SPDX license ID '{license}'.\n"
                f"Please use one from https://spdx.org/licenses/."
            )
        license_uri = f"https://spdx.org/licenses/{license}.html"

    context = self.get_context()
    analysis_node = {
        "@id": f"mds:{self.analysis_id}",
        "@type": "mds:AnalyticalResult",
        "dcterms:creator": {"@id": f"https://orcid.org/{self.orcid}"},
        "dcterms:date": datetime.now().strftime("%Y-%m-%d"),
        "dcterms:source": self.sources,
        "dcterms:provenance": self.file_events,
        "dcterms:description": orcid_verification,
        "mds:hasStudyStage": "Analysis",
        "dcterms:requires": self.imports if self.imports else [],
        "obo:BFO_0000117": self.activity_log,
        "dcterms:license": {"@id": license_uri},
    }
    return json.dumps({"@context": context, "@graph": [analysis_node]}, indent=2)
[docs]
def serialize_analysis_jsonld(self, license: Optional[str] = None):
    """
    Writes the JSON-LD document to
    ``<home_path>/analysis_json/<proj_name>_<analysis_id>.json``.

    The directory is created if needed, before the document is generated.

    Args:
        license: Forwarded unchanged to ``create_analysis_jsonld``.
    """
    target_dir = os.path.join(self.home_path, "analysis_json")
    os.makedirs(target_dir, exist_ok=True)

    target_path = os.path.join(target_dir, f"{self.proj_name}_{self.analysis_id}.json")
    document = self.create_analysis_jsonld(license=license)
    with open(target_path, 'w', encoding='utf-8') as handle:
        handle.write(document)
    print(f"JSON-LD saved at {target_path}")
[docs]
def create_report(self) -> str:
    """
    Generates a human-readable Markdown summary of the tracked variables,
    activities, software environment and file system events.

    Activity records that lack a label or an end time are rendered with
    their '@id' and "not recorded" instead of the literal text "None".

    Returns:
        str: A Markdown formatted report.
    """
    report = []
    report.append(f"## Analysis Report: {self.proj_name}")
    report.append(f"**Analysis ID:** `{self.analysis_id}`")
    report.append(f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append(f"**Creator:** [{self.orcid}](https://orcid.org/{self.orcid}) ({'Verified' if self.orcid_verified else 'Unverified'})")
    report.append("\n")

    # 1. Inputs and Outputs
    report.append("### 🧪 Data Sources & Variables")
    if not self.sources:
        report.append("_No variables tracked._")
    else:
        for s in self.sources:
            shape_info = f" (Shape: {s.get('mds:arrayShape')})" if 'mds:arrayShape' in s else ""
            val_info = f" = `{s.get('qudt:value')}`" if 'qudt:value' in s else ""
            report.append(f"* **{s['skos:altLabel']}** ({s['mds:argumentType']}){shape_info}{val_info}")

    # 2. Activity report
    report.append('\n### 🕹️ Activity Report')
    if not self.activity_log:
        report.append('_No activity tracked_')
    else:
        for act in self.activity_log:
            # Fall back to the record's IRI when no label was stored
            activity_info = act.get('skos:altLabel') or act.get('@id') or 'Unknown activity'
            start = act.get('prov:startedAtTime') or 'not recorded'
            end = act.get('prov:endedAtTime') or 'not recorded'
            report.append(f"* **{activity_info}**; Started at time **{start}**; Ended at time **{end}**; Performed by **{self.orcid}**")

    # 3. System Imports
    report.append("\n### 📂 Software Environment")
    if not self.imports:
        report.append("_No software environment tracked._")
    else:
        categorized = self._categorize_imports(self.imports)
        sections = [
            ('third_party', '#### THIRD PARTY PACKAGES'),
            ('standard_library', '#### STANDARD LIBRARY'),
            ('user_modules', '#### USER/PROJECT MODULES'),
            ('other', '#### OTHER MODULES')
        ]
        for cat_key, title in sections:
            if not categorized.get(cat_key):
                continue
            report.append(f"\n{title}")
            for sw in categorized[cat_key]:
                # Extract data directly from the software node
                name = sw.get('skos:prefLabel')
                version = sw.get('dcterms:hasVersion')
                path = sw.get('dcterms:identifier')
                alias = sw.get('skos:altLabel')
                # Formatting the summary line
                alias_info = f" (accessed as '{alias}')" if alias else ""
                report.append(f" • **{name}**{alias_info}")
                # Meta info is shown only when it carries real information
                if version and version != "Unknown":
                    report.append(f" └─ Version: {version}")
                if path and path != "Unknown" and path != "Built-in":
                    report.append(f" └─ Location: {path}")

    # 4. File System Events (The "Paper Trail")
    report.append("\n### 📂 File System Activity")
    if not self.file_events:
        report.append("_No file system events detected._")
    else:
        report.append("| File Name | Event | mds:fileLocation |")
        report.append("| :--- | :--- | :--- |")
        for e in self.file_events:
            report.append(f"| {e['mds:fileName']} | {e['mds:fileEvent']} | `{e['mds:fileLocation']}` |")
    report.append("\n---")
    return "\n".join(report)
[docs]
def save_report(self):
    """
    Writes the Markdown report to
    ``<home_path>/reports/<proj_name>_<analysis_id>_summary.md``,
    creating the directory if needed.
    """
    target_dir = os.path.join(self.home_path, "reports")
    os.makedirs(target_dir, exist_ok=True)
    target_path = os.path.join(
        target_dir, f"{self.proj_name}_{self.analysis_id}_summary.md"
    )
    # The report is generated after the file has been opened for writing
    with open(target_path, "w", encoding="utf-8") as handle:
        handle.write(self.create_report())
    print(f"Report saved at {target_path}")
[docs]
def create_arg_df(self):
    """
    Flattens the tracked variables into a single-row Pandas DataFrame for
    tabular comparison across different runs.

    Each source becomes a column named after its 'skos:altLabel'.
    Primitive sources (int, str, float, bool) contribute their value;
    every other source contributes its 'mds:argumentIdentifier'. The
    columns '__rowkey__' (analysis ID) and 'ProjectTitle' are added last.

    Returns:
        pd.DataFrame: A one-row DataFrame.
    """
    primitive_type_names = ('int', 'str', 'float', 'bool')
    row = {}
    for source in self.sources:
        label = source["skos:altLabel"]
        if source.get("mds:argumentType") in primitive_type_names:
            row[label] = source["qudt:value"]
        else:
            row[label] = source["mds:argumentIdentifier"]

    # Mandatory tracking columns
    row["__rowkey__"] = self.analysis_id
    row["ProjectTitle"] = self.proj_name

    # A one-element list of dicts yields exactly one row
    return pd.DataFrame([row])
[docs]
class AnalysisGroup:
"""
Manages a collection of related AnalysisTracker instances, facilitating
group-level reporting and master graph generation.
"""
mds_graph = load_mds_ontology_graph()
def __init__(self,
             proj_name: str,
             home_path: str,
             orcid: Optional[str] = "0000-0000-0000-0000",
             metadata_template: Optional[dict] = None,
             base_uri: Optional[str] = "https://cwrusdle.bitbucket.io/mds/",
             ontology_graph: Optional[Graph] = None,
             prefix: Optional[str] = "mds",
             file_events: Optional[bool] = False) -> None:
    """
    Initializes the group with the project metadata shared by its analyses.

    Args:
        proj_name: Name of the project group.
        home_path: Root directory for all child analyses.
        orcid: Researcher's ORCID iD (stored as given).
        metadata_template: Metadata information about analysis parameters.
        base_uri: Base URI for semantic namespaces.
        ontology_graph: Shared RDFLib Graph. Defaults to the class-level
            MDS ontology graph, or an empty Graph if that is unavailable.
        prefix: Prefix for the base URI.
        file_events: Option to save file events. Default to False.
    """
    # Registry of tracked analyses, keyed by analysis ID
    self.analyses = {}
    self.proj_name = proj_name
    self.home_path = home_path
    self.orcid = orcid
    self.base_uri = base_uri

    # Ontology precedence: caller-supplied graph, class-level MDS graph,
    # then an empty graph the user is asked to populate.
    if ontology_graph is not None:
        self.ontology = ontology_graph
    elif AnalysisGroup.mds_graph is not None:
        self.ontology = AnalysisGroup.mds_graph
    else:
        print("""
MDS-Onto from source is not available, please parse ontology from a local file.
Run AnalysisGroup_instance.ontology.parse('path/to/ontology')
""")
        self.ontology = Graph()

    self.prefix = prefix
    # Last 15 decimal digits of a random UUID, left-padded with zeros
    random_digits = str(uuid4().int)[-15:].zfill(15)
    self.group_id = f"runGroup{random_digits}"
    self.QUDT = Namespace("http://qudt.org/schema/qudt/")
    self.MDS = Namespace("https://cwrusdle.bitbucket.io/mds/")
    self.metadata_template = metadata_template if metadata_template else {}
    self.metadata_obj = Metadata(self.metadata_template)
    self.store_file_events = file_events
[docs]
def get_context(self) -> dict:
    """
    Builds the JSON-LD ``@context`` for the group metadata.

    The group prefix is mapped to ``base_uri`` first; the fixed vocabulary
    prefixes are applied afterwards, so a fixed prefix wins when the group
    prefix has the same name.

    Returns:
        dict: Prefix to namespace URI mappings.
    """
    context = {self.prefix: self.base_uri}
    for vocab_prefix, namespace_uri in (
        ("qudt", "http://qudt.org/schema/qudt/"),
        ("mds", "https://cwrusdle.bitbucket.io/mds/"),
        ("skos", "http://www.w3.org/2004/02/skos/core#"),
        ("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"),
        ("rdfs", "http://www.w3.org/2000/01/rdf-schema#"),
        ("owl", "http://www.w3.org/2002/07/owl#"),
        ("xsd", "http://www.w3.org/2001/XMLSchema#"),
        ("prov", "http://www.w3.org/ns/prov#"),
        ("dcterms", "http://purl.org/dc/terms/"),
        ("cco", "https://www.commoncoreontologies.org/"),
        ("unit", "https://qudt.org/vocab/unit/"),
        ("obo", "http://purl.obolibrary.org/obo/"),
    ):
        context[vocab_prefix] = namespace_uri
    return context
[docs]
def track(self, func):
    """
    Decorator form of the group's ``run_and_track``.

    Args:
        func: The function to be decorated.

    Returns:
        function: A wrapper that forwards every call, with its positional
        and keyword arguments, to ``self.run_and_track(func, ...)``.
    """
    def tracked_call(*call_args, **call_kwargs):
        return self.run_and_track(func, *call_args, **call_kwargs)

    # Copy name, docstring, etc. from the wrapped function
    return wraps(func)(tracked_call)
[docs]
def run_and_track(self, func, *args, tracker: Optional[AnalysisTracker] = None, **kwargs):
    """
    Runs a function through an AnalysisTracker and registers the outcome.

    Args:
        func: The function to execute.
        *args: Positional arguments for ``func``.
        tracker: Existing tracker to reuse, so several calls share one
            analysis ID. When omitted, a new tracker is built from the
            group's settings.
        **kwargs: Keyword arguments for ``func``.

    Returns:
        Whatever the tracker's ``run_and_track`` returns for this call.
    """
    # 1. Reuse the injected tracker or build one from the group settings
    if tracker is None:
        analysis = AnalysisTracker(
            proj_name=self.proj_name,
            home_path=self.home_path,
            orcid=self.orcid,
            metadata_template=self.metadata_template,
            base_uri=self.base_uri,
            ontology_graph=self.ontology,
            prefix=self.prefix,
            file_events=self.store_file_events
        )
    else:
        analysis = tracker

    # 2. Execute the function via the tracker
    outcome = analysis.run_and_track(func, *args, **kwargs)

    # 3. Register the run under its analysis ID. With a reused tracker the
    # existing entry is overwritten rather than duplicated.
    registry_entry = {
        "analysis_obj": analysis,
        "result": outcome,
        "jsonld": analysis.create_analysis_jsonld(),
        "report": analysis.create_report(),
        "dataframe": analysis.create_arg_df()
    }
    self.analyses[analysis.analysis_id] = registry_entry

    # 4. Merge the tracker's metadata template into the group's
    tracker_template, _, _ = analysis.create_metadata_template()
    self.metadata_obj.update_bulk(tracker_template)
    return outcome
[docs]
def create_group_arg_df(self) -> pd.DataFrame:
    """
    Stacks the per-analysis DataFrames into one master DataFrame.

    The columns 'ProjectTitle' and '__rowkey__' are moved to the front,
    in that order, when present.

    Returns:
        pd.DataFrame: Concatenated data from all tracked analyses, or an
        empty DataFrame (with a warning) when nothing has been tracked.
    """
    if not self.analyses:
        warnings.warn("No analyses have been tracked in this group yet.")
        return pd.DataFrame()

    frames = []
    for record in self.analyses.values():
        frames.append(record["dataframe"])
    combined = pd.concat(frames, axis=0, ignore_index=True, sort=False)

    all_columns = combined.columns.tolist()
    leading = [col for col in ("ProjectTitle", "__rowkey__") if col in all_columns]
    trailing = [col for col in all_columns if col not in leading]
    return cast(pd.DataFrame, combined[leading + trailing])
[docs]
def create_MatDatSciDf(self):
    """
    Wraps the group's argument DataFrame in a MatDatSciDf.

    The metadata template and match logs come from
    ``self.create_metadata_template()``; the data comes from
    ``create_group_arg_df``; the ontology is the group's ontology graph.

    Returns:
        MatDatSciDf: The semantic-aware DataFrame object.
    """
    template, matched, unmatched = self.create_metadata_template()
    group_frame = self.create_group_arg_df()
    return MatDatSciDf(
        df=group_frame,
        metadata_template=template,
        ontology_graph=self.ontology,
        matched_log=matched,
        unmatched_log=unmatched,
    )
[docs]
def create_group_report(self):
    """
    Joins the stored per-analysis reports into one Markdown document.

    A group header comes first; each analysis report is followed by a
    version line and a horizontal rule.

    Returns:
        str: A full Markdown report for the entire group.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    parts = [
        f"# Group Analysis Report: {self.proj_name}",
        f"**Group Analysis ID:** `{self.group_id}`",
        f"**Date:** {timestamp}",
        "\n---\n",
    ]
    for record in self.analyses.values():
        parts.extend((
            record["report"],
            f"Generated by FAIRLinked version {__version__}",
            "\n---",
        ))
    return "\n".join(parts)
[docs]
def save_report(self):
    """
    Writes the consolidated group report to
    ``<home_path>/<group_id>/<proj_name>_<group_id>_summary.md``,
    creating the directory if needed.
    """
    target_dir = os.path.join(self.home_path, self.group_id)
    os.makedirs(target_dir, exist_ok=True)
    target_path = os.path.join(
        target_dir, f"{self.proj_name}_{self.group_id}_summary.md"
    )
    # The report is generated after the file has been opened for writing
    with open(target_path, "w", encoding="utf-8") as handle:
        handle.write(self.create_group_report())
    print(f"Report saved at {target_path}")
[docs]
def save_jsonld(self):
    """
    Serializes every tracked analysis and writes a master graph file.

    For each registered analysis this calls the tracker's
    ``serialize_analysis_jsonld`` and then merges the JSON-LD snapshot
    stored at tracking time into the master graph, tagging the analysis
    node with a reference to the group. The master graph is written to
    ``<home_path>/<group_id>/_group_json/<proj_name>_<group_id>_master_graph.json``.
    """
    combined_nodes = []
    # Create a list of references to show "Components" of the group
    analysis_refs = [{"@id": f"mds:{aid}"} for aid in self.analyses]

    # 1. Serialize and merge each analysis
    for analysis_id, meta in self.analyses.items():
        # Trigger the individual tracker's serialization
        meta["analysis_obj"].serialize_analysis_jsonld()
        # Load the snapshot stored when the analysis was tracked
        individual_data = json.loads(meta["jsonld"])
        if "@graph" in individual_data:
            for node in individual_data["@graph"]:
                # Link the primary analysis node to this group.
                # NOTE(review): "group" has no mapping in get_context(), so
                # JSON-LD processors may drop it. Confirm the intended term.
                if node.get("@id") == f"mds:{analysis_id}":
                    node["group"] = {"@id": f"mds:{self.group_id}"}
            combined_nodes.extend(individual_data["@graph"])

    # The group stores the ORCID exactly as supplied. Reduce it to the bare
    # iD (last path segment) so the creator IRI is not
    # "https://orcid.org/https://orcid.org/..." or "https://orcid.org/None".
    raw_orcid = self.orcid if self.orcid else "0000-0000-0000-0000"
    bare_orcid = str(raw_orcid).strip().rstrip("/").split("/")[-1]

    # 2. Define the Group Metadata Node
    group_node = {
        "@id": f"mds:{self.group_id}",
        "@type": "mds:AnalyticalResult",
        "dcterms:title": self.proj_name,
        "dcterms:creator": {"@id": f"https://orcid.org/{bare_orcid}"},
        "dcterms:date": datetime.now().strftime("%Y-%m-%d"),
        "mds:hasAnalysisComponent": analysis_refs,
        "mds:hasStudyStage": "Analysis"
    }

    # 3. Final Master Graph Assembly
    master_output = {
        "@context": self.get_context(),
        "@graph": [group_node] + combined_nodes
    }

    # 4. Save to file
    group_json_dir = os.path.join(self.home_path, self.group_id, "_group_json")
    os.makedirs(group_json_dir, exist_ok=True)
    filename = f"{self.proj_name}_{self.group_id}_master_graph.json"
    full_path = os.path.join(group_json_dir, filename)
    with open(full_path, "w", encoding="utf-8") as f:
        json.dump(master_output, f, indent=2)
    print(f"JSON-LD saved at {full_path}")
#### METADATA OBJECT WRAPPERS ####