# Source code for FAIRLinked.QBWorkflow.rdf_to_df

import os
import json
import re
from rdflib import Graph, URIRef, Literal, Namespace
from rdflib.namespace import RDF, SKOS, DCTERMS, XSD
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from FAIRLinked.QBWorkflow.utility import NAMESPACE_MAP

def parse_rdf_to_df(file_path: str, variable_metadata_json_path: str, arrow_output_path: str) -> tuple:
    """
    Description:
        Converts one RDF Data Cube file, or every .ttl/.jsonld/.json-ld file found
        (recursively) under a directory, into a single wide pandas DataFrame and a
        single consolidated variable_metadata dictionary.

        Any mix of layouts is accepted: "row-by-row" RDF (one qb:DataSet per row),
        "entire" RDF (one qb:DataSet with many slices), or several DataSets spread
        across several files. Per-file results are combined as follows:
            - data rows from every file are concatenated;
            - metadata for a variable seen in more than one file is merged:
              units are unioned (and sorted), a missing AltLabel/Category is
              filled in from a later file, and IsMeasure is promoted from "No"
              to "Yes" if any file marks it as a measure. ExistingURI is kept
              from the first file that mentioned the variable.

        The combined table is then ordered (rows by "ExperimentId" when that
        column exists; columns by (Category, column name), with "ExperimentId"
        first) and written out. Summary statistics and a first-row preview are
        printed at the end.

    Steps:
        1. Collect RDF files from `file_path`; fail if there are none.
        2. For each file: pick the rdflib format, parse it into a Graph, and
           turn the Graph into (DataFrame, metadata) via _parse_single_rdf_graph.
        3. Concatenate the non-empty DataFrames and merge the metadata.
        4. Sort rows by "ExperimentId" (if present) and reorder columns.
        5. Convert to a PyArrow Table, write the metadata JSON, then the Parquet file.
        6. Print stats and a preview.

    Args:
        file_path (str): A single .ttl/.jsonld/.json-ld file, or a directory
            that is searched recursively for such files.
        variable_metadata_json_path (str): Where the merged variable_metadata
            is written as JSON.
        arrow_output_path (str): Where the final table is written as Parquet.

    Returns:
        (pa.Table, dict):
            pa.Table => all observations merged across every file.
            dict => merged variable_metadata, keyed by column name.

    Raises:
        ValueError: if no RDF file with a supported extension is found.
    """
    rdf_files = _collect_rdf_files(file_path)
    if not rdf_files:
        raise ValueError(f"No RDF files (.ttl, .jsonld, .json-ld) found in '{file_path}'")

    frames = []
    merged_meta = {}

    for rdf_path in rdf_files:
        fmt = _guess_rdf_format(rdf_path)
        print(f"\nParsing file: {rdf_path} as {fmt} ...")

        graph = Graph()
        graph.parse(source=rdf_path, format=fmt)

        # A single file may hold several qb:DataSets; this returns them combined.
        file_df, file_meta = _parse_single_rdf_graph(graph)
        if file_df is not None and not file_df.empty:
            frames.append(file_df)

        for var_name, incoming in file_meta.items():
            if var_name not in merged_meta:
                # First time this variable is seen: adopt the file's entry as-is.
                merged_meta[var_name] = incoming
                continue

            current = merged_meta[var_name]
            current["Unit"] = sorted(set(current.get("Unit", [])) | set(incoming.get("Unit", [])))
            if incoming.get("AltLabel") and not current.get("AltLabel"):
                current["AltLabel"] = incoming["AltLabel"]
            if incoming.get("Category") and not current.get("Category"):
                current["Category"] = incoming["Category"]
            if incoming.get("IsMeasure", "No") == "Yes" and current.get("IsMeasure", "No") == "No":
                current["IsMeasure"] = "Yes"
            # ExistingURI is left untouched: the first file's value wins.

    final_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    if "ExperimentId" in final_df.columns:
        final_df = final_df.sort_values(by="ExperimentId")

    # Column => category ("" when unknown); drives column order and the stats printout.
    var_categories = {name: (meta.get("Category") or "") for name, meta in merged_meta.items()}

    lead = ["ExperimentId"] if "ExperimentId" in final_df.columns else []
    rest = sorted(
        (col for col in final_df.columns if col != "ExperimentId"),
        key=lambda col: (var_categories.get(col, ""), col),
    )
    final_df = final_df[lead + rest]

    arrow_table = pa.Table.from_pandas(final_df, preserve_index=False)

    with open(variable_metadata_json_path, "w", encoding="utf-8") as meta_file:
        json.dump(merged_meta, meta_file, indent=2, ensure_ascii=False)

    pq.write_table(arrow_table, arrow_output_path)

    _print_final_stats_and_preview(final_df, var_categories, rdf_files, file_path)
    return arrow_table, merged_meta
