Source code for FAIRLinked.QBWorkflow.data_parser

import openpyxl
import pandas as pd
from FAIRLinked.QBWorkflow.utility import (
    ALT_LABEL_INSTR,
    UNIT_INSTR,
    IS_MEASURE_INSTR,
    EXISTING_URI_INSTR,
)

def read_excel_template(file_path):
    """
    Read the Excel data file generated by generate_data_xlsx_template.

    The active worksheet is read using the following layout:

    - Row 1 holds category headers as merged cells; every column covered by
      a merged range is assigned that range's category. Merged ranges that
      start in column 2 are skipped.
    - Rows 2-5 hold one instruction per row: column 1 carries the
      instruction text and the remaining columns carry the per-variable
      value for that instruction.
    - Row 6 holds the variable names (column 1 is skipped).
    - Rows 7 and below hold the experimental data.

    Args:
        file_path (str): The path to the Excel file.

    Returns:
        tuple: A tuple containing:
            - variable_metadata (dict): Flat dictionary with cleaned variable
              names as keys and metadata dictionaries as values. Each
              metadata dictionary has a 'Category' entry (None when the
              column is not under a merged category header) plus one entry
              per instruction row ('AltLabel', 'Unit', 'IsMeasure',
              'ExistingURI' for the known instruction texts, otherwise the
              raw instruction text).
            - df (DataFrame): Experimental data with cleaned variable names
              as columns (without category prefixes). Columns that are
              entirely empty are dropped. An 'ExperimentId' column is
              always present: if it is missing or contains any null value,
              the whole column is replaced with 1..len(df).
    """
    instruction_rows = range(2, 6)  # Rows 2 to 5 inclusive
    variable_name_row = 6
    first_data_row = 7

    # Mapping of the long instruction texts to the shorter metadata keys
    instruction_mappings = {
        ALT_LABEL_INSTR: "AltLabel",
        UNIT_INSTR: "Unit",
        IS_MEASURE_INSTR: "IsMeasure",
        EXISTING_URI_INSTR: "ExistingURI",
    }

    # Load the workbook (cached formula results, not formulas) and select
    # the active worksheet
    wb = openpyxl.load_workbook(file_path, data_only=True)
    ws = wb.active

    # Read static instructions from rows 2-5 in Column A
    static_instructions = {}  # {row: instruction key or None}
    for row in instruction_rows:
        cell_value = ws.cell(row=row, column=1).value
        if cell_value:
            # str() so that a non-text cell (number, date) does not crash
            # on .strip(); per-variable values below get the same handling.
            instruction_text = str(cell_value).strip()
            static_instructions[row] = instruction_mappings.get(
                instruction_text, instruction_text
            )
        else:
            static_instructions[row] = None

    # Map column indices to categories using the merged header cells of
    # row 1. The mapping is built per merged range (not per category name)
    # so that a category name used by more than one range still covers all
    # of its columns.
    column_to_category = {}
    for merged_range in ws.merged_cells.ranges:
        if merged_range.min_row != 1:
            continue
        start_col = merged_range.min_col
        # Exclude Column 2 (Instruction cell)
        if start_col == 2:
            continue
        category_name = ws.cell(row=1, column=start_col).value
        if not category_name:
            continue
        for col_idx in range(start_col, merged_range.max_col + 1):
            column_to_category[col_idx] = category_name

    # Collect the variables in worksheet order, skipping Column 1
    # (instructions) and any column without a variable name in row 6.
    variable_metadata = {}  # {variable_name: {Category: ..., instr: value}}
    data_columns = []  # [(column index, cleaned variable name)]
    for col in range(2, ws.max_column + 1):
        variable_name = ws.cell(row=variable_name_row, column=col).value
        if not variable_name:
            continue

        clean_variable = str(variable_name).strip().replace(" ", "_")
        data_columns.append((col, clean_variable))

        metadata = {'Category': column_to_category.get(col)}
        for row in instruction_rows:
            instruction_key = static_instructions.get(row)
            if not instruction_key:
                continue
            cell_value = ws.cell(row=row, column=col).value
            metadata[instruction_key] = (
                None if cell_value is None else str(cell_value).strip()
            )
        variable_metadata[clean_variable] = metadata

    # Read data rows; row tuples are zero-based, worksheet columns are not.
    data_values = [
        [row[col - 1].value for col, _ in data_columns]
        for row in ws.iter_rows(min_row=first_data_row, max_row=ws.max_row)
    ]

    # Create the DataFrame
    df = pd.DataFrame(
        data_values, columns=[var_name for _, var_name in data_columns]
    )

    # Remove any columns with all NaN values
    df.dropna(axis=1, how='all', inplace=True)

    # Reset index
    df.reset_index(drop=True, inplace=True)

    # Ensure 'ExperimentId' is present and filled.
    # NOTE(review): a single missing value causes every existing id to be
    # replaced by a running number - confirm this is intended.
    if 'ExperimentId' not in df.columns or df['ExperimentId'].isnull().any():
        df['ExperimentId'] = range(1, len(df) + 1)

    return variable_metadata, df