Initial commit

This commit is contained in:
Zhongwei Li
2025-11-30 08:30:14 +08:00
commit 1dd5bee3b4
335 changed files with 147360 additions and 0 deletions

View File

@@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""
Check clinical reports for HIPAA identifiers that need removal.
Scans text for 18 HIPAA identifiers and flags potential privacy violations.
Usage:
python check_deidentification.py <input_file>
python check_deidentification.py <input_file> --output violations.json
"""
import argparse
import json
import re
from pathlib import Path
from typing import Dict, List
# 18 HIPAA Identifiers patterns
# Detection table for the 18 HIPAA identifier categories.
# Each key is "<number>_<short name>"; each value holds a human-readable
# "description", a list of regex "patterns" searched in the report text, and a
# "severity" of "CRITICAL", "HIGH" or "MEDIUM".
HIPAA_IDENTIFIERS = {
    "1_names": {
        "description": "Names (patient, family, providers)",
        "patterns": [
            r"\b(Dr\.|Mr\.|Mrs\.|Ms\.)\s+[A-Z][a-z]+",
            r"\b[A-Z][a-z]+,\s+[A-Z][a-z]+\b",  # Last, First
        ],
        "severity": "HIGH",
    },
    "2_geographic": {
        "description": "Geographic subdivisions smaller than state",
        "patterns": [
            r"\b\d+\s+[A-Z][a-z]+\s+(Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr)\b",
            r"\b[A-Z][a-z]+,\s+[A-Z]{2}\s+\d{5}\b",  # City, ST ZIP
        ],
        "severity": "HIGH",
    },
    "3_dates": {
        "description": "Dates (except year)",
        "patterns": [
            r"\b(0?[1-9]|1[0-2])/(0?[1-9]|[12][0-9]|3[01])/\d{4}\b",
            r"\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},\s+\d{4}\b",
            r"\b\d{1,2}\s+(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b",
        ],
        "severity": "HIGH",
    },
    "4_telephone": {
        "description": "Telephone numbers",
        "patterns": [
            r"\b\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
            r"\b1-\d{3}-\d{3}-\d{4}\b",
        ],
        "severity": "HIGH",
    },
    "5_fax": {
        "description": "Fax numbers",
        "patterns": [
            r"(?i)fax[:]\s*\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}",
        ],
        "severity": "HIGH",
    },
    "6_email": {
        "description": "Email addresses",
        "patterns": [
            # TLD class is letters only; the previous "[A-Z|a-z]" also
            # accepted a literal "|" character.
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        ],
        "severity": "HIGH",
    },
    "7_ssn": {
        "description": "Social Security numbers",
        "patterns": [
            r"\b\d{3}-\d{2}-\d{4}\b",
            r"\b\d{9}\b",
        ],
        "severity": "CRITICAL",
    },
    "8_mrn": {
        "description": "Medical record numbers",
        "patterns": [
            r"(?i)(mrn|medical\s+record\s+(number|#))[:]\s*\d+",
            r"(?i)patient\s+id[:]\s*\d+",
        ],
        "severity": "HIGH",
    },
    "9_health_plan": {
        "description": "Health plan beneficiary numbers",
        "patterns": [
            r"(?i)(insurance|policy)\s+(number|#|id)[:]\s*[A-Z0-9]+",
        ],
        "severity": "HIGH",
    },
    "10_account": {
        "description": "Account numbers",
        "patterns": [
            r"(?i)account\s+(number|#)[:]\s*\d+",
        ],
        "severity": "MEDIUM",
    },
    "11_license": {
        "description": "Certificate/license numbers",
        "patterns": [
            r"(?i)(driver[']?s\s+license|DL)[:]\s*[A-Z0-9]+",
        ],
        "severity": "MEDIUM",
    },
    "12_vehicle": {
        "description": "Vehicle identifiers",
        "patterns": [
            r"(?i)(license\s+plate|VIN)[:]\s*[A-Z0-9]+",
        ],
        "severity": "MEDIUM",
    },
    "13_device": {
        "description": "Device identifiers and serial numbers",
        "patterns": [
            r"(?i)(serial|device)\s+(number|#)[:]\s*[A-Z0-9-]+",
        ],
        "severity": "MEDIUM",
    },
    "14_url": {
        "description": "Web URLs",
        "patterns": [
            r"https?://[^\s]+",
            r"www\.[^\s]+",
        ],
        "severity": "MEDIUM",
    },
    "15_ip": {
        "description": "IP addresses",
        "patterns": [
            r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
        ],
        "severity": "HIGH",
    },
    "16_biometric": {
        "description": "Biometric identifiers",
        "patterns": [
            r"(?i)(fingerprint|voiceprint|retinal\s+scan)",
        ],
        "severity": "CRITICAL",
    },
    "17_photos": {
        "description": "Full-face photographs",
        "patterns": [
            r"(?i)(photograph|photo|image).*face",
            r"\.(jpg|jpeg|png|gif)\b",
        ],
        "severity": "HIGH",
    },
    "18_unique": {
        "description": "Any other unique identifying characteristic",
        "patterns": [
            r"(?i)(tattoo|birthmark|scar).*unique",
        ],
        "severity": "MEDIUM",
    },
}
def check_identifiers(text: str, identifiers: Dict = None) -> Dict:
    """Scan *text* for HIPAA identifiers and summarise what was found.

    Args:
        text: Report text to scan.
        identifiers: Optional table in the same shape as ``HIPAA_IDENTIFIERS``
            (id -> {"description", "patterns", "severity"}). Defaults to
            ``HIPAA_IDENTIFIERS``.

    Returns:
        A dict with ``total_violations`` (number of identifier types that
        matched), ``total_instances`` (number of individual matches) and
        ``violations`` (per-type description, severity, count and up to five
        distinct example strings in first-seen order).
    """
    if identifiers is None:
        identifiers = HIPAA_IDENTIFIERS

    violations = {}
    total_issues = 0
    for identifier_id, config in identifiers.items():
        matches = []
        for pattern in config["patterns"]:
            # finditer + group(0) yields the full matched text. re.findall
            # would return only the capture groups for patterns that contain
            # them (e.g. "Dr." instead of "Dr. Smith", or a tuple for dates).
            # NOTE(review): re.IGNORECASE is kept for backward compatibility,
            # but it also relaxes capitalisation-based patterns such as
            # "Last, First" - confirm whether that is intended.
            for found in re.finditer(pattern, text, re.IGNORECASE):
                matches.append(found.group(0))
        if not matches:
            continue
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        unique_matches = list(dict.fromkeys(matches))[:5]
        violations[identifier_id] = {
            "description": config["description"],
            "severity": config["severity"],
            "count": len(matches),
            "examples": unique_matches,
        }
        total_issues += len(matches)

    return {
        "total_violations": len(violations),
        "total_instances": total_issues,
        "violations": violations,
    }
def check_age_compliance(text: str) -> Dict:
    """Find explicitly stated ages above 89.

    Recognises two- or three-digit ages written as "92 years old",
    "92-year-old", "92 yr old", "92 yo" or "92 y/o" (case-insensitive).

    Returns:
        A dict with ``ages_over_89`` (count), ``examples`` (up to five of the
        offending ages, in document order) and ``compliant`` (True when no age
        above 89 was found).
    """
    # Separators are optional runs of whitespace/hyphens so that the common
    # hyphenated form "92-year-old" is matched as well as "92 years old".
    age_pattern = r"\b(\d{2,3})[\s-]*(?:(?:years?|yrs?)[\s-]*old|y/?o)\b"
    ages = [int(age) for age in re.findall(age_pattern, text, re.IGNORECASE)]
    violations = [age for age in ages if age > 89]
    return {
        "ages_over_89": len(violations),
        "examples": violations[:5],
        "compliant": not violations,
    }
def generate_report(filename: str) -> Dict:
    """Build the de-identification compliance report for one file.

    Reads *filename* as UTF-8, runs the identifier and age checks, and derives
    an overall status:

    * ``NON_COMPLIANT`` - any CRITICAL identifier type, or three or more HIGH
      identifier types.
    * ``NEEDS_REVIEW`` - at least one HIGH identifier type, or an age above 89.
    * ``COMPLIANT`` - otherwise.

    Raises:
        FileNotFoundError: If *filename* does not exist.
    """
    source = Path(filename)
    if not source.exists():
        raise FileNotFoundError(f"File not found: {filename}")
    text = source.read_text(encoding='utf-8')

    identifier_check = check_identifiers(text)
    age_check = check_age_compliance(text)

    # Severity is tallied per identifier *type*, not per individual match.
    severities = [
        entry["severity"] for entry in identifier_check["violations"].values()
    ]
    critical_violations = severities.count("CRITICAL")
    high_violations = severities.count("HIGH")

    # NOTE(review): a document whose only findings are MEDIUM severity is
    # reported as COMPLIANT - confirm that this is intended.
    if critical_violations > 0 or high_violations >= 3:
        status = "NON_COMPLIANT"
    elif high_violations > 0 or not age_check["compliant"]:
        status = "NEEDS_REVIEW"
    else:
        status = "COMPLIANT"

    return {
        "filename": str(filename),
        "status": status,
        "identifier_violations": identifier_check,
        "age_compliance": age_check,
        "recommendation": get_recommendation(status, identifier_check, age_check),
    }
def get_recommendation(status: str, identifiers: Dict, ages: Dict) -> str:
    """Return the remediation advice for a report.

    Args:
        status: Overall status string; ``"COMPLIANT"`` short-circuits to a
            fixed message.
        identifiers: Identifier summary; ``total_violations`` and
            ``total_instances`` are read.
        ages: Age summary; ``compliant`` and ``ages_over_89`` are read.

    Returns:
        The applicable advice sentences joined by a single space (possibly an
        empty string).
    """
    if status == "COMPLIANT":
        return "Document appears compliant. Perform final manual review before publication."

    advice = []
    if identifiers["total_violations"] > 0:
        advice += [
            f"Remove or redact {identifiers['total_instances']} identified HIPAA identifiers."
        ]
    if not ages["compliant"]:
        advice += [
            f"Aggregate {ages['ages_over_89']} age(s) >89 years to '90 or older' or '>89 years'."
        ]
    return " ".join(advice)
def print_report(report: Dict):
    """Write the human-readable version of *report* to stdout.

    Sections, in order: header, overall status, identifier findings (sorted
    CRITICAL, HIGH, MEDIUM), age compliance, recommendation.
    """
    rule = "=" * 70
    findings = report["identifier_violations"]

    print(rule)
    print("HIPAA DE-IDENTIFICATION CHECK")
    print(f"File: {report['filename']}")
    print(rule)
    print()
    print(f"Overall Status: {report['status']}")
    print()

    if findings["total_violations"] != 0:
        print(f"⚠ Found {findings['total_violations']} types of violations")
        print(f" Total instances: {findings['total_instances']}")
        print()
        print("Violations by type:")
        print("-" * 70)

        severity_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2}
        markers = {"CRITICAL": "⚠⚠⚠", "HIGH": "⚠⚠"}
        ordered = sorted(
            findings["violations"].items(),
            key=lambda entry: severity_rank[entry[1]["severity"]],
        )
        for _, details in ordered:
            # Severities without a marker get an empty prefix.
            marker = markers.get(details["severity"], "")
            print(f"{marker} [{details['severity']:8}] {details['description']}")
            print(f" Count: {details['count']}")
            print(" Examples:")
            for example in details["examples"]:
                print(f" - {example}")
            print()
    else:
        print("✓ No HIPAA identifiers detected")

    age_check = report["age_compliance"]
    if not age_check["compliant"]:
        print(f"⚠ Age compliance issue: {age_check['ages_over_89']} age(s) >89 detected")
        print(" Ages must be aggregated to '90 or older' or '>89 years'")
        print(f" Ages found: {age_check['examples']}")
    else:
        print("✓ Age reporting compliant (no ages >89 or properly aggregated)")
    print()

    print("Recommendation:")
    print(report["recommendation"])
    print(rule)
def main():
    """Command-line entry point for the de-identification check.

    Returns:
        0 when the report status is ``COMPLIANT``; 1 otherwise or when any
        exception is raised while producing the report.
    """
    cli = argparse.ArgumentParser(
        description="Check clinical reports for HIPAA identifiers"
    )
    cli.add_argument("input_file", help="Path to clinical report file")
    cli.add_argument("--output", "-o", help="Output JSON report to file")
    cli.add_argument("--json", action="store_true", help="Output JSON to stdout")
    args = cli.parse_args()

    try:
        report = generate_report(args.input_file)

        if not args.json:
            print_report(report)
        else:
            print(json.dumps(report, indent=2))

        if args.output:
            with open(args.output, 'w') as sink:
                json.dump(report, sink, indent=2)
            print(f"\nJSON report saved to: {args.output}")

        # Anything other than a clean result is signalled through the exit code.
        return 0 if report["status"] == "COMPLIANT" else 1
    except Exception as exc:
        print(f"Error: {exc}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Check clinical reports for regulatory compliance (HIPAA, GCP, FDA).
Usage:
python compliance_checker.py <report_file>
"""
import argparse
import json
import re
# Regulation -> {check name -> regex}. A check is satisfied when its regex is
# found anywhere in the report text.
COMPLIANCE_CHECKS = {
    # HIPAA: consent wording and any mention of de-identification.
    "hipaa": {
        "consent_statement": r"(?i)(informed\s+consent|written\s+consent).*obtained",
        "deidentification": r"(?i)(de-identif|anonymi[sz])",
    },
    # GCP: ethics approval, protocol and informed-consent mentions.
    "gcp": {
        "irb_approval": r"(?i)(IRB|IEC|ethics\s+committee).*approv",
        "protocol_compliance": r"(?i)protocol",
        "informed_consent": r"(?i)informed\s+consent",
    },
    # FDA: a labelled study identifier and safety-reporting terms.
    "fda": {
        "study_id": r"(?i)(IND|IDE|protocol)\s+(number|#)[:]\s*\S+",
        "safety_reporting": r"(?i)(adverse\s+event|SAE)",
    },
}
def check_compliance(filename: str) -> dict:
    """Run every regex in ``COMPLIANCE_CHECKS`` against a report file.

    Args:
        filename: Path of the report, read as UTF-8.

    Returns:
        ``{"filename": filename, "compliance": {regulation: {check: bool}}}``
        where each bool says whether the check's pattern occurs in the file.
    """
    with open(filename, 'r', encoding='utf-8') as handle:
        report_text = handle.read()

    compliance = {
        regulation: {
            check_name: re.search(pattern, report_text) is not None
            for check_name, pattern in checks.items()
        }
        for regulation, checks in COMPLIANCE_CHECKS.items()
    }
    return {"filename": filename, "compliance": compliance}