# ------------------------------------------------------------------------------
# PARSING ONE RDF GRAPH => DataFrame, Metadata
# ------------------------------------------------------------------------------
# NOTE: rdflib types are written as string annotations ("Graph", "URIRef") so they
# are not evaluated when these functions are defined; runtime behavior is unchanged.


def _first_or_none(nodes):
    """Return the first item of an iterable (e.g. ``graph.objects(...)``), or None if it is empty."""
    return next(iter(nodes), None)


def _merge_metadata_into(target: dict, partial: dict) -> None:
    """
    Description:
        Merges one partial variable_metadata mapping into ``target`` in place.

        - A variable not yet in ``target`` is added as a shallow copy of its
          entry (so later merges never mutate the caller's dict), with its
          "Unit" list de-duplicated and sorted.
        - For a variable already in ``target``: units are unioned and sorted,
          a missing AltLabel / Category is filled from ``partial``, and
          IsMeasure is promoted from "No" to "Yes" when ``partial`` says "Yes".
        - ExistingURI is never overwritten.

    Args:
        target (dict): variable name => metadata dict; updated in place.
        partial (dict): variable name => metadata dict to fold into ``target``.
    """
    for var_name, pm in partial.items():
        current = target.get(var_name)
        if current is None:
            entry = dict(pm)
            entry["Unit"] = sorted(set(pm.get("Unit", [])))
            target[var_name] = entry
            continue

        current["Unit"] = sorted(set(current.get("Unit", [])) | set(pm.get("Unit", [])))
        if not current.get("AltLabel") and pm.get("AltLabel"):
            current["AltLabel"] = pm["AltLabel"]
        if not current.get("Category") and pm.get("Category"):
            current["Category"] = pm["Category"]
        if current.get("IsMeasure", "No") == "No" and pm.get("IsMeasure", "No") == "Yes":
            current["IsMeasure"] = "Yes"


def _parse_single_rdf_graph(graph: "Graph") -> tuple:
    """
    Description:
        Given a single rdflib.Graph that may contain multiple qb:DataSet URIs,
        parse each DataSet, gather the partial DataFrames, merge the partial
        metadata, and combine everything into one DataFrame + metadata for
        this Graph.

    Algorithm:
        1) Identify all qb:DataSet URIs in the graph. They are sorted by URI
           string so the row order does not depend on set/hash ordering.
        2) For each DataSet, take its first qb:structure and parse the
           dimension/measure definitions => partial_meta. DataSets without a
           qb:structure are skipped.
        3) Parse that DataSet's slices & observations => partial DataFrame.
           This step appends observation-level units to partial_meta, so it
           must run BEFORE step 4; otherwise units discovered in the second
           and later DataSets would be lost.
        4) Merge partial_meta into partial_meta_master.
        5) Concatenate the partial DataFrames => merged_df.

    Args:
        graph (Graph): The rdflib.Graph loaded from a single RDF file.

    Returns:
        (pd.DataFrame, dict):
            DataFrame => combined data from all qb:DataSets in this file
                         (empty if none produced rows)
            dict => combined variable_metadata from these DataSets
    """
    qb = NAMESPACE_MAP['qb']
    qb_dataset_uri = URIRef(qb + "DataSet")
    qb_structure_uri = URIRef(qb + "structure")

    datasets = sorted(set(graph.subjects(RDF.type, qb_dataset_uri)), key=str)
    if not datasets:
        return pd.DataFrame(), {}

    partial_frames = []
    partial_meta_master = {}

    for ds in datasets:
        dsd_uri = _first_or_none(graph.objects(ds, qb_structure_uri))
        if dsd_uri is None:
            # A DataSet without qb:structure cannot be interpreted; skip it.
            continue

        dims, meas, partial_meta = _extract_dims_meas_from_dsd(graph, dsd_uri)

        # Extract first: this call fills partial_meta[...]["Unit"].
        partial_df = _extract_data_for_dataset(graph, ds, dims, meas, partial_meta)
        if partial_df is not None and not partial_df.empty:
            partial_frames.append(partial_df)

        _merge_metadata_into(partial_meta_master, partial_meta)

    merged_df = pd.concat(partial_frames, ignore_index=True) if partial_frames else pd.DataFrame()
    return merged_df, partial_meta_master


