Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added __pycache__/normalizer.cpython-314.pyc
Binary file not shown.
Binary file added __pycache__/schemas.cpython-314.pyc
Binary file not shown.
Binary file added __pycache__/validator.cpython-314.pyc
Binary file not shown.
162 changes: 162 additions & 0 deletions mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import pandas as pd
from rapidfuzz import fuzz

from schemas import SCHEMA_SYNONYMS
from normalizer import normalize_column
from validator import REQUIRED_FIELDS


def map_column(column_name, *, auto_threshold=90, review_threshold=70):
    """
    Map a raw column name to the closest schema field by fuzzy matching.

    The name is normalized with ``normalize_column`` and compared against
    every synonym in ``SCHEMA_SYNONYMS`` using ``fuzz.ratio``. The synonym
    with the highest score wins; on a tie the first one encountered is kept.

    Args:
        column_name: Raw column header as it appears in the input file.
        auto_threshold: Minimum score for the "AUTO_MAPPED" status.
        review_threshold: Minimum score for the "REVIEW_REQUIRED" status.

    Returns:
        A dict with the keys "original_column", "normalized_column",
        "mapped_to", "matched_synonym", "confidence" and "status".
        "mapped_to" holds the best-scoring schema field even when the
        status is "UNKNOWN"; it is None only if no synonym scored above 0.
    """
    normalized = normalize_column(column_name)

    best_match = None
    matched_synonym = None
    highest_score = 0

    for schema_field, synonyms in SCHEMA_SYNONYMS.items():
        for synonym in synonyms:
            score = fuzz.ratio(normalized, synonym)

            # Strict comparison keeps the first synonym on equal scores.
            if score > highest_score:
                highest_score = score
                best_match = schema_field
                matched_synonym = synonym

    if highest_score >= auto_threshold:
        status = "AUTO_MAPPED"
    elif highest_score >= review_threshold:
        status = "REVIEW_REQUIRED"
    else:
        status = "UNKNOWN"

    return {
        "original_column": column_name,
        "normalized_column": normalized,
        "mapped_to": best_match,
        "matched_synonym": matched_synonym,
        "confidence": highest_score,
        "status": status,
    }


def detect_schema(columns):
    """
    Guess the schema type from a collection of column names.

    Every name is passed through ``normalize_column``; if at least one
    normalized name is a known mutation column the schema is "MUTATION",
    otherwise it is "UNKNOWN".
    """
    mutation_markers = {
        "gene",
        "gene_symbol",
        "hugo_symbol",
        "chromosome",
        "start_position",
        "end_position",
    }

    cleaned_names = [normalize_column(name) for name in columns]

    if mutation_markers.isdisjoint(cleaned_names):
        return "UNKNOWN"

    return "MUTATION"


def validate_mappings(mapped_results, schema_type):
    """
    Check that every field required by a schema has a mapped column.

    Args:
        mapped_results: Iterable of mapping dicts; each must have a
            "mapped_to" key and may have a "status" key.
        schema_type: Key into ``REQUIRED_FIELDS``; an unrecognized type
            has no required fields.

    Returns:
        A list of "Missing required field: ..." messages, empty when all
        required fields are covered.
    """
    # An entry with status "UNKNOWN" still carries its best low-confidence
    # guess in "mapped_to". Counting that guess would let an unrelated column
    # satisfy a required field, so such entries are ignored. Entries without
    # a "status" key are counted as before.
    mapped_fields = {
        item["mapped_to"]
        for item in mapped_results
        if item.get("status") != "UNKNOWN"
    }

    required_fields = REQUIRED_FIELDS.get(schema_type, [])

    return [
        f"Missing required field: {field}"
        for field in required_fields
        if field not in mapped_fields
    ]


def print_preview(results):
    """
    Print one "<original> --> <mapped> (<confidence>%)" line per mapping.

    Each item in ``results`` must provide the keys "original_column",
    "mapped_to" and "confidence".
    """
    print("\nCOLUMN MAPPINGS:\n")

    for entry in results:
        source = entry["original_column"]
        target = entry["mapped_to"]
        confidence = entry["confidence"]

        print(f"{source} --> {target} ({confidence}%)")


def process_file(csv_file):
    """
    Map the columns of a CSV file to schema fields and print a report.

    Prints the detected schema type, one mapping line per column, and the
    validation outcome (either a success line or one line per missing
    required field).

    Args:
        csv_file: Path (or other ``pandas.read_csv`` source) of the CSV.
    """
    # Only the column names are used, so read the header row alone instead
    # of parsing every data row of a potentially very large file.
    header = pd.read_csv(csv_file, nrows=0)
    columns = list(header.columns)

    schema_type = detect_schema(columns)
    print(f"\nDetected Schema: {schema_type}")

    mapped_results = [map_column(col) for col in columns]
    print_preview(mapped_results)

    errors = validate_mappings(mapped_results, schema_type)

    print("\nVALIDATION:\n")

    if not errors:
        print("Validation Passed ✅")
    else:
        for error in errors:
            print(error)


if __name__ == "__main__":
    # Command-line entry point: expects exactly one argument, the CSV path.
    import sys

    if len(sys.argv) == 2:
        process_file(sys.argv[1])
    else:
        print(
            "Usage:\n"
            "python mapper.py input.csv"
        )
203 changes: 203 additions & 0 deletions normalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import re
import pandas as pd


# -----------------------------------------------------------------------------
# Canonical cBioPortal column mappings
# -----------------------------------------------------------------------------

