# prediction_scan
# Step 1: Load and inspect current data
import pandas as pd
import numpy as np
from pathlib import Path
import re
OUTPUT_DIR = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs")
LAB_CSV = OUTPUT_DIR / 'lab_reports.csv'
print("="*70)
print("STEP 1: Loading Current Dataset")
print("="*70)
df_raw = pd.read_csv(LAB_CSV)
print(f"✓ Loaded {len(df_raw)} rows from {LAB_CSV.name}")
print(f"✓ Columns: {list(df_raw.columns)}")
print(f"✓ Tests with names: {df_raw['test_name'].notna().sum()}")
print(f"✓ Unique tests: {df_raw['test_name'].nunique()}")
print(f"✓ Unique images: {df_raw['image'].nunique()}")
print("\n" + "="*70)
print("Data Quality Overview:")
print("="*70)
for col in df_raw.columns:
    non_null = df_raw[col].notna().sum()
    pct = (non_null / len(df_raw)) * 100
    print(f"  {col:20s}: {non_null:5d} / {len(df_raw)} ({pct:5.1f}%)")
df_raw.head(10)
# Step 2: Comprehensive Data Cleaning
print("\n" + "="*70)
print("STEP 2: Data Cleaning & Normalization")
print("="*70)
df = df_raw.copy()
# 2.1: Clean test names - normalize medical terms
def clean_test_name(name):
    if pd.isna(name) or not str(name).strip():
        return None
    name = str(name).strip()
    # Canonical mapping for common OCR typos and abbreviations
    canonical_map = {
        'hemoolobin': 'Hemoglobin',
        'haemoglobin': 'Hemoglobin',
        'hb': 'Hemoglobin',
        'tlc': 'Total Leukocyte Count',
        'wbc': 'Total Leukocyte Count',
        'rbc': 'Total RBC Count',
        'mcv': 'Mean Corpuscular Volume',
        'mch': 'Mean Cell Haemoglobin',
        'mchc': 'Mean Cell Haemoglobin Concentration',
        'pcv': 'Packed Cell Volume',
        'esr': 'Erythrocyte Sedimentation Rate',
        'neutrophil': 'Neutrophils',
        'lymphocyte': 'Lymphocytes',
        'monocyte': 'Monocytes',
        'eosinophil': 'Eosinophils',
        'basophil': 'Basophils',
    }
    # Normalize to lowercase for lookup
    name_lower = name.lower().strip()
    # Remove trailing H/L abnormality flags
    name_lower = re.sub(r'\s+[hl]$', '', name_lower, flags=re.IGNORECASE)
    # Check canonical map
    if name_lower in canonical_map:
        return canonical_map[name_lower]
    # Title case for standard medical terms
    name = name.title()
    # Restore upper-case acronyms that title-casing mangles
    for acronym in ('MCV', 'MCH', 'MCHC', 'RBC', 'TLC', 'WBC', 'PCV', 'ESR'):
        name = re.sub(rf'\b{acronym.title()}\b', acronym, name)
    return name
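# Quick sanity check on clean_test_name (illustrative inputs, not from the dataset):
assert clean_test_name('hemoolobin') == 'Hemoglobin'               # OCR typo -> canonical
assert clean_test_name('mcv L') == 'Mean Corpuscular Volume'       # trailing L flag stripped
assert clean_test_name('total rbc count') == 'Total RBC Count'     # title case + acronym fix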
# 2.2: Clean results - standardize numeric formats
def clean_result(value):
    if pd.isna(value):
        return None
    val = str(value).strip()
    if not val:
        return None
    # Remove trailing H/L (High/Low) abnormality flags
    val = re.sub(r'\s+(?:High|Low|H|L)$', '', val, flags=re.IGNORECASE).strip()
    # Numeric patterns (including '<', '>' and ranges) and qualitative results
    # (e.g. 'Positive') are both kept as-is; only the flag suffix is stripped.
    return val
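# clean_result in action (illustrative values): numeric results lose their
# flag suffix, qualitative results pass through, blanks become None.
assert clean_result('13.5 High') == '13.5'
assert clean_result('Positive') == 'Positive'
assert clean_result('   ') is None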
# 2.3: Clean normal ranges
def clean_normal_range(value):
    if pd.isna(value):
        return None
    val = str(value).strip()
    if not val or val == 'nan':
        return None
    # Keep only values containing range-like characters (digits, '.', '-')
    if re.search(r'[\d\.\-]', val):
        # Collapse repeated whitespace
        val = re.sub(r'\s+', ' ', val)
        return val
    return None
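# Illustrative checks: ranges keep their numeric content with normalized
# spacing, while non-range junk collapses to None.
assert clean_normal_range('13.0  -  17.0') == '13.0 - 17.0'
assert clean_normal_range('N/A') is None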
# Apply cleaning
print("\nCleaning test names...")
df['test_name_clean'] = df['test_name'].apply(clean_test_name)
print("Cleaning results...")
df['result_clean'] = df['result'].apply(clean_result)
print("Cleaning normal ranges...")
df['normal_range_clean'] = df['normal_range'].apply(clean_normal_range)
# Remove rows with no clean test name
df_clean = df[df['test_name_clean'].notna()].copy()
print(f"\n✓ Retained {len(df_clean)} / {len(df)} rows with valid test names")
print(f"✓ Unique clean test names: {df_clean['test_name_clean'].nunique()}")
# Replace original columns with cleaned versions
df_clean['test_name'] = df_clean['test_name_clean']
df_clean['result'] = df_clean['result_clean']
df_clean['normal_range'] = df_clean['normal_range_clean']
df_clean = df_clean.drop(columns=['test_name_clean', 'result_clean', 'normal_range_clean'])
print("\nMost common test names after cleaning:")
print(df_clean['test_name'].value_counts().head(15))
df_clean.head(10)
# Step 3: Clean Patient Metadata
print("\n" + "="*70)
print("STEP 3: Clean Patient Metadata")
print("="*70)
def clean_patient_name(name):
    if pd.isna(name) or not str(name).strip():
        return None
    name = str(name).strip()
    # Remove honorific titles (longest alternative first so 'MRS' is not split into 'MR' + 'S')
    name = re.sub(r'\b(?:MRS\.?|MR\.?|MS\.?|DR\.?)\s*', '', name, flags=re.IGNORECASE)
    # Remove non-alphabetic characters except spaces and dots
    name = re.sub(r'[^A-Za-z\s\.]', '', name)
    name = re.sub(r'\s+', ' ', name).strip()
    return name if len(name) > 2 else None
def clean_age(age):
    if pd.isna(age):
        return None
    age_str = str(age).strip()
    # Extract the first 1-3 digit number and sanity-check the range
    match = re.search(r'(\d{1,3})', age_str)
    if match:
        age_num = int(match.group(1))
        if 0 <= age_num <= 120:
            return str(age_num)
    return None
def clean_gender(gender):
    if pd.isna(gender):
        return None
    g = str(gender).strip().upper()
    if g in ['M', 'MALE']:
        return 'Male'
    elif g in ['F', 'FEMALE']:
        return 'Female'
    return None
def clean_date(date):
    if pd.isna(date):
        return None
    date_str = str(date).strip()
    # Try to parse and normalize to YYYY-MM-DD
    from datetime import datetime
    patterns = [
        r'(\d{4})-(\d{1,2})-(\d{1,2})',
        r'(\d{1,2})[/-](\d{1,2})[/-](\d{4})',
        r'(\d{1,2})[/-](\d{1,2})[/-](\d{2})',
    ]
    for pattern in patterns:
        match = re.search(pattern, date_str)
        if match:
            try:
                if pattern == patterns[0]:  # YYYY-MM-DD
                    y, m, d = match.groups()
                    dt = datetime(int(y), int(m), int(d))
                elif pattern == patterns[1]:  # DD/MM/YYYY
                    d, m, y = match.groups()
                    dt = datetime(int(y), int(m), int(d))
                else:  # DD/MM/YY: pivot two-digit years at 50
                    d, m, y = match.groups()
                    year = 2000 + int(y) if int(y) < 50 else 1900 + int(y)
                    dt = datetime(year, int(m), int(d))
                return dt.strftime('%Y-%m-%d')
            except ValueError:
                continue
    return None
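# Illustrative checks for the metadata cleaners (made-up values):
assert clean_patient_name('MR. JOHN DOE') == 'JOHN DOE'
assert clean_age('45 Years') == '45'
assert clean_gender('m') == 'Male'
assert clean_date('23/08/21') == '2021-08-23'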
# Apply metadata cleaning
df_clean['patient_name'] = df_clean['patient_name'].apply(clean_patient_name)
df_clean['age'] = df_clean['age'].apply(clean_age)
df_clean['gender'] = df_clean['gender'].apply(clean_gender)
df_clean['report_date'] = df_clean['report_date'].apply(clean_date)
print(f"✓ Patient names cleaned: {df_clean['patient_name'].notna().sum()} / {len(df_clean)}")
print(f"✓ Ages cleaned: {df_clean['age'].notna().sum()} / {len(df_clean)}")
print(f"✓ Genders cleaned: {df_clean['gender'].notna().sum()} / {len(df_clean)}")
print(f"✓ Dates cleaned: {df_clean['report_date'].notna().sum()} / {len(df_clean)}")
df_clean.head(10)
# Step 4: Save Cleaned Dataset
print("\n" + "="*70)
print("STEP 4: Save Cleaned Dataset")
print("="*70)
# Save the cleaned dataset
CLEAN_CSV = OUTPUT_DIR / 'lab_reports_cleaned.csv'
df_clean.to_csv(CLEAN_CSV, index=False)
print(f"✓ Saved cleaned dataset: {CLEAN_CSV}")
print(f"✓ Total rows: {len(df_clean)}")
print(f"✓ Total images: {df_clean['image'].nunique()}")
print(f"✓ Total unique tests: {df_clean['test_name'].nunique()}")
# Also update the original file
df_clean.to_csv(LAB_CSV, index=False)
print(f"✓ Updated original file: {LAB_CSV}")
print("\n" + "="*70)
print("PREPROCESSING COMPLETE!")
print("="*70)
print(f"✓ Clean dataset ready for spaCy format conversion")
print(f"✓ Next: Run spacy_format.ipynb to create training files")
print("="*70)
# Show final statistics
print("\nFinal Dataset Statistics:")
print(f" Rows: {len(df_clean)}")
print(f" Images: {df_clean['image'].nunique()}")
print(f" Unique tests: {df_clean['test_name'].nunique()}")
print(f" Completeness:")
print(f" - Patient names: {(df_clean['patient_name'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Ages: {(df_clean['age'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Genders: {(df_clean['gender'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Dates: {(df_clean['report_date'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Test names: {(df_clean['test_name'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Results: {(df_clean['result'].notna().sum() / len(df_clean) * 100):.1f}%")
print(f" - Normal ranges: {(df_clean['normal_range'].notna().sum() / len(df_clean) * 100):.1f}%")
df_clean
import os
from pathlib import Path
import sys
import re
import json
import shutil
import numpy as np
import pandas as pd
import cv2
import PIL
from PIL import Image
import pytesseract
import matplotlib.pyplot as plt
from datetime import datetime
# Resolve the pytesseract module object and point it at a Tesseract binary if needed
if hasattr(pytesseract, 'pytesseract'):
    _pt = pytesseract.pytesseract
else:
    _pt = pytesseract
default_tesseract_paths = [
    r"C:\Program Files\Tesseract-OCR\tesseract.exe",
    r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
]
if not shutil.which('tesseract') and not Path(_pt.tesseract_cmd).exists():
    for p in default_tesseract_paths:
        if Path(p).exists():
            _pt.tesseract_cmd = p
            break
DATA_DIR = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\data")
OUTPUT_DIR = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
RAW_OCR_DIR = OUTPUT_DIR / "ocr_raw"
RAW_OCR_DIR.mkdir(exist_ok=True)
IMG_EXTS = {'.png', '.jpg', '.jpeg', '.tif', '.tiff', '.bmp'}
image_files = sorted([p for p in DATA_DIR.glob('*') if p.suffix.lower() in IMG_EXTS])
print(f"Found {len(image_files)} image(s) in {DATA_DIR}")
print("First few:")
for p in image_files[:5]:
    print(" -", p.name)
print("Tesseract path:", _pt.tesseract_cmd)
print("Tesseract in PATH:", shutil.which('tesseract'))
def read_image(path: Path):
    img = cv2.imread(str(path))
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return img

def resize_up(img, factor=2):
    h, w = img.shape[:2]
    return cv2.resize(img, (w * factor, h * factor), interpolation=cv2.INTER_CUBIC)

def to_gray(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

def denoise(img_gray):
    # Edge-preserving smoothing before binarization
    return cv2.bilateralFilter(img_gray, d=7, sigmaColor=75, sigmaSpace=75)

def binarize(img_gray):
    return cv2.adaptiveThreshold(img_gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                 cv2.THRESH_BINARY, 31, 10)

def deskew(gray):
    # Estimate page skew from dominant Hough-line angles and rotate to correct it
    edges = cv2.Canny(gray, 50, 150)
    lines = cv2.HoughLines(edges, 1, np.pi / 180, threshold=150)
    if lines is None:
        return gray
    angles = []
    for rho_theta in lines[:50]:
        for rho, theta in rho_theta:
            angle = (theta * 180 / np.pi) - 90
            if -45 < angle < 45:
                angles.append(angle)
    if not angles:
        return gray
    median_angle = np.median(angles)
    h, w = gray.shape[:2]
    M = cv2.getRotationMatrix2D((w // 2, h // 2), median_angle, 1.0)
    return cv2.warpAffine(gray, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)

def preprocess_for_ocr(img):
    # Upscale -> grayscale -> deskew -> denoise -> adaptive binarize
    up = resize_up(img, factor=2)
    gray = to_gray(up)
    dsk = deskew(gray)
    dn = denoise(dsk)
    bw = binarize(dn)
    return bw
print("Preprocessing utilities loaded.")
def extract_table_data(text: str):
    lines = text.split('\n')
    # Locate the table header row (e.g. "INVESTIGATION  OBSERVED VALUE  REFERENCE")
    table_start_idx = -1
    for i, line in enumerate(lines):
        line_lower = line.lower()
        if ('investigation' in line_lower and ('observed' in line_lower or 'result' in line_lower)) or \
           ('investigation' in line_lower and 'reference' in line_lower):
            table_start_idx = i + 1
            break
    if table_start_idx == -1:
        return []
    tests = []
    for idx in range(table_start_idx, len(lines)):
        line = lines[idx].strip()
        if not line:
            continue
        # Stop at footer/signature sections
        if any(kw in line.lower() for kw in ['note:', 'signature', 'page', 'dr.', 'md (', 'interpretation',
                                             'thanks for', 'end of report', 'generated on', 'instruments:',
                                             'medical lab', 'technician', 'pathologist']):
            break
        # Skip short all-caps section headings
        if re.match(r'^[A-Z][A-Z\s]+$', line) and len(line.split()) <= 5:
            continue
        if 'Primary Sample Type' in line or 'Calculate' in line:
            continue
        # test name, result (optionally flagged), range like "13.0 - 17.0" or "13.0 to 17.0";
        # the trailing .* captures units
        match = re.match(r'^(.+?)\s+([\d\.]+(?:\s+(?:Low|High|Borderline))?)\s+([\d\.]+\s*(?:[-–]|to)\s*[\d\.]+.*)', line)
        if match:
            test_name = match.group(1).strip()
            result_part = match.group(2).strip()
            normal_range = match.group(3).strip()
            result_clean = re.sub(r'\s*(Low|High|Borderline)\s*$', '', result_part).strip()
            if test_name.lower() not in ['investigation', 'result', 'reference', 'value', 'unit']:
                tests.append({
                    'test_name': test_name,
                    'result': result_clean,
                    'normal_range': normal_range
                })
    return tests
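# Illustrative parse of a synthetic table (made-up OCR text):
_demo = "INVESTIGATION OBSERVED VALUE REFERENCE\nHemoglobin 13.5 Low 13.0 - 17.0 g/dL"
print(extract_table_data(_demo))
# -> [{'test_name': 'Hemoglobin', 'result': '13.5', 'normal_range': '13.0 - 17.0 g/dL'}]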
def parse_lab_report(text: str):
    # Helper extractors (normalize_text, extract_*) are defined in earlier cells
    norm_text = normalize_text(text)
    patient_name = extract_patient_name_top(norm_text)
    age = extract_age(norm_text)
    gender = extract_gender(norm_text)
    report_date = extract_report_date(norm_text)
    report_name = extract_report_title(norm_text)
    tests = extract_table_data(norm_text)
    return {
        'patient_name': patient_name,
        'age': age,
        'gender': gender,
        'report_date': report_date,
        'report_name': report_name,
        'tests': tests
    }
print("Lab report parsing functions ready.")
training_rows = []
for rec in raw_text_records:
    img_name = rec['image']
    text = rec['text']
    parsed = parse_lab_report(text)
    patient_name = parsed['patient_name']
    age = parsed['age']
    gender = parsed['gender']
    report_date = parsed['report_date']
    report_name = parsed['report_name']
    tests = parsed['tests']
    if not tests:
        # Keep one row per image even when no table rows were parsed
        training_rows.append({
            'image': img_name,
            'text': text,
            'patient_name': patient_name,
            'age': age,
            'gender': gender,
            'report_date': report_date,
            'report_name': report_name,
            'test_name': None,
            'result': None,
            'normal_range': None
        })
    else:
        # One row per parsed test, repeating the patient-level fields
        for t in tests:
            training_rows.append({
                'image': img_name,
                'text': text,
                'patient_name': patient_name,
                'age': age,
                'gender': gender,
                'report_date': report_date,
                'report_name': report_name,
                'test_name': t['test_name'],
                'result': t['result'],
                'normal_range': t.get('normal_range')
            })
training_df = pd.DataFrame(training_rows)
training_csv = OUTPUT_DIR / 'spacy_training_data.csv'
training_df.to_csv(training_csv, index=False)
print(f"\n{'='*60}")
print(f"TRAINING CSV GENERATED: {training_csv}")
print(f"{'='*60}")
print(f"Total rows: {len(training_df)}")
print(f"Total images: {training_df['image'].nunique()}")
print(f"Columns: {list(training_df.columns)}")
print(f"\nSample rows:")
print(training_df.head(15))
print("="*70)
print("FINAL TRAINING CSV SUMMARY")
print("="*70)
print(f"File: {training_csv}")
print(f"Size: {training_csv.stat().st_size / 1024:.2f} KB")
print(f"\nData Quality:")
print(f" Images with patient names: {training_df['patient_name'].notna().sum()}")
print(f" Images with age: {training_df['age'].notna().sum()}")
print(f" Images with gender: {training_df['gender'].notna().sum()}")
print(f" Images with report date: {training_df['report_date'].notna().sum()}")
print(f" Images with report name: {training_df['report_name'].notna().sum()}")
print(f" Rows with test data: {training_df['test_name'].notna().sum()}")
print(f"\nColumn Names:")
print(f" {', '.join(training_df.columns)}")
print("\n" + "="*70)
print("✓ Ready for spaCy NER training!")
print("="*70)
from pathlib import Path
import pandas as pd
import re
from typing import List, Tuple, Dict, Optional
# Paths
LAB_CSV = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs\lab_reports.csv")
TOK_OUT = OUTPUT_DIR / 'lab_reports_bio_tokens.csv'
print(f"Reading: {LAB_CSV}")
lab_df = pd.read_csv(LAB_CSV)
print(lab_df.shape)
lab_df.head(3)
# Simple whitespace tokenization with character spans
def simple_tokenize_with_spans(text: str) -> List[Tuple[str, int, int]]:
    tokens = []
    i = 0
    while i < len(text):
        if text[i].isspace():
            i += 1
            continue
        j = i
        while j < len(text) and not text[j].isspace():
            j += 1
        tokens.append((text[i:j], i, j))
        i = j
    return tokens
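# Example: each token carries its character offsets, which the span
# alignment below relies on.
print(simple_tokenize_with_spans("Hb 13.5 g/dL"))
# -> [('Hb', 0, 2), ('13.5', 3, 7), ('g/dL', 8, 12)]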
# Build entity spans from row fields
def build_entity_spans(row: pd.Series) -> Dict[str, List[Tuple[int, int]]]:
    text = str(row['text'])
    spans: Dict[str, List[Tuple[int, int]]] = {}

    def add_span(label: str, value: Optional[str]):
        if not value or not isinstance(value, str):
            return
        value = value.strip()
        if not value:
            return
        # Use the first exact-match occurrence in the OCR text
        m = re.search(re.escape(value), text)
        if m:
            spans.setdefault(label, []).append((m.start(), m.end()))

    # Patient-level fields
    add_span('PATIENT_NAME', row.get('patient_name'))
    # age/result may be non-string; convert for matching
    add_span('AGE', str(row.get('age')) if pd.notna(row.get('age')) else None)
    add_span('GENDER', row.get('gender'))
    add_span('REPORT_DATE', row.get('report_date'))
    add_span('REPORT_NAME', row.get('report_name'))
    # Test-level fields for this row (if any)
    add_span('TEST_NAME', row.get('test_name'))
    add_span('RESULT', str(row.get('result')) if pd.notna(row.get('result')) else None)
    add_span('NORMAL_RANGE', row.get('normal_range'))
    return spans
# BIO-aware group generator
class GroupGen:
    def __init__(self):
        self.id = 0
        self.prev_label = None
        self.prev_bio = 'O'

    def getgroup(self, bio_label: str) -> int:
        # bio_label examples: 'B-PATIENT_NAME', 'I-PATIENT_NAME', 'O'
        if bio_label == 'O':
            self.prev_bio = 'O'
            self.prev_label = None
            return -1
        bio = bio_label.split('-', 1)[0]
        label = bio_label.split('-', 1)[1] if '-' in bio_label else None
        if self.prev_bio == 'I' and label == self.prev_label and bio == 'I':
            # same entity continues (a fresh B-* always opens a new group)
            return self.id
        if self.prev_bio == 'B' and label == self.prev_label and bio == 'I':
            # still the same chunk
            self.prev_bio = 'I'
            return self.id
        # new chunk when B-* starts or the label changes
        self.id += 1
        self.prev_bio = bio
        self.prev_label = label
        return self.id
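# Example: B-/I- continuation of one label shares a group id; 'O' maps to -1.
_gg = GroupGen()
print([_gg.getgroup(t) for t in ['B-TEST_NAME', 'I-TEST_NAME', 'O', 'B-RESULT']])
# -> [1, 1, -1, 2]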
def spans_to_bio(tokens: List[Tuple[str, int, int]], spans_by_label: Dict[str, List[Tuple[int, int]]]) -> List[str]:
    # Expand the span map into a per-char label map, prioritizing longer spans
    text_len = tokens[-1][2] if tokens else 0
    char_labels = [['O', None] for _ in range(text_len)]  # [bio, label]
    # Collect all spans with labels
    labeled_spans = []
    for label, spans in spans_by_label.items():
        for s, e in spans:
            if 0 <= s < e <= text_len:
                labeled_spans.append((s, e, label))
    # Sort spans by length (desc) to prefer longer matches
    labeled_spans.sort(key=lambda x: (x[1] - x[0]), reverse=True)
    used = [False] * len(labeled_spans)
    for idx, (s, e, lbl) in enumerate(labeled_spans):
        overlap = any(not (e <= labeled_spans[j][0] or s >= labeled_spans[j][1])
                      for j in range(idx) if used[j])
        if overlap:
            continue
        used[idx] = True
        for i in range(s, e):
            if char_labels[i][0] == 'O':
                char_labels[i] = ['I', lbl]
    # Emit one BIO tag per token, marking B at chunk starts
    bio_tags = []
    for tok, s, e in tokens:
        # Examine the first character of the token
        if s < text_len and char_labels[s][0] != 'O':
            lbl = char_labels[s][1]
            # If the previous char is outside or a different label -> B, else I
            prev_inside = (s - 1 >= 0 and char_labels[s - 1][0] != 'O' and char_labels[s - 1][1] == lbl)
            bio = 'I' if prev_inside else 'B'
            bio_tags.append(f"{bio}-{lbl}")
        else:
            bio_tags.append('O')
    return bio_tags
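# End-to-end illustration on a tiny synthetic text:
_toks = simple_tokenize_with_spans("Patient: JOHN DOE Age 45")
_tags = spans_to_bio(_toks, {'PATIENT_NAME': [(9, 17)], 'AGE': [(22, 24)]})
print(list(zip([t[0] for t in _toks], _tags)))
# -> [('Patient:', 'O'), ('JOHN', 'B-PATIENT_NAME'), ('DOE', 'I-PATIENT_NAME'),
#     ('Age', 'O'), ('45', 'B-AGE')]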
# Process each row into token BIO with groups
bio_rows = []
for idx, row in lab_df.iterrows():
    text = str(row['text']) if pd.notna(row['text']) else ''
    tokens = simple_tokenize_with_spans(text)
    spans = build_entity_spans(row)
    bio = spans_to_bio(tokens, spans)
    grp = GroupGen()
    for (tok, s, e), tag in zip(tokens, bio):
        gid = grp.getgroup(tag)
        bio_rows.append({
            'image': row.get('image'),
            'token': tok,
            'start': s,
            'end': e,
            'bio': tag,
            'group': gid
        })
bio_df = pd.DataFrame(bio_rows)
bio_df.to_csv(TOK_OUT, index=False)
print(f"Saved BIO tokens: {TOK_OUT}")
bio_df.head(20)
# Verify lab_reports.csv structure and content
from pathlib import Path
import pandas as pd
csv_path = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs\lab_reports.csv")
assert csv_path.exists(), f"Missing file: {csv_path}"
df = pd.read_csv(csv_path)
required_cols = ['image','text','patient_name','age','gender','report_date','report_name','test_name','result','normal_range']
missing = [c for c in required_cols if c not in df.columns]
if missing:
    print(f"⚠ Missing columns: {missing}")
else:
    print("✓ All required columns present")
print("Rows:", len(df))
print("Unique images:", df['image'].nunique() if 'image' in df.columns else 'N/A')
print("Tests rows with non-empty test_name:", df['test_name'].notna().sum() if 'test_name' in df.columns else 'N/A')
print("\nSample rows:")
display(df.head(10))