def _extract_dims_meas_from_dsd(graph: "Graph", dsd_uri: "URIRef") -> tuple:
    """
    Description:
        For a qb:DataStructureDefinition, find its dimension and measure
        definitions and build partial variable_metadata with the keys
        "IsMeasure", "AltLabel", "Category", "Unit" (always empty here) and
        "ExistingURI". The qb:measureType dimension is skipped.

    Algorithm:
        1) Iterate the qb:component nodes of dsd_uri.
        2) Each component references a property via qb:dimension or
           qb:measure (qb:dimension wins if both are present).
        3) If the property is typed qb:DimensionProperty => dim_list;
           if qb:MeasureProperty => meas_list; otherwise ignored.
        4) skos:altLabel => "AltLabel"; mds:category => "Category" (local
           name of the category URI).
        5) Return (dim_list, meas_list, var_meta).

    Args:
        graph (Graph)
        dsd_uri (URIRef)

    Returns:
        (dimensions (list), measures (list), partial_variable_metadata (dict))
    """
    qb = NAMESPACE_MAP['qb']
    measure_type_uri_str = qb + "measureType"
    qb_comp_uri = URIRef(qb + "component")
    qb_dimension = URIRef(qb + "dimension")
    qb_measure = URIRef(qb + "measure")
    qb_dim_prop = URIRef(qb + "DimensionProperty")
    qb_meas_prop = URIRef(qb + "MeasureProperty")
    cat_prop = URIRef(NAMESPACE_MAP['mds'] + "category")

    dim_list = []
    meas_list = []
    var_meta = {}

    for comp_node in graph.objects(dsd_uri, qb_comp_uri):
        dimension_prop = _first_or_none(graph.objects(comp_node, qb_dimension))
        measure_prop = _first_or_none(graph.objects(comp_node, qb_measure))
        prop_uri = dimension_prop if dimension_prop else measure_prop
        if not prop_uri:
            continue
        if str(prop_uri) == measure_type_uri_str:
            continue

        prop_types = set(graph.objects(prop_uri, RDF.type))
        var_name = _uri_to_var_name(prop_uri)
        if qb_dim_prop in prop_types:
            dim_list.append(var_name)
            is_measure_str = "No"
        elif qb_meas_prop in prop_types:
            meas_list.append(var_name)
            is_measure_str = "Yes"
        else:
            # Neither a dimension nor a measure property: not a column.
            continue

        alt_label_obj = _first_or_none(graph.objects(prop_uri, SKOS.altLabel))
        cat_obj = _first_or_none(graph.objects(prop_uri, cat_prop))

        var_meta[var_name] = {
            "IsMeasure": is_measure_str,
            "AltLabel": str(alt_label_obj) if alt_label_obj is not None else None,
            # Category is stored as the local name of the category URI.
            "Category": _uri_to_var_name(cat_obj) if cat_obj is not None else None,
            "Unit": [],
            "ExistingURI": str(prop_uri),
        }

    return (dim_list, meas_list, var_meta)


def _extract_data_for_dataset(
    graph: "Graph",
    dataset_uri: "URIRef",
    dimensions: list,
    measures: list,
    variable_metadata: dict,
) -> pd.DataFrame:
    """
    Description:
        For a single qb:DataSet, gather its qb:Slice nodes and their
        Observations, and build a wide DataFrame: one row per distinct
        combination of dimension values, one column per dimension and measure.
        Observation-level units (sdmx-attribute:unitMeasure) are appended to
        variable_metadata[measure]["Unit"] (mutated in place).

    Algorithm:
        1) Find slices via (dataset_uri, qb:slice, slice).
        2) Map each observation (slice, qb:observation, obs) => its slice.
        3) For each Observation:
           a) measure property = its qb:measureType; skip if absent or not
              one of `measures`.
           b) measure value = its object for that property (Literals are
              converted with toPython(), other nodes with str()).
           c) dimension values are read from the observation's slice.
           d) rows[dimension_key][measure_name] = measure value.
           e) record any unit attached to the observation.
        4) Convert the rows => pd.DataFrame.

    Args:
        graph (Graph): The RDF graph for the entire file.
        dataset_uri (URIRef): The qb:DataSet node.
        dimensions (list): dimension column names.
        measures (list): measure column names.
        variable_metadata (dict): partial metadata; must contain every name in
            `dimensions` and `measures`. Updated in place with discovered units.

    Returns:
        pd.DataFrame: wide format of dimension + measure columns; empty if no
        data was found.
    """
    qb = NAMESPACE_MAP['qb']
    qb_slice_uri = URIRef(qb + "slice")
    qb_observation_uri = URIRef(qb + "observation")
    measure_type_uri = URIRef(qb + "measureType")
    unit_measure_uri = URIRef(NAMESPACE_MAP['sdmx-attribute'] + "unitMeasure")

    known_measures = set(measures)
    dim_uris = {dn: URIRef(variable_metadata[dn]["ExistingURI"]) for dn in dimensions}

    def _to_py(node):
        return node.toPython() if isinstance(node, Literal) else str(node)

    obs_to_slice = {}
    for sl in graph.objects(dataset_uri, qb_slice_uri):
        for obs in graph.objects(sl, qb_observation_uri):
            obs_to_slice[obs] = sl

    rows_by_key = {}
    for obs, slice_uri in obs_to_slice.items():
        measure_prop = _first_or_none(graph.objects(obs, measure_type_uri))
        if not measure_prop:
            continue
        measure_name = _uri_to_var_name(measure_prop)
        if measure_name not in known_measures:
            continue

        measure_node = _first_or_none(graph.objects(obs, measure_prop))
        measure_val = None if measure_node is None else _to_py(measure_node)
        if measure_val is None:
            continue

        dim_values = {}
        if slice_uri:
            for dim_name, dim_uri in dim_uris.items():
                for obj in graph.objects(slice_uri, dim_uri):
                    dim_values[dim_name] = _to_py(obj)

        # Observations sharing the same dimension values become one row.
        dim_key = tuple((dn, dim_values.get(dn)) for dn in dimensions)
        if dim_key not in rows_by_key:
            rows_by_key[dim_key] = dict(dim_values)
        rows_by_key[dim_key][measure_name] = measure_val

        units = variable_metadata[measure_name].setdefault("Unit", [])
        for obj in graph.objects(obs, unit_measure_uri):
            unit_val = str(obj)
            if unit_val not in units:
                units.append(unit_val)

    return pd.DataFrame(list(rows_by_key.values())) if rows_by_key else pd.DataFrame()