# Aliases grouped by the canonical column name they resolve to.
_CANONICAL_ALIASES = {
    # Patient identifiers
    "PATIENT_ID": ("patient", "patient_id", "patient id", "sample_patient"),
    # Sample identifiers
    "SAMPLE_ID": ("sample", "sample_id", "sample id"),
    # Sex / gender
    "SEX": ("gender", "sex"),
    # Age
    "AGE": ("age", "age_at_diagnosis"),
    # Cancer type
    "CANCER_TYPE": ("cancer_type", "tumor_type"),
    # Survival
    "OS_STATUS": ("os_status",),
    "OS_MONTHS": ("os_months",),
}

# Flat alias -> canonical lookup table used when renaming columns.
COLUMN_MAP = {
    alias: canonical
    for canonical, aliases in _CANONICAL_ALIASES.items()
    for alias in aliases
}


# -----------------------------------------------------------------------------
# Normalize text helper
# -----------------------------------------------------------------------------

def normalize_text(text):
    """
    Prepare text for lookup.

    The value is converted to ``str``, surrounding whitespace is removed,
    the result is lowercased, and every underscore becomes a space.
    """
    trimmed = str(text).strip()
    lowered = trimmed.lower()

    return lowered.replace("_", " ")


# -----------------------------------------------------------------------------
# NEW SINGLE COLUMN NORMALIZER
# -----------------------------------------------------------------------------

def normalize_column(column_name):
    """
    Turn one column name into a lowercase, underscore-separated token.

    Example:
        Tumor Sample Barcode
        ->
        tumor_sample_barcode
    """
    # Applied in order: whitespace/hyphen runs become "_", anything outside
    # [a-z0-9_] is dropped, then repeated underscores collapse to one.
    rewrite_steps = (
        (r"[\s\-]+", "_"),
        (r"[^a-z0-9_]", ""),
        (r"_+", "_"),
    )

    cleaned = str(column_name).strip().lower()

    for pattern, replacement in rewrite_steps:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned


# -----------------------------------------------------------------------------
# Normalize column names in dataframe
# -----------------------------------------------------------------------------

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Rename messy columns into canonical cBioPortal columns.

    Each column name is cleaned with ``normalize_text`` and looked up in
    ``COLUMN_MAP``. Names without an entry fall back to the cleaned name,
    uppercased with spaces turned into underscores.

    Returns:
        A renamed DataFrame (the result of ``DataFrame.rename``).
    """
    new_columns = {}

    for col in df.columns:
        spaced = normalize_text(col)
        underscored = spaced.replace(" ", "_")

        # normalize_text turns underscores into spaces, so COLUMN_MAP keys
        # written with underscores (e.g. "age_at_diagnosis", "tumor_type")
        # never matched. Try the spaced form first, then the underscored one.
        canonical = COLUMN_MAP.get(spaced)
        if canonical is None:
            canonical = COLUMN_MAP.get(underscored)

        # fallback -> uppercase cleaned version
        if canonical is None:
            canonical = underscored.upper()

        new_columns[col] = canonical

    return df.rename(columns=new_columns)


# -----------------------------------------------------------------------------
# Normalize values inside columns
# -----------------------------------------------------------------------------

def _normalize_categorical(series, mapping):
    """
    Map one column's values through *mapping*, keeping missing values missing.
    """

    def convert(value):
        # Leave None/NaN untouched instead of turning them into "NAN".
        if pd.isna(value):
            return value

        # A numeric column with gaps is stored as float, so the codes 0/1
        # arrive as 0.0/1.0; collapse them so they still match "0"/"1".
        if isinstance(value, float) and value.is_integer():
            value = int(value)

        key = str(value).strip().lower()

        # Unrecognized values are kept, uppercased.
        return mapping.get(key, key.upper())

    return series.map(convert)


def normalize_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize categorical values.

    - "SEX": m/male -> MALE, f/female -> FEMALE
    - "OS_STATUS": 0/living -> LIVING, 1/deceased -> DECEASED

    Matching ignores case and surrounding whitespace. Values without a
    mapping are uppercased; missing values stay missing. Columns that are
    absent are skipped. The columns are assigned on ``df`` itself, which is
    also returned.
    """
    # Normalize SEX column
    if "SEX" in df.columns:
        sex_map = {
            "m": "MALE",
            "male": "MALE",
            "f": "FEMALE",
            "female": "FEMALE",
        }
        df["SEX"] = _normalize_categorical(df["SEX"], sex_map)

    # Normalize OS_STATUS
    if "OS_STATUS" in df.columns:
        os_map = {
            "0": "LIVING",
            "1": "DECEASED",
            "living": "LIVING",
            "deceased": "DECEASED",
        }
        df["OS_STATUS"] = _normalize_categorical(df["OS_STATUS"], os_map)

    return df


# -----------------------------------------------------------------------------
# Main normalization pipeline
# -----------------------------------------------------------------------------

def normalize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run the complete normalization: column names first, then values.
    """
    renamed = normalize_columns(df)

    return normalize_values(renamed)


# -----------------------------------------------------------------------------
# Example local testing
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    # Small manual demonstration of the pipeline on an in-memory frame.
    sample = pd.DataFrame(
        {
            "patient": ["P1", "P2"],
            "gender": ["male", "f"],
            "age": [45, 60],
        }
    )

    print("ORIGINAL DATAFRAME")
    print(sample)

    print("\nNORMALIZED DATAFRAME")
    print(normalize_dataframe(sample))
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ python-docx>=1.1.0
plotly>=5.18.0
requests>=2.31.0
python-dotenv>=1.0.0
rapidfuzz
Loading