prescription_scan
import numpy as np
import pandas as pd
import string
import re
import pickle
import random
import csv
from pathlib import Path
# Use the wide CSV with patient/test fields (not the BIO tokens CSV)
DATA_FILE = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs\lab_reports.csv")
OUTPUT_DIR = Path(r"C:\Users\Hp\scan_prescription\prescription_scan\outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f"Loading data from: {DATA_FILE}")
# lab_reports.csv is a CSV file
df = pd.read_csv(DATA_FILE, sep=',', encoding='utf-8')
print(f"Loaded {len(df)} rows")
print(f"Columns: {list(df.columns)}")
df.head()
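# Note: if mixed dtypes cause trouble downstream (e.g., ages parsed as floats, blanks
# turned into NaN), a more defensive read is to load everything as strings. Illustrative
# alternative only, not required for the steps below:
# df = pd.read_csv(DATA_FILE, dtype=str, keep_default_na=False, encoding='utf-8')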
# Cleaners tuned for the CSV input without destroying token boundaries
punctuation = "!#$%&'*+:;<=>?[\\]^`{|}~"  # keep . , - / ( ) for spans
def cleanText(txt: str) -> str:
    text = str(txt) if txt is not None else ""
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"[{}]".format(re.escape(punctuation)), "", text)
    return text
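# Quick sanity check on the cleaner with an illustrative value (not taken from the dataset):
# whitespace is collapsed first, then unwanted punctuation is stripped.
print(cleanText("  Hemoglobin  (Hb):  13.5 g/dL "))  # expected: Hemoglobin (Hb) 13.5 g/dL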
# Identify investigation columns (investigation1..N)
base_cols = ["image", "patient_name", "age", "gender", "report_date", "report_name"]
investigation_cols = [c for c in df.columns if c.lower().startswith("investigation")]
missing = [c for c in base_cols if c not in df.columns]
if missing:
    raise ValueError(f"{DATA_FILE.name} is missing columns: {missing}")
if not investigation_cols:
    raise ValueError(f"{DATA_FILE.name} has no investigation* columns")
# Clean header fields
df_clean = df[base_cols + investigation_cols].copy()
for col in base_cols:
    df_clean[col] = df_clean[col].fillna("").apply(cleanText)
# Convert wide investigations -> long rows
df_long = df_clean.melt(id_vars=base_cols, value_vars=investigation_cols,
                        var_name="investigation_slot", value_name="test_name")
# Drop empty investigations and clean text
df_long["test_name"] = df_long["test_name"].fillna("").apply(cleanText)
df_long = df_long[~df_long["test_name"].isin(["", "0", "-", "None", "NONE"])]
# Add placeholder result / normal range columns (dataset does not provide these)
df_long["result"] = ""
df_long["normal_range"] = ""
df_clean = df_long
print(f"Rows after exploding investigations: {len(df_clean)}")
print(f"Unique reports: {df_clean['image'].nunique()}")
df_clean.head()
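# Optional read-only check: how many investigations survived per report after the reshape
tests_per_report = df_clean.groupby('image')['test_name'].count()
print(tests_per_report.describe())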
# Build spaCy-style training tuples: (text, {entities: [(start, end, label), ...]})
# We will concatenate fields in a deterministic order and compute character spans.
group = df_clean.groupby(by='image')
images = list(group.groups.keys())
allCardsData = []
for img in images:
    img_rows = group.get_group(img)
    content_parts = []
    entities = []
    cursor = {'pos': 0}  # use a mutable container to avoid nonlocal/global

    def append_with_entity(text_val: str, label: str):
        if not text_val or text_val == 'nan':
            return
        # Strip leading/trailing whitespace from the value
        text_val = str(text_val).strip()
        if not text_val:
            return
        start = cursor['pos']
        content_parts.append(text_val)
        cursor['pos'] += len(text_val)
        end = cursor['pos']  # exclusive end for spaCy v3+
        entities.append((start, end, label))
        # add a space separator after each field
        content_parts.append(' ')
        cursor['pos'] += 1

    # Core patient/report header (use first row values)
    first = img_rows.iloc[0]
    append_with_entity(str(first.get('patient_name', '')), 'PATIENT_NAME')
    append_with_entity(str(first.get('age', '')), 'AGE')
    append_with_entity(str(first.get('gender', '')), 'GENDER')
    append_with_entity(str(first.get('report_date', '')), 'REPORT_DATE')
    # Optionally include report_name as DES or ORG-like label if you want it in NER
    # append_with_entity(str(first.get('report_name', '')), 'DES')

    # Tests: iterate rows for test_name, result, normal_range
    for _, row in img_rows.iterrows():
        append_with_entity(str(row.get('test_name', '')), 'TEST_NAME')
        append_with_entity(str(row.get('result', '')), 'RESULT')
        append_with_entity(str(row.get('normal_range', '')), 'NORMAL_RANGE')

    # Finalize content and record sample
    text = ''.join(content_parts).strip()
    annotations = {'entities': entities}
    allCardsData.append((text, annotations))
print(f"Total training examples created: {len(allCardsData)}")
print(f"\nSample example:")
print(f"Text: {allCardsData[0][0][:200]}...")
print(f"Entities: {allCardsData[0][1]['entities'][:8]}")
# Validate entities don't have whitespace at boundaries
def validate_entities(data):
    errors = []
    for idx, (text, annot) in enumerate(data):
        for start, end, label in annot['entities']:
            span_text = text[start:end]
            if span_text != span_text.strip():
                errors.append(f"Example {idx}, {label}: '{span_text}' has whitespace")
    return errors
validation_errors = validate_entities(allCardsData)
if validation_errors:
    print(f"\n⚠ Found {len(validation_errors)} validation errors:")
    for err in validation_errors[:5]:
        print(f" - {err}")
else:
    print("\n✓ All entities validated successfully!")
# Keep only clean entries: drop any example whose spans carry leading/trailing whitespace
def has_clean_spans(text, annot):
    return all(text[s:e] == text[s:e].strip() for s, e, _ in annot['entities'])

allCardsData = [(text, annot) for text, annot in allCardsData if has_clean_spans(text, annot)]
print(f"Final training examples: {len(allCardsData)}")
print(f"\nFirst example:")
print(f"Text: {allCardsData[0][0][:150]}...")
print(f"Entities count: {len(allCardsData[0][1]['entities'])}")
random.shuffle(allCardsData)
total_samples = len(allCardsData)
train_split = int(0.8 * total_samples)
TrainData = allCardsData[:train_split]
TestData = allCardsData[train_split:]
print(f"Total samples: {total_samples}")
print(f"Training samples: {len(TrainData)}")
print(f"Testing samples: {len(TestData)}")
# Save as pickles to match your downstream training code
data_dir = OUTPUT_DIR / 'data'
data_dir.mkdir(exist_ok=True)
train_pickle_path = data_dir / 'TrainData.pickle'
test_pickle_path = data_dir / 'TestData.pickle'
with open(train_pickle_path, 'wb') as f:
    pickle.dump(TrainData, f)
with open(test_pickle_path, 'wb') as f:
    pickle.dump(TestData, f)
print("="*70)
print("✓ PICKLE FILES SAVED SUCCESSFULLY")
print("="*70)
print(f"Training data: {train_pickle_path}")
print(f"Testing data: {test_pickle_path}")
print(f"\nFile sizes:")
print(f" TrainData.pickle: {train_pickle_path.stat().st_size / 1024:.2f} KB")
print(f" TestData.pickle: {test_pickle_path.stat().st_size / 1024:.2f} KB")
print("="*70)
with open(OUTPUT_DIR / 'data' / 'TrainData.pickle', 'rb') as f:
    loaded_train = pickle.load(f)
with open(OUTPUT_DIR / 'data' / 'TestData.pickle', 'rb') as f:
    loaded_test = pickle.load(f)
print("="*70)
print("VERIFICATION: Loaded pickle files")
print("="*70)
print(f"Training samples loaded: {len(loaded_train)}")
print(f"Testing samples loaded: {len(loaded_test)}")
print(f"\n{'='*70}")
print("Sample Training Example:")
print(f"{'='*70}")
sample = loaded_train[0]
print(f"\nText (first 200 chars):")
print(sample[0][:200])
print(f"\nEntities:")
for i, entity in enumerate(sample[1]['entities'][:8], 1):
    start, end, label = entity
    text_span = sample[0][start:end]  # end is exclusive, so no +1
    print(f" {i}. [{label}]: '{text_span}'")
print(f"\n{'='*70}")
print("Entity Labels in Dataset:")
print(f"{'='*70}")
all_labels = set()
for text, annotations in loaded_train + loaded_test:
    for _, _, label in annotations['entities']:
        all_labels.add(label)
print(f"Labels: {', '.join(sorted(all_labels))}")
print(f"Total unique labels: {len(all_labels)}")
print("="*70)
# Build DocBin and save train/test .spacy files
import spacy
from spacy.tokens import DocBin
from spacy.util import filter_spans
NLP = spacy.blank("en")
TRAIN_PATH = OUTPUT_DIR / 'data' / 'train.spacy'
DEV_PATH = OUTPUT_DIR / 'data' / 'test.spacy'
(OUTPUT_DIR / 'data').mkdir(exist_ok=True)
def to_docbin(examples, nlp):
    db = DocBin(store_user_data=True)
    skipped = 0
    for text, ann in examples:
        doc = nlp.make_doc(text)
        ents = []
        for start, end, label in ann['entities']:
            # spaCy v3+ uses exclusive end offsets
            span = doc.char_span(start, end, label=label, alignment_mode='contract')
            if span is not None:
                # Verify no leading/trailing whitespace
                if span.text == span.text.strip():
                    ents.append(span)
                else:
                    skipped += 1
            else:
                skipped += 1
        ents = filter_spans(ents)
        doc.ents = ents
        db.add(doc)
    if skipped > 0:
        print(f" ⚠ Skipped {skipped} invalid spans")
    return db
print("Building training DocBin...")
train_db = to_docbin(TrainData, NLP)
print("Building test DocBin...")
dev_db = to_docbin(TestData, NLP)
train_db.to_disk(TRAIN_PATH)
dev_db.to_disk(DEV_PATH)
print(f"\n✓ Saved train to: {TRAIN_PATH}")
print(f"✓ Saved test to: {DEV_PATH}")
# Quick verification of saved DocBins
from spacy.tokens import DocBin
import spacy
nlp_blank = spacy.blank('en')
train_db = DocBin().from_disk(str(OUTPUT_DIR / 'data' / 'train.spacy'))
test_db = DocBin().from_disk(str(OUTPUT_DIR / 'data' / 'test.spacy'))
train_docs = list(train_db.get_docs(nlp_blank.vocab))
test_docs = list(test_db.get_docs(nlp_blank.vocab))
print(f"Train docs: {len(train_docs)} | Test docs: {len(test_docs)}")
labels = set()
for d in train_docs + test_docs:
    for e in d.ents:
        labels.add(e.label_)
print(f"Labels: {sorted(labels)}")
print(f"Train ents: {sum(len(d.ents) for d in train_docs)} | Test ents: {sum(len(d.ents) for d in test_docs)}")