# Source code for FAIRLinked.RDFTableConversion.csv_to_jsonld_mapper

import pandas as pd
import json
import re
import os
import difflib
from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import RDF, RDFS, OWL, SKOS
from ..InterfaceMDS.load_mds_ontology import load_mds_ontology_graph
import requests
from .MDS_DF.main import MatDatSciDf

def normalize(text):
    """
    Reduce a string to a lowercase, ASCII-alphanumeric-only key.

    Args:
        text (str): Input text to normalize.

    Returns:
        str: ``text`` lowercased, with every character other than an
        ASCII letter or digit removed.
    """
    lowered = text.lower()
    # Drop spaces, punctuation, underscores and any non-ASCII characters.
    stripped = re.sub(r'[^a-zA-Z0-9]', '', lowered)
    return stripped
def get_local_name(uri):
    """
    Return the local name of a URI: the text after its last ``/`` or ``#``.

    Hash-style URIs such as ``http://www.w3.org/2000/01/rdf-schema#label``
    also contain ``/`` (from the scheme and path), so the separator that
    occurs *last* decides where the local name starts.

    Args:
        uri: A URI; anything accepted by ``str()`` (e.g. a ``str`` or an
            ``rdflib.URIRef``).

    Returns:
        str: The substring after the last ``/`` or ``#``. If the URI
        contains neither separator, the whole string is returned. A URI
        ending in a separator yields an empty string.
    """
    uri_str = str(uri)
    # Cut at the last '/', then at the last '#' of what is left. Together
    # this is equivalent to cutting at whichever separator comes last.
    after_slash = uri_str.rsplit('/', 1)[-1]
    return after_slash.rsplit('#', 1)[-1]
def extract_terms_from_ontology(ontology_graph):
    """
    Collect one record per label of every ``owl:Class`` in an ontology graph.

    Args:
        ontology_graph (rdflib.Graph): The ontology RDF graph.

    Returns:
        list[dict]: One dictionary per (class, label) pair with the keys
        ``iri``, ``label`` (whitespace-stripped), ``normalized``
        (``normalize(label)``), ``definition`` (first ``skos:definition``
        as a string, or ``""``) and ``study_stage`` (list of the class's
        ``mds:hasStudyStage`` objects). A class without any label
        contributes no records.
    """
    mds_ns = Namespace("https://cwrusdle.bitbucket.io/mds/")
    collected = []

    for cls in ontology_graph.subjects(RDF.type, OWL.Class):
        # skos:altLabel values come first, followed by rdfs:label values.
        alt_labels = list(ontology_graph.objects(cls, SKOS.altLabel))
        rdfs_labels = list(ontology_graph.objects(cls, RDFS.label))

        # Only the first definition found is kept.
        definitions = list(ontology_graph.objects(cls, SKOS.definition))
        if definitions:
            definition = str(definitions[0])
        else:
            definition = ""

        stages = list(ontology_graph.objects(cls, mds_ns.hasStudyStage))

        for raw_label in alt_labels + rdfs_labels:
            label_text = str(raw_label).strip()
            record = {
                "iri": str(cls),
                "label": label_text,
                "normalized": normalize(label_text),
                "definition": definition,
                "study_stage": stages,
            }
            collected.append(record)

    return collected
def find_best_match(column, ontology_terms):
    """
    Pick the ontology term whose normalized label best fits a column name.

    An exact match on the normalized form wins; otherwise the single
    closest normalized label according to ``difflib`` (similarity cutoff
    0.8) is used.

    Args:
        column (str): The name of the column from the CSV file.
        ontology_terms (list[dict]): Term records, each carrying a
            ``"normalized"`` key.

    Returns:
        dict or None: The first term record that matches, or ``None`` when
        neither an exact nor a close match exists.
    """
    target = normalize(column)

    # Pass 1: identical normalized text.
    exact_hits = [entry for entry in ontology_terms if entry["normalized"] == target]
    if exact_hits:
        return exact_hits[0]

    # Pass 2: fuzzy comparison against every normalized label.
    candidates = [entry["normalized"] for entry in ontology_terms]
    nearest = difflib.get_close_matches(target, candidates, n=1, cutoff=0.8)
    if not nearest:
        return None

    winner = nearest[0]
    # Map the winning normalized text back to the first record carrying it.
    return next(entry for entry in ontology_terms if entry["normalized"] == winner)