def main():
    """Command-line entry point for the regulatory compliance check.

    Prints either the JSON report (``--json``) or a per-regulation checklist.

    Returns:
        0 on success, 1 if the report could not be produced.
    """
    parser = argparse.ArgumentParser(description="Check regulatory compliance")
    parser.add_argument("input_file", help="Path to clinical report")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    try:
        report = check_compliance(args.input_file)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print("\nRegulatory Compliance Check:\n")
            for reg, checks in report["compliance"].items():
                print(f"{reg.upper()}:")
                for check, passed in checks.items():
                    # Distinct markers: with identical (empty) symbols the
                    # checklist could not show which checks passed.
                    symbol = "✓" if passed else "✗"
                    print(f" {symbol} {check}")
                print()
        return 0
    except Exception as e:
        print(f"Error: {e}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Extract structured clinical data from reports.
Usage:
python extract_clinical_data.py <report_file>
"""
import argparse
import json
import re
def extract_vital_signs(content: str) -> dict:
    """Pull the first labelled value of each vital sign out of *content*.

    Labels must be followed by a colon (e.g. ``BP: 120/80``). Temperature is
    only recognised with an ``F`` unit and SpO2 with a trailing ``%``.

    Returns:
        A dict containing only the vitals that were found, keyed
        ``temperature``, ``bp``, ``hr``, ``rr`` and ``spo2``; values are the
        captured strings.
    """
    vital_patterns = {
        "temperature": r"(?i)temp(?:erature)?[:]\s*([\d.]+)\s*°?F",
        "bp": r"(?i)BP[:]\s*(\d+/\d+)",
        "hr": r"(?i)HR[:]\s*(\d+)",
        "rr": r"(?i)RR[:]\s*(\d+)",
        "spo2": r"(?i)SpO2[:]\s*([\d.]+)%",
    }
    searches = (
        (name, re.search(regex, content)) for name, regex in vital_patterns.items()
    )
    return {name: hit.group(1) for name, hit in searches if hit}
def extract_demographics(content: str) -> dict:
    """Extract the first stated age and sex from *content*.

    Age is taken from phrases such as "45-year-old" or "45 years old". Sex is
    the first stand-alone word ``male``, ``female``, ``M`` or ``F``
    (case-insensitive), returned as written.

    Returns:
        A dict with ``age`` and/or ``sex`` when found; values are strings.
    """
    demographics = {}
    patterns = {
        # "years" (plural) and repeated separators are accepted too.
        "age": r"(?i)(\d+)[\s-]*years?[\s-]*old",
        # Word boundaries are required: without them the single-letter
        # alternatives match the first "m" or "f" inside any word.
        "sex": r"(?i)\b(male|female|M|F)\b",
    }
    for demo, pattern in patterns.items():
        match = re.search(pattern, content)
        if match:
            demographics[demo] = match.group(1)
    return demographics
def extract_medications(content: str) -> list:
    """Extract medications written as ``<drug> <dose> <route> <frequency>``.

    Recognised doses are integers or decimals in mg, mcg or g; routes are
    PO, IV or SC; frequencies are daily, BID, TID or QID (all
    case-insensitive).

    Returns:
        A list of dicts with ``drug``, ``dose``, ``route`` and ``frequency``,
        in document order.
    """
    # Decimal amounts and mcg/g units are accepted so that entries such as
    # "12.5 mg" or "50 mcg" are not silently skipped.
    pattern = (
        r"(?i)(\w+)\s+(\d+(?:\.\d+)?\s*(?:mg|mcg|g))"
        r"\s+(PO|IV|SC)\s+(daily|BID|TID|QID)"
    )
    return [
        {"drug": drug, "dose": dose, "route": route, "frequency": frequency}
        for drug, dose, route, frequency in re.findall(pattern, content)
    ]
def main():
    """Command-line entry point for clinical data extraction.

    Reads the report, extracts demographics, vital signs and medications, and
    writes the result as JSON to ``--output`` or to stdout.

    Returns:
        0 on success, 1 if any exception occurs.
    """
    cli = argparse.ArgumentParser(description="Extract clinical data")
    cli.add_argument("input_file", help="Path to clinical report")
    cli.add_argument("--output", "-o", help="Output JSON file")
    args = cli.parse_args()

    try:
        with open(args.input_file, 'r', encoding='utf-8') as source:
            content = source.read()

        extracted_data = {
            "demographics": extract_demographics(content),
            "vital_signs": extract_vital_signs(content),
            "medications": extract_medications(content),
        }

        if not args.output:
            print(json.dumps(extracted_data, indent=2))
        else:
            with open(args.output, 'w') as sink:
                json.dump(extracted_data, sink, indent=2)
            print(f"✓ Data extracted to: {args.output}")
        return 0
    except Exception as exc:
        print(f"Error: {exc}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Format adverse event data into tables for clinical trial reports.
Converts CSV or structured data into formatted AE summary tables.
Usage:
python format_adverse_events.py <ae_data.csv>
"""
import argparse
import csv
from collections import defaultdict
from pathlib import Path
def format_ae_summary_table(data: list) -> str:
    """Generate an adverse-event summary table in markdown format.

    Args:
        data: Rows as dicts (e.g. from ``csv.DictReader``). The columns read
            are ``treatment_arm``, ``any_ae``, ``related``, ``serious``,
            ``fatal`` and ``discontinuation``; a flag counts when its value is
            "yes" (case-insensitive, surrounding whitespace ignored).

    Returns:
        A markdown table with one column per treatment arm (in first-seen
        order) and one row per category. Every row of *data* adds one to its
        arm's "Total N"; other rows show ``count (percent of Total N)``.
        Rows with a missing or blank arm are grouped under "Unknown".
    """
    # CSV column -> counter it increments.
    flag_columns = (
        ('any_ae', 'any_ae'),
        ('related', 'related_ae'),
        ('serious', 'sae'),
        ('fatal', 'deaths'),
        ('discontinuation', 'discontinuations'),
    )

    # Group by treatment arm
    arm_stats = defaultdict(lambda: {
        'total': 0,
        'any_ae': 0,
        'related_ae': 0,
        'sae': 0,
        'deaths': 0,
        'discontinuations': 0,
    })
    for row in data:
        # "or ''" guards against None, which csv.DictReader supplies for
        # fields missing from a short row.
        arm = (row.get('treatment_arm') or '').strip() or 'Unknown'
        stats = arm_stats[arm]
        stats['total'] += 1
        for column, counter in flag_columns:
            if (row.get(column) or '').strip().lower() == 'yes':
                stats[counter] += 1

    categories = [
        ('Total N', 'total'),
        ('Any AE', 'any_ae'),
        ('Treatment-related AE', 'related_ae'),
        ('Serious AE', 'sae'),
        ('Deaths', 'deaths'),
        ('Discontinuation due to AE', 'discontinuations'),
    ]

    # Rows are assembled from cell lists so the table stays well-formed even
    # when there are no arms at all.
    lines = [
        "| " + " | ".join(["Category", *arm_stats]) + " |",
        "|" + "|".join(["----------"] + ["--------"] * len(arm_stats)) + "|",
    ]
    for cat_name, cat_key in categories:
        cells = [cat_name]
        for stats in arm_stats.values():
            count = stats[cat_key]
            if cat_key == 'total':
                cells.append(f"{count}")
            else:
                total = stats['total']
                pct = (count / total * 100) if total > 0 else 0
                cells.append(f"{count} ({pct:.1f}%)")
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines) + "\n"
def main():
    """Command-line entry point for the adverse-event table formatter.

    Reads the CSV given on the command line and prints the markdown summary
    table, or writes it to ``--output``.

    Returns:
        0 on success, 1 if any exception occurs.
    """
    parser = argparse.ArgumentParser(description="Format AE data into tables")
    parser.add_argument("input_file", help="Path to AE data CSV")
    parser.add_argument("--output", "-o", help="Output markdown file")
    args = parser.parse_args()

    try:
        # newline='' is what the csv module requires for correct handling of
        # quoted line breaks; utf-8-sig reads plain UTF-8 and also strips a
        # leading BOM that would otherwise become part of the first header.
        with open(args.input_file, 'r', newline='', encoding='utf-8-sig') as f:
            data = list(csv.DictReader(f))

        table = format_ae_summary_table(data)

        if args.output:
            # Arm names may be non-ASCII, so the encoding is explicit.
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(table)
            print(f"✓ Table saved to: {args.output}")
        else:
            print("\nAdverse Events Summary Table:\n")
            print(table)
        return 0
    except Exception as e:
        print(f"Error: {e}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
Interactive template generator for clinical reports.
Helps users select and generate appropriate clinical report templates.
Usage:
python generate_report_template.py
python generate_report_template.py --type case_report --output my_case_report.md
"""
import argparse
import shutil
from pathlib import Path
# Template type -> file name of the template to copy.
TEMPLATES = {
    # Clinical documentation
    "case_report": "case_report_template.md",
    "soap_note": "soap_note_template.md",
    "h_and_p": "history_physical_template.md",
    "discharge_summary": "discharge_summary_template.md",
    "consult_note": "consult_note_template.md",
    # Diagnostic reports
    "radiology": "radiology_report_template.md",
    "pathology": "pathology_report_template.md",
    "lab": "lab_report_template.md",
    # Clinical trial documents
    "sae": "clinical_trial_sae_template.md",
    "csr": "clinical_trial_csr_template.md",
}

# Template type -> label shown in listings. Keys appear in the same order as
# in TEMPLATES; the interactive menu numbers entries by position.
DESCRIPTIONS = {
    "case_report": "Clinical Case Report (CARE guidelines)",
    "soap_note": "SOAP Progress Note",
    "h_and_p": "History and Physical Examination",
    "discharge_summary": "Hospital Discharge Summary",
    "consult_note": "Consultation Note",
    "radiology": "Radiology/Imaging Report",
    "pathology": "Surgical Pathology Report",
    "lab": "Laboratory Report",
    "sae": "Serious Adverse Event Report",
    "csr": "Clinical Study Report (ICH-E3)",
}
def get_template_dir() -> Path:
    """Return the ``assets`` directory that sits beside this script's folder."""
    return Path(__file__).parent.parent / "assets"
def list_templates():
    """Print a numbered list of template types with their descriptions."""
    rule = "=" * 60
    print("\nAvailable Clinical Report Templates:")
    print(rule)
    for number, key in enumerate(DESCRIPTIONS, start=1):
        print(f"{number:2}. {key:20} - {DESCRIPTIONS[key]}")
    print(rule)
def generate_template(template_type: str, output_file: str = None):
    """Copy the template for *template_type* to *output_file*.

    Args:
        template_type: A key of ``TEMPLATES``.
        output_file: Destination path; defaults to ``new_<template file name>``
            in the current directory.

    Returns:
        The destination path that was written.

    Raises:
        ValueError: If *template_type* is not a known template type.
        FileNotFoundError: If the template file is missing from the
            templates directory.
    """
    if template_type not in TEMPLATES:
        raise ValueError(f"Invalid template type: {template_type}")

    template_filename = TEMPLATES[template_type]
    source = get_template_dir() / template_filename
    if not source.exists():
        raise FileNotFoundError(f"Template not found: {source}")

    destination = output_file if output_file is not None else f"new_{template_filename}"
    shutil.copy(source, destination)

    print(f"✓ Template created: {destination}")
    print(f" Type: {DESCRIPTIONS[template_type]}")
    print(f" Source: {template_filename}")
    return destination
def interactive_mode():
    """Prompt for a template number and generate it, repeating on request.

    Entering ``q`` at the selection prompt, or anything other than ``y`` at
    the "another template" prompt, ends the session.
    """
    list_templates()
    print()

    while True:
        choice = input("Select template number (or 'q' to quit): ").strip()
        if choice.lower() == 'q':
            print("Goodbye!")
            return

        try:
            # Menu numbers are 1-based positions in TEMPLATES.
            position = int(choice) - 1
            template_types = list(TEMPLATES)
            if not (0 <= position < len(template_types)):
                print("Invalid selection. Please try again.")
                continue

            template_type = template_types[position]
            output_file = input(
                f"Output filename (default: new_{TEMPLATES[template_type]}): "
            ).strip()
            # An empty answer selects the default destination name.
            generate_template(template_type, output_file if output_file else None)

            another = input("\nGenerate another template? (y/n): ").strip().lower()
            if another != 'y':
                print("Goodbye!")
                return

            print()
            list_templates()
            print()
        except (ValueError, IndexError):
            print("Invalid input. Please enter a number or 'q' to quit.")
def main():
    """Command-line entry point for the template generator.

    ``--list`` prints the available templates, ``--type`` generates one
    (optionally to ``--output``), and with neither option the interactive
    prompt is started.

    Returns:
        0 on success, 1 if any exception occurs.
    """
    cli = argparse.ArgumentParser(description="Generate clinical report templates")
    cli.add_argument(
        "--type",
        choices=list(TEMPLATES.keys()),
        help="Template type to generate",
    )
    cli.add_argument("--output", "-o", help="Output filename")
    cli.add_argument("--list", action="store_true", help="List available templates")
    args = cli.parse_args()

    try:
        if args.list:
            list_templates()
        elif args.type:
            generate_template(args.type, args.output)
        else:
            # No explicit request: fall back to the interactive prompt.
            interactive_mode()
        return 0
    except Exception as exc:
        print(f"Error: {exc}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,133 @@
#!/usr/bin/env python3
"""
Validate medical terminology and coding in clinical reports.
Usage:
python terminology_validator.py <report_file>
"""
import argparse
import json
import re
# Common medical abbreviations that should be avoided (JCAHO "Do Not Use" list)
# Prohibited abbreviation -> wording to use instead.
DO_NOT_USE = {
    # Units
    "U": "Unit",
    "IU": "International Unit",
    # Dosing frequency
    "QD": "daily",
    "QOD": "every other day",
    # Drug names
    "MS": "morphine sulfate or magnesium sulfate",
    "MSO4": "morphine sulfate",
    "MgSO4": "magnesium sulfate",
}

# Common abbreviations with potential ambiguity
AMBIGUOUS = [
    "cc", "hs", "TIW", "SC", "SQ", "D/C",
    "AS", "AD", "AU", "OS", "OD", "OU",
]
def check_do_not_use_abbreviations(content: str) -> dict:
    """Count occurrences of each ``DO_NOT_USE`` abbreviation in *content*.

    Matching is case-sensitive and limited to whole words.

    Returns:
        ``{abbreviation: {"count", "should_use", "severity"}}`` for every
        abbreviation that occurs at least once; severity is always "HIGH".
    """
    # Word boundaries keep e.g. "U" from matching inside longer words.
    occurrences = {
        abbrev: len(re.findall(rf"\b{re.escape(abbrev)}\b", content))
        for abbrev in DO_NOT_USE
    }
    return {
        abbrev: {
            "count": count,
            "should_use": DO_NOT_USE[abbrev],
            "severity": "HIGH",
        }
        for abbrev, count in occurrences.items()
        if count
    }
def check_ambiguous_abbreviations(content: str, abbreviations: list = None) -> dict:
    """Count whole-word occurrences of ambiguous abbreviations in *content*.

    Args:
        content: Report text to scan.
        abbreviations: Abbreviations to look for; defaults to ``AMBIGUOUS``.

    Returns:
        ``{abbreviation: {"count", "severity"}}`` for every abbreviation found;
        severity is always "MEDIUM".
    """
    if abbreviations is None:
        abbreviations = AMBIGUOUS

    # These are matched exactly as written: matched case-insensitively they
    # would flag the ordinary words "as", "ad" and "os" (as in "per os").
    case_sensitive = {"AS", "AD", "OS"}

    found = {}
    for abbrev in abbreviations:
        flags = 0 if abbrev in case_sensitive else re.IGNORECASE
        matches = re.findall(rf"\b{re.escape(abbrev)}\b", content, flags)
        if matches:
            found[abbrev] = {
                "count": len(matches),
                "severity": "MEDIUM",
            }
    return found
def validate_icd10_format(content: str) -> list:
    """Return the distinct ICD-10-shaped codes found in *content*.

    A code is an upper-case letter, two digits, an optional decimal point and
    up to four further digits.

    Returns:
        The unique codes, sorted so that the result (and the first few codes
        shown to the user) is the same on every run.
    """
    # ICD-10 format: Letter + 2 digits + optional decimal + 0-4 more digits
    pattern = r"\b[A-Z]\d{2}\.?\d{0,4}\b"
    return sorted(set(re.findall(pattern, content)))
def main():
    """Command-line entry point for the terminology validator.

    Runs the prohibited-abbreviation, ambiguous-abbreviation and ICD-10 scans
    and prints the result as JSON (``--json``) or as text.

    Returns:
        1 if any prohibited abbreviation was found or an exception occurred,
        otherwise 0.
    """
    cli = argparse.ArgumentParser(description="Validate medical terminology")
    cli.add_argument("input_file", help="Path to clinical report")
    cli.add_argument("--json", action="store_true")
    args = cli.parse_args()

    try:
        with open(args.input_file, 'r', encoding='utf-8') as source:
            content = source.read()

        do_not_use = check_do_not_use_abbreviations(content)
        ambiguous = check_ambiguous_abbreviations(content)
        icd10_codes = validate_icd10_format(content)

        report = {
            "filename": args.input_file,
            "do_not_use_violations": do_not_use,
            "ambiguous_abbreviations": ambiguous,
            "icd10_codes_found": icd10_codes,
            "total_issues": len(do_not_use) + len(ambiguous),
        }

        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print("\nTerminology Validation Report:\n")

            if not do_not_use:
                print("✓ No prohibited abbreviations found\n")
            else:
                print("❌ DO NOT USE Abbreviations Found:")
                for abbrev, details in do_not_use.items():
                    print(f" {abbrev}: {details['count']} occurrence(s)")
                    print(f" → Use '{details['should_use']}' instead")
                print()

            if ambiguous:
                print("⚠ Ambiguous Abbreviations Found:")
                for abbrev, details in ambiguous.items():
                    print(f" {abbrev}: {details['count']} occurrence(s)")
                print(" Consider spelling out for clarity\n")

            if icd10_codes:
                # Only the first five codes are listed; the rest are summarised.
                shown = icd10_codes[:5]
                print(f" ICD-10 codes detected: {len(icd10_codes)}")
                for code in shown:
                    print(f" - {code}")
                if len(icd10_codes) > 5:
                    print(f" ... and {len(icd10_codes) - 5} more")
                print()

        # Only prohibited abbreviations affect the exit status.
        return 1 if do_not_use else 0
    except Exception as exc:
        print(f"Error: {exc}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python3
"""
Validate case reports against CARE (CAse REport) guidelines.
This script checks a clinical case report for compliance with CARE guidelines
and provides a checklist of required elements.
Usage:
python validate_case_report.py <input_file.md|.txt>
python validate_case_report.py <input_file> --output report.json
"""
import argparse
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple
class CareValidator:
    """Validator for CARE guideline compliance.

    The input file is read once on construction. Each check method stores its
    result in ``self.results`` under its own key and also returns it;
    ``generate_report`` combines them into one report dict.
    """

    # CARE checklist items with regex patterns. An item is satisfied when its
    # pattern occurs anywhere in the document.
    CARE_REQUIREMENTS = {
        "title": {
            "name": "Title contains 'case report'",
            "pattern": r"(?i)(case\s+report|case\s+study)",
            "section": "Title",
            "required": True,
        },
        "keywords": {
            "name": "Keywords provided (2-5)",
            "pattern": r"(?i)keywords?[:]\s*(.+)",
            "section": "Keywords",
            "required": True,
        },
        "abstract": {
            "name": "Abstract present",
            "pattern": r"(?i)##?\s*abstract",
            "section": "Abstract",
            "required": True,
        },
        "introduction": {
            "name": "Introduction explaining novelty",
            "pattern": r"(?i)##?\s*introduction",
            "section": "Introduction",
            "required": True,
        },
        "patient_info": {
            "name": "Patient demographics present",
            "pattern": r"(?i)(patient\s+information|demographics?)",
            "section": "Patient Information",
            "required": True,
        },
        "clinical_findings": {
            "name": "Clinical findings documented",
            "pattern": r"(?i)(clinical\s+findings?|physical\s+exam)",
            "section": "Clinical Findings",
            "required": True,
        },
        "timeline": {
            "name": "Timeline of events",
            "pattern": r"(?i)(timeline|chronology)",
            "section": "Timeline",
            "required": True,
        },
        "diagnostic": {
            "name": "Diagnostic assessment",
            "pattern": r"(?i)diagnostic\s+(assessment|evaluation|workup)",
            "section": "Diagnostic Assessment",
            "required": True,
        },
        "therapeutic": {
            "name": "Therapeutic interventions",
            "pattern": r"(?i)(therapeutic\s+intervention|treatment)",
            "section": "Therapeutic Interventions",
            "required": True,
        },
        "followup": {
            "name": "Follow-up and outcomes",
            "pattern": r"(?i)(follow[\-\s]?up|outcomes?)",
            "section": "Follow-up and Outcomes",
            "required": True,
        },
        "discussion": {
            "name": "Discussion with literature review",
            "pattern": r"(?i)##?\s*discussion",
            "section": "Discussion",
            "required": True,
        },
        "consent": {
            "name": "Informed consent statement",
            "pattern": r"(?i)(informed\s+consent|written\s+consent|consent.*obtained)",
            "section": "Informed Consent",
            "required": True,
        },
    }

    # HIPAA identifiers to check for
    HIPAA_PATTERNS = {
        "dates": r"\b(0?[1-9]|1[0-2])/(0?[1-9]|[12][0-9]|3[01])/\d{4}\b",
        "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        # TLD class is letters only; "[A-Z|a-z]" also accepted a literal "|".
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "mrn": r"(?i)(mrn|medical\s+record)[:]\s*\d+",
        "zip_full": r"\b\d{5}-\d{4}\b",
    }

    def __init__(self, filename: str):
        """Initialize validator with input file.

        Raises:
            FileNotFoundError: If *filename* does not exist.
            Exception: If the file cannot be read for any other reason.
        """
        self.filename = Path(filename)
        self.content = self._read_file()
        self.results = {}

    def _read_file(self) -> str:
        """Read and return the input file's content as UTF-8 text."""
        try:
            with open(self.filename, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found: {self.filename}") from None
        except Exception as e:
            # Keep the original error attached as the cause.
            raise Exception(f"Error reading file: {e}") from e

    def validate_care_compliance(self) -> Dict[str, Dict]:
        """Check the document against every CARE checklist item.

        Returns:
            Per-item dicts with ``name``, ``section``, ``required``, ``found``
            and ``status`` ("PASS" when found; otherwise "FAIL" for required
            items and "WARNING" for optional ones).
        """
        results = {}
        for key, item in self.CARE_REQUIREMENTS.items():
            found = bool(re.search(item["pattern"], self.content))
            if found:
                status = "PASS"
            elif item["required"]:
                status = "FAIL"
            else:
                status = "WARNING"
            results[key] = {
                "name": item["name"],
                "section": item["section"],
                "required": item["required"],
                "found": found,
                "status": status,
            }
        self.results["care_compliance"] = results
        return results

    def check_deidentification(self) -> Dict[str, List[str]]:
        """Check for potential HIPAA identifier violations.

        Returns:
            ``{identifier: [matched text, ...]}`` with at most five examples
            per identifier; identifiers without matches are omitted.
        """
        violations = {}
        for identifier, pattern in self.HIPAA_PATTERNS.items():
            # group(0) is the whole match; re.findall would return only the
            # capture groups for the "dates" and "mrn" patterns.
            matches = [m.group(0) for m in re.finditer(pattern, self.content)]
            if matches:
                violations[identifier] = matches[:5]  # Limit to first 5 examples
        self.results["hipaa_violations"] = violations
        return violations

    def check_word_count(self) -> Dict[str, object]:
        """Count words and compare against the typical case-report length.

        Returns:
            ``total_words``, ``typical_min``, ``typical_max`` and ``status``
            ("ACCEPTABLE" or "CHECK").
        """
        words = len(re.findall(r'\b\w+\b', self.content))
        word_count = {
            "total_words": words,
            "typical_min": 1500,
            "typical_max": 3000,
            # NOTE(review): the accepted upper bound (3500) is higher than the
            # reported typical_max (3000) - confirm the slack is intended.
            "status": "ACCEPTABLE" if 1500 <= words <= 3500 else "CHECK",
        }
        self.results["word_count"] = word_count
        return word_count

    def check_references(self) -> Dict[str, object]:
        """Check for presence of references.

        ``estimated_count`` is the number of ``[n]`` markers in the text, so a
        source cited several times is counted each time.
        """
        ref_patterns = [
            r"##?\s*references",
            r"\[\d+\]",
            r"\d+\.\s+[A-Z][a-z]+.*\d{4}",  # Numbered references
        ]
        has_refs = any(
            re.search(p, self.content, re.IGNORECASE) for p in ref_patterns
        )
        ref_count = len(re.findall(r"\[\d+\]", self.content))
        references = {
            "has_references": has_refs,
            "estimated_count": ref_count,
            "recommended_min": 10,
            "status": "ACCEPTABLE" if ref_count >= 10 else "LOW",
        }
        self.results["references"] = references
        return references

    def generate_report(self) -> Dict:
        """Generate comprehensive validation report.

        Any check that has not been run yet is run first, so the report is
        complete even if only some check methods were called beforehand.

        Returns:
            A dict with ``filename``, ``compliance_rate`` (percentage of
            required CARE items found, one decimal), the individual check
            results, and ``overall_status`` ("PASS" when the rate is at least
            90 and no HIPAA identifiers were found, else "NEEDS_REVISION").
        """
        pending_checks = (
            ("care_compliance", self.validate_care_compliance),
            ("hipaa_violations", self.check_deidentification),
            ("word_count", self.check_word_count),
            ("references", self.check_references),
        )
        for key, run_check in pending_checks:
            if key not in self.results:
                run_check()

        # Calculate overall compliance
        care = self.results["care_compliance"]
        total_required = sum(1 for v in care.values() if v["required"])
        passed = sum(1 for v in care.values() if v["required"] and v["found"])
        compliance_rate = (passed / total_required * 100) if total_required > 0 else 0

        clean = compliance_rate >= 90 and not self.results["hipaa_violations"]
        return {
            "filename": str(self.filename),
            "compliance_rate": round(compliance_rate, 1),
            "care_compliance": care,
            "hipaa_violations": self.results["hipaa_violations"],
            "word_count": self.results["word_count"],
            "references": self.results["references"],
            "overall_status": "PASS" if clean else "NEEDS_REVISION",
        }

    def print_report(self):
        """Print human-readable validation report."""
        report = self.generate_report()

        print("=" * 70)
        print("CARE Guideline Validation Report")
        print(f"File: {report['filename']}")
        print("=" * 70)
        print()
        print(f"Overall Compliance: {report['compliance_rate']}%")
        print(f"Status: {report['overall_status']}")
        print()

        print("CARE Checklist:")
        print("-" * 70)
        for key, item in report["care_compliance"].items():
            # Distinct markers so found and missing items can be told apart.
            status_symbol = "✓" if item["found"] else "✗"
            print(f"{status_symbol} [{item['status']:8}] {item['name']}")
        print()

        if report["hipaa_violations"]:
            print("HIPAA DE-IDENTIFICATION WARNINGS:")
            print("-" * 70)
            for identifier, examples in report["hipaa_violations"].items():
                print(f"{identifier.upper()}: {len(examples)} instance(s) found")
                for ex in examples[:3]:
                    print(f" Example: {ex}")
            print()
        else:
            print("✓ No obvious HIPAA identifiers detected")
            print()

        wc = report["word_count"]
        print(f"Word Count: {wc['total_words']} words")
        print(f" Typical range: {wc['typical_min']}-{wc['typical_max']} words")
        print(f" Status: {wc['status']}")
        print()

        refs = report["references"]
        print(f"References: {refs['estimated_count']} citation(s) detected")
        print(f" Recommended minimum: {refs['recommended_min']}")
        print(f" Status: {refs['status']}")
        print()
        print("=" * 70)

        # Recommendations
        issues = []
        if report['compliance_rate'] < 100:
            missing = [
                v["name"]
                for v in report["care_compliance"].values()
                if v["required"] and not v["found"]
            ]
            issues.append(f"Missing required sections: {', '.join(missing)}")
        if report["hipaa_violations"]:
            issues.append("HIPAA identifiers detected - review de-identification")
        if refs["status"] == "LOW":
            issues.append("Low reference count - consider adding more citations")

        if issues:
            print("RECOMMENDATIONS:")
            for i, issue in enumerate(issues, 1):
                print(f"{i}. {issue}")
        else:
            print("✓ Case report meets CARE guidelines!")
        print("=" * 70)
def main():
    """Command-line entry point for the CARE case-report validator.

    Prints the report as JSON (``--json``) or as text and, with ``--output``,
    also writes the JSON report to a file.

    Returns:
        0 when the overall status is "PASS"; 1 otherwise or on any error.
    """
    # Imported here because the module only imports sys under the __main__
    # guard, which does not run when main() is called from another module.
    import sys

    parser = argparse.ArgumentParser(
        description="Validate clinical case reports against CARE guidelines"
    )
    parser.add_argument(
        "input_file",
        help="Path to case report file (Markdown or text)"
    )
    parser.add_argument(
        "--output",
        "-o",
        help="Output JSON report to file"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output JSON to stdout instead of human-readable report"
    )
    args = parser.parse_args()

    try:
        validator = CareValidator(args.input_file)
        report = validator.generate_report()

        if args.json:
            print(json.dumps(report, indent=2))
        else:
            validator.print_report()

        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                # json.dump writes to the file object; json.dumps accepts
                # only the object positionally and raised TypeError here.
                json.dump(report, f, indent=2)
            print(f"\nJSON report saved to: {args.output}")

        # Exit with non-zero if validation failed
        return 0 if report["overall_status"] == "PASS" else 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""
Validate clinical trial reports against ICH-E3 structure.
Checks Clinical Study Reports (CSR) for ICH-E3 compliance.
Usage:
python validate_trial_report.py <csr_file.md>
"""
import argparse
import json
import re
from pathlib import Path
# Section id -> display name. Only the text before the first "(" is used when
# searching the report for the section header.
ICH_E3_SECTIONS = {
    # Front matter
    "title_page": "Title Page",
    "synopsis": "Synopsis (2)",
    "toc": "Table of Contents (3)",
    "abbreviations": "List of Abbreviations (4)",
    # Report body
    "ethics": "Ethics (Section 2)",
    "investigators": "Investigators and Study Administrative Structure (Section 3)",
    "introduction": "Introduction (Section 4)",
    "objectives": "Study Objectives and Plan (Section 5)",
    "study_patients": "Study Patients (Section 6)",
    "efficacy": "Efficacy Evaluation (Section 7)",
    "safety": "Safety Evaluation (Section 8)",
    "discussion": "Discussion and Overall Conclusions (Section 9)",
    # Supporting material
    "tables_figures": "Tables, Figures, and Graphs (Section 10)",
    "references": "References (Section 11)",
    "appendices": "Appendices (Section 12-14)",
}
def validate_ich_e3(filename: str) -> dict:
    """Check which ICH-E3 sections appear as headers in a CSR file.

    A section counts as present when ``#`` or ``##`` followed by its name (the
    part of the ``ICH_E3_SECTIONS`` label before any parenthesis) occurs in
    the file, ignoring case.

    Returns:
        ``filename``, ``compliance_rate`` (percentage of sections found, one
        decimal), ``sections`` (per-section name and found flag) and
        ``status`` ("PASS" at 90% or more, else "NEEDS_REVISION").
    """
    with open(filename, 'r', encoding='utf-8') as handle:
        content = handle.read()

    sections = {}
    found_count = 0
    for section_id, section_name in ICH_E3_SECTIONS.items():
        # Drop the trailing "(Section N)" part of the label.
        heading = section_name.split('(')[0].strip()
        # Simple pattern matching for section headers
        present = re.search(rf"(?i)##?\s*{re.escape(heading)}", content) is not None
        sections[section_id] = {"name": section_name, "found": present}
        if present:
            found_count += 1

    compliance_rate = found_count / len(sections) * 100
    status = "PASS" if compliance_rate >= 90 else "NEEDS_REVISION"
    return {
        "filename": filename,
        "compliance_rate": round(compliance_rate, 1),
        "sections": sections,
        "status": status,
    }
def main():
    """Command-line entry point for the ICH-E3 structure check.

    Prints the report as JSON (``--json``) or as a section checklist.

    Returns:
        0 when the report status is "PASS"; 1 otherwise or on any error.
    """
    parser = argparse.ArgumentParser(description="Validate CSR against ICH-E3")
    parser.add_argument("input_file", help="Path to CSR file")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    args = parser.parse_args()

    try:
        report = validate_ich_e3(args.input_file)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print(f"\nICH-E3 Compliance: {report['compliance_rate']}%")
            print(f"Status: {report['status']}\n")
            print("Section Checklist:")
            for section, details in report["sections"].items():
                # Distinct markers: with identical (empty) symbols the
                # checklist could not show which sections were found.
                symbol = "✓" if details["found"] else "✗"
                print(f"{symbol} {details['name']}")
        return 0 if report["status"] == "PASS" else 1
    except Exception as e:
        print(f"Error: {e}")
        return 1


if __name__ == "__main__":
    import sys

    # Propagate main()'s return value as the process exit status.
    sys.exit(main())