# ------------------------------------------------------------------------------
# UTILITY: FILE & FORMAT
# ------------------------------------------------------------------------------
def _collect_rdf_files(file_path: str) -> list:
    """
    Description:
        Collects all .ttl/.jsonld/.json-ld files (extension match is
        case-insensitive). If 'file_path' is a directory it is walked
        recursively; otherwise 'file_path' itself is returned if its extension
        is valid. Existence of a single-file path is not checked here.

    Algorithm:
        1) Directory => os.walk recursively, keep matching files.
        2) Single path => keep it if the extension is valid.
        3) Return absolute paths, sorted so processing order is deterministic
           (os.walk gives no ordering guarantee).

    Args:
        file_path (str): either a directory or a single file path

    Returns:
        list of str: The matched RDF file paths (absolute, sorted).
    """
    valid_exts = ('.ttl', '.jsonld', '.json-ld')

    if os.path.isdir(file_path):
        found = [
            os.path.abspath(os.path.join(root, fname))
            for root, _dirs, files in os.walk(file_path)
            for fname in files
            if os.path.splitext(fname)[1].lower() in valid_exts
        ]
        return sorted(found)

    if os.path.splitext(file_path)[1].lower() in valid_exts:
        return [os.path.abspath(file_path)]
    return []


def _guess_rdf_format(file_path: str) -> str:
    """
    Description:
        Guesses the rdflib parse format from the file extension:
        'json-ld' for .jsonld/.json-ld, otherwise 'turtle' (including .ttl).
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext in ('.jsonld', '.json-ld'):
        return 'json-ld'
    return 'turtle'


def _uri_to_var_name(uri_val) -> str:
    """
    Description:
        Returns the local name of a URI: the text after the last '#', then
        after the last '/'.
        e.g. http://example.org#ExperimentId => "ExperimentId"
    """
    return str(uri_val).split('#')[-1].split('/')[-1]


# ------------------------------------------------------------------------------
# PRINTING FINAL STATS & PREVIEW
# ------------------------------------------------------------------------------
def _print_final_stats_and_preview(df: pd.DataFrame, var_categories: dict, rdf_files: list, file_path: str) -> None:
    """
    Description:
        Prints a summary of the final merged DataFrame: number of rows,
        columns, distinct (non-empty) categories of the columns present, plus
        the first row as a dict if there is any data.

    Args:
        df (pd.DataFrame): The final merged DataFrame.
        var_categories (dict): column => category ("" if unknown)
        rdf_files (list): The list of all parsed RDF file paths
        file_path (str): The original user-supplied path (file or folder).
    """
    num_rows = len(df)
    num_cols = len(df.columns)
    distinct_categories = {var_categories.get(col, "") for col in df.columns} - {""}

    print("\n=== Final Conversion Stats ===")
    if len(rdf_files) == 1:
        print(f"Source: Single RDF file => {rdf_files[0]}")
    else:
        print(f"Source: {len(rdf_files)} RDF files from => {file_path}")
    print(f"Total Rows (Experiments): {num_rows}")
    print(f"Total Columns (Variables): {num_cols}")

    if distinct_categories:
        cats_sorted = sorted(distinct_categories)
        print(f"Distinct Categories Found: {len(cats_sorted)} ({', '.join(cats_sorted)})")
    else:
        print("Distinct Categories Found: 0")

    if num_rows > 0:
        print("\n=== First Row Preview ===")
        print(df.iloc[0].to_dict())
    else:
        print("\nNo data rows found.")