def extract_qudt_units(url="https://qudt.org/vocab/unit/", timeout=30):
    """
    Download the QUDT unit vocabulary and extract basic details per unit.

    The Turtle text is scanned with regular expressions rather than parsed
    as RDF: a unit is recognised where a line ``unit:NAME`` is followed by
    ``a qudt:Unit`` or ``a qudt:DerivedUnit``, and its definition block
    runs up to the next line starting with ``unit:`` (or end of text).

    Args:
        url (str): The URL of the QUDT unit vocabulary.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        dict[str, dict]: Maps each unit name to a dictionary with the keys
        ``name``, ``label`` (first ``rdfs:label``, falling back to the unit
        name), ``symbol``, ``ucum_code``, ``conversion_multiplier`` (as
        text) and ``description`` (cut to 100 characters plus ``...`` when
        longer). Fields not found are ``None``. An empty dictionary is
        returned when the download fails, including on timeout.
    """
    print(f"Fetching QUDT ontology from: {url}")

    # Only the network calls can raise RequestException, so only they are
    # guarded; the parsing below works on plain text.
    try:
        response = requests.get(url, headers={'Accept': 'text/turtle'}, timeout=timeout)
        response.raise_for_status()
        content = response.text
    except requests.RequestException as e:
        print(f"Error fetching data for units: {e}")
        return {}

    print(f"Successfully fetched {len(content)} characters of data\n")

    # Start of a unit definition: unit:UNIT_NAME <newline> a qudt:Unit
    unit_def_re = re.compile(r'unit:([A-Z0-9_\-]+)\s*\n\s*a\s+qudt:(?:Unit|DerivedUnit)')
    # Body of a definition, applied at the position where it starts.
    unit_block_re = re.compile(r'unit:[A-Z0-9_\-]+\s*\n(.*?)(?=\nunit:|$)', re.DOTALL)

    symbol_re = re.compile(r'qudt:symbol\s+"([^"]+)"')
    label_re = re.compile(r'rdfs:label\s+"([^"]+)"(?:@\w+)?')
    description_re = re.compile(r'dcterms:description\s+"([^"]+)"')
    ucum_re = re.compile(r'qudt:ucumCode\s+"([^"]+)"')
    conversion_re = re.compile(r'qudt:conversionMultiplier\s+([\d.E\-+]+)')

    def first_group(pattern, text):
        """Return group 1 of the first match of ``pattern``, or None."""
        found = pattern.search(text)
        return found.group(1) if found else None

    unit_details = {}
    for definition in unit_def_re.finditer(content):
        unit_name = definition.group(1)

        # Read the block at the definition itself instead of searching the
        # whole document again for every unit.
        block_match = unit_block_re.match(content, definition.start())
        if not block_match:
            continue
        unit_block = block_match.group(1)

        label = first_group(label_re, unit_block)
        if label is None:
            label = unit_name

        description = first_group(description_re, unit_block)
        if description and len(description) > 100:
            description = description[:100] + '...'

        unit_details[unit_name] = {
            'name': unit_name,
            'label': label,
            'symbol': first_group(symbol_re, unit_block),
            'ucum_code': first_group(ucum_re, unit_block),
            'conversion_multiplier': first_group(conversion_re, unit_block),
            'description': description,
        }

    return unit_details
def extract_quantity_kinds(url="https://qudt.org/vocab/quantitykind/", timeout=30):
    """
    Download the QUDT quantity-kind vocabulary and map kinds to their units.

    Args:
        url (str): The URL of the QUDT quantity-kind vocabulary.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        dict[str, list[str]]: Maps the normalized local name of every
        subject that has a ``qudt:applicableUnit`` to the local names of
        its applicable units. An empty dictionary is returned if the
        download or the Turtle parsing fails; the error is printed.
    """
    try:
        response = requests.get(url, headers={'Accept': 'text/turtle'}, timeout=timeout)
        response.raise_for_status()

        g = Graph()
        g.parse(data=response.text, format='turtle')

        predicate = URIRef("http://qudt.org/schema/qudt/applicableUnit")
        kinds = {}
        # subjects() may yield the same subject once per matching triple;
        # de-duplicate so each unit list is built only once.
        for subject in set(g.subjects(predicate=predicate)):
            key = normalize(get_local_name(subject))
            kinds[key] = [
                get_local_name(obj)
                for obj in g.objects(subject=subject, predicate=predicate)
            ]
        return kinds
    except Exception as e:
        # Deliberately best-effort: any network or parse failure is
        # reported and turned into an empty result.
        print(e)
        return {}
def prompt_for_missing_fields(col, unit, study_stage, ontology_graph, units):
    """
    Interactively ask the user for a column's unit, study stage and notes.

    Args:
        col (str): Column name shown in the prompts.
        unit: Ignored on input; the unit is always chosen in the dialogue.
        study_stage (str or None): Proposed study stage. It is kept when it
            normalizes to one of the valid stages; otherwise the user is
            asked for one.
        ontology_graph: Not used by this function.
        units (dict[str, dict]): Unit records keyed by unit name. Each
            record may provide ``'ucum_code'`` and ``'label'``.

    Returns:
        tuple: ``(unit, study_stage, notes)`` where ``unit`` is a key of
        ``units`` or ``"UNITLESS"``, ``study_stage`` is the canonical
        spelling of a valid stage (or ``""`` when skipped) and ``notes`` is
        the raw text the user typed.
    """
    print(f"\n-- Getting metadata for column: {col} --")
    print("(Type 'skip' or 'exit' to default to UNITLESS)")

    # --- Part 1: Unit Selection Loop ---
    while True:
        user_input = input(f"Search unit/UCUM for '{col}': ").strip()

        # If user wants to stop or skip, default to UNITLESS and leave the loop
        if user_input.lower() in ['exit', 'quit', 'stop', 'skip', '']:
            unit = "UNITLESS"
            break

        # UCUM codes are compared exactly, labels case-insensitively.
        # A record without a label must not crash the search.
        wanted_label = user_input.lower()
        matches = [
            key for key, details in units.items()
            if user_input == details.get('ucum_code')
            or wanted_label == (details.get('label') or '').lower()
        ]

        if len(matches) == 1:
            unit = matches[0]
            break  # Found it! Move on to next fields
        elif len(matches) > 1:
            print("\nMultiple matches found. Select a number or type 'back':")
            for i, m in enumerate(matches, 1):
                print(f" {i}. {units[m].get('label')} ({m})")

            choice = input("> ").strip()
            if choice.lower() == 'back':
                continue
            # isdecimal() (unlike isdigit()) only accepts text int() can parse.
            if choice.isdecimal() and 1 <= int(choice) <= len(matches):
                unit = matches[int(choice) - 1]
                break
            print("Invalid selection. Returning to unit search.")
        else:
            print(f"No match for '{user_input}'. Try again or type 'exit' to use UNITLESS.")

    # --- Part 2: Study Stage ---
    valid_study_stages = [
        "Synthesis", "Formulation", "Material Processing", "Sample",
        "Tool", "Recipe", "Result", "Analysis", "Modeling", ""
    ]
    norm_study_stages = [normalize(ss) for ss in valid_study_stages]

    # Initial check
    if not study_stage or normalize(study_stage) not in norm_study_stages:
        print("\nPlease enter a valid study stage or press 'enter' to skip: ")
        for ss in valid_study_stages:
            if ss:
                print(f" - {ss}")

        study_stage = input("Stage: ")
        # Keep asking until valid (empty input is valid and means "skip")
        while normalize(study_stage) not in norm_study_stages:
            study_stage = input("Invalid stage. Please try again (or 'enter' to skip): ")

    # Normalize back to the pretty-print version.
    # NOTE(review): the flattened source does not show whether this ran only
    # after prompting; it is applied to passed-in and typed values alike.
    study_stage = valid_study_stages[norm_study_stages.index(normalize(study_stage))]

    # --- Part 3: Notes ---
    notes = input("Please enter notes or press 'Enter' to skip: ")

    return unit, study_stage, notes
def get_license():
    """
    Ask the user for a license on standard input.

    Returns:
        str: The text the user entered, unmodified.
    """
    entered = input("Please enter license")
    return entered
def jsonld_template_generator(csv_path, ontology_graph, output_path, matched_log_path, unmatched_log_path, skip_prompts=False):
    """
    Turn a CSV file into a JSON-LD template whose column metadata the user can fill out.

    Args:
        csv_path (str): Path to the CSV file to generate the JSON-LD template from.
        ontology_graph (rdflib.Graph): The ontology RDF graph for matching terms.
        output_path (str): Path to write the resulting JSON-LD file.
        matched_log_path (str): Path to write the log of columns that matched the ontology.
        unmatched_log_path (str): Path to write the log of columns that can't be found
            in the ontology (written sorted, without duplicates).
        skip_prompts (bool): Allow users to skip metadata prompts.

    Returns:
        None. The three output files are written as a side effect; missing
        parent directories are created first.
    """
    df = pd.read_csv(csv_path)

    mds_df = MatDatSciDf(
        df=df,
        metadata_rows=True,
        ontology_graph=ontology_graph
    )

    metadata_template, matched_log, unmatched_log = mds_df.template_generator(skip_prompts=skip_prompts)

    for path in (output_path, matched_log_path, unmatched_log_path):
        parent = os.path.dirname(path)
        # A bare filename has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError; nothing needs creating in that case.
        if parent:
            os.makedirs(parent, exist_ok=True)

    # Write JSON-LD
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(metadata_template, f, indent=2)

    # Write matched log
    with open(matched_log_path, "w", encoding="utf-8") as f:
        f.write("\n".join(matched_log))

    # Write unmatched log (remove duplicates with set)
    with open(unmatched_log_path, "w", encoding="utf-8") as f:
        f.write("\n".join(sorted(set(unmatched_log))))
def jsonld_temp_gen_interface(args):
    """
    Run the JSON-LD template generator from a parsed argument object.

    Args:
        args: Object exposing the attributes ``ontology_path``,
            ``log_path``, ``csv_path``, ``output_path`` and
            ``skip_prompts`` (presumably an ``argparse.Namespace`` --
            confirm against the CLI definition). When ``ontology_path`` is
            the string ``"default"`` the ontology comes from
            ``load_mds_ontology_graph()``; otherwise the file at that path
            is parsed.

    Returns:
        None. ``matched.txt`` and ``unmatched.txt`` are written inside
        ``args.log_path`` and the template at ``args.output_path``.
    """
    print(args.ontology_path)

    if args.ontology_path == "default":
        # The loader returns the graph itself; no empty Graph is needed first.
        ontology_graph = load_mds_ontology_graph()
    else:
        ontology_graph = Graph()
        ontology_graph.parse(source=args.ontology_path)

    matched_path = os.path.join(args.log_path, "matched.txt")
    unmatched_path = os.path.join(args.log_path, "unmatched.txt")

    jsonld_template_generator(
        csv_path=args.csv_path,
        ontology_graph=ontology_graph,
        output_path=args.output_path,
        matched_log_path=matched_path,
        unmatched_log_path=unmatched_path,
        skip_prompts=args.skip_prompts,
    )