Initial commit
skills/citation-management/scripts/extract_metadata.py (new executable file, 569 lines)
@@ -0,0 +1,569 @@
#!/usr/bin/env python3
"""
Metadata Extraction Tool

Extract citation metadata from DOI, PMID, arXiv ID, or URL using various APIs.
"""

import sys
import os
import requests
import argparse
import time
import re
import json
import xml.etree.ElementTree as ET
from typing import Optional, Dict, List, Tuple
from urllib.parse import urlparse
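
# Example invocations (the DOI mirrors the CLI epilog below; the PMID, file
# names, and email are placeholders):
#   python extract_metadata.py --doi 10.1038/s41586-021-03819-2
#   python extract_metadata.py --pmid 12345678 --email you@example.org
#   python extract_metadata.py -i identifiers.txt -o refs.bib --format bibtex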

class MetadataExtractor:
    """Extract metadata from various sources and generate BibTeX."""

    def __init__(self, email: Optional[str] = None):
        """
        Initialize extractor.

        Args:
            email: Email for Entrez API (recommended for PubMed)
        """
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'MetadataExtractor/1.0 (Citation Management Tool)'
        })
        self.email = email or os.getenv('NCBI_EMAIL', '')

    def identify_type(self, identifier: str) -> Tuple[str, str]:
        """
        Identify the type of identifier.

        Args:
            identifier: DOI, PMID, arXiv ID, or URL

        Returns:
            Tuple of (type, cleaned_identifier)
        """
        identifier = identifier.strip()

        # Check if URL
        if identifier.startswith('http://') or identifier.startswith('https://'):
            return self._parse_url(identifier)

        # Check for DOI
        if identifier.startswith('10.'):
            return ('doi', identifier)

        # Check for arXiv ID (new-style, e.g. 2103.00020, optionally versioned)
        if re.match(r'^\d{4}\.\d{4,5}(v\d+)?$', identifier):
            return ('arxiv', identifier)
        if identifier.startswith('arXiv:'):
            return ('arxiv', identifier.replace('arXiv:', ''))

        # Check for PMID (typically 7-8 digits)
        if identifier.isdigit() and len(identifier) >= 7:
            return ('pmid', identifier)

        # Check for PMCID
        if identifier.upper().startswith('PMC') and identifier[3:].isdigit():
            return ('pmcid', identifier.upper())

        return ('unknown', identifier)
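
    # Illustrative mappings (placeholder identifiers):
    #   '10.1234/example'   -> ('doi', '10.1234/example')
    #   'arXiv:2103.00020'  -> ('arxiv', '2103.00020')
    #   '12345678'          -> ('pmid', '12345678')
    #   'PMC1234567'        -> ('pmcid', 'PMC1234567')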

    def _parse_url(self, url: str) -> Tuple[str, str]:
        """Parse URL to extract identifier type and value."""
        parsed = urlparse(url)

        # DOI URLs
        if 'doi.org' in parsed.netloc:
            doi = parsed.path.lstrip('/')
            return ('doi', doi)

        # PubMed URLs
        if 'pubmed.ncbi.nlm.nih.gov' in parsed.netloc or 'ncbi.nlm.nih.gov/pubmed' in url:
            pmid = re.search(r'/(\d+)', parsed.path)
            if pmid:
                return ('pmid', pmid.group(1))

        # arXiv URLs
        if 'arxiv.org' in parsed.netloc:
            arxiv_id = re.search(r'/abs/(\d{4}\.\d{4,5})', parsed.path)
            if arxiv_id:
                return ('arxiv', arxiv_id.group(1))

        # Nature, Science, Cell, etc. - try to extract DOI from URL
        doi_match = re.search(r'10\.\d{4,}/[^\s/]+', url)
        if doi_match:
            return ('doi', doi_match.group())

        return ('url', url)
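
    # Illustrative URL handling (placeholder URLs):
    #   https://doi.org/10.1234/example           -> ('doi', '10.1234/example')
    #   https://pubmed.ncbi.nlm.nih.gov/1234567/  -> ('pmid', '1234567')
    #   https://arxiv.org/abs/2103.00020          -> ('arxiv', '2103.00020')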

    def extract_from_doi(self, doi: str) -> Optional[Dict]:
        """
        Extract metadata from DOI using the CrossRef API.

        Args:
            doi: Digital Object Identifier

        Returns:
            Metadata dictionary or None
        """
        url = f'https://api.crossref.org/works/{doi}'

        try:
            response = self.session.get(url, timeout=15)

            if response.status_code == 200:
                data = response.json()
                message = data.get('message', {})

                metadata = {
                    'type': 'doi',
                    'entry_type': self._crossref_type_to_bibtex(message.get('type')),
                    'doi': doi,
                    'title': (message.get('title') or [''])[0],  # 'title' may be missing or an empty list
                    'authors': self._format_authors_crossref(message.get('author', [])),
                    'year': self._extract_year_crossref(message),
                    'journal': message.get('container-title', [''])[0] if message.get('container-title') else '',
                    'volume': str(message.get('volume', '')) if message.get('volume') else '',
                    'issue': str(message.get('issue', '')) if message.get('issue') else '',
                    'pages': message.get('page', ''),
                    'publisher': message.get('publisher', ''),
                    'url': f'https://doi.org/{doi}'
                }

                return metadata
            else:
                print(f'Error: CrossRef API returned status {response.status_code} for DOI: {doi}', file=sys.stderr)
                return None

        except Exception as e:
            print(f'Error extracting metadata from DOI {doi}: {e}', file=sys.stderr)
            return None
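
    # The CrossRef /works/{doi} response wraps the record in 'message'; the
    # fields read above arrive roughly as (abridged sketch, not a full schema):
    #   {"message": {"type": "journal-article",
    #                "title": ["..."],
    #                "author": [{"given": "Jane", "family": "Smith"}],
    #                "container-title": ["Journal Name"],
    #                "published-print": {"date-parts": [[2021, 7, 15]]}}}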

    def extract_from_pmid(self, pmid: str) -> Optional[Dict]:
        """
        Extract metadata from PMID using PubMed E-utilities.

        Args:
            pmid: PubMed ID

        Returns:
            Metadata dictionary or None
        """
        url = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
        params = {
            'db': 'pubmed',
            'id': pmid,
            'retmode': 'xml',
            'rettype': 'abstract'
        }

        if self.email:
            params['email'] = self.email

        api_key = os.getenv('NCBI_API_KEY')
        if api_key:
            params['api_key'] = api_key

        try:
            response = self.session.get(url, params=params, timeout=15)

            if response.status_code == 200:
                root = ET.fromstring(response.content)
                article = root.find('.//PubmedArticle')

                if article is None:
                    print(f'Error: No article found for PMID: {pmid}', file=sys.stderr)
                    return None

                # Extract metadata from XML
                medline_citation = article.find('.//MedlineCitation')
                article_elem = medline_citation.find('.//Article')
                journal = article_elem.find('.//Journal')

                # Get DOI if available
                doi = None
                article_ids = article.findall('.//ArticleId')
                for article_id in article_ids:
                    if article_id.get('IdType') == 'doi':
                        doi = article_id.text
                        break

                metadata = {
                    'type': 'pmid',
                    'entry_type': 'article',
                    'pmid': pmid,
                    'title': article_elem.findtext('.//ArticleTitle', ''),
                    'authors': self._format_authors_pubmed(article_elem.findall('.//Author')),
                    'year': self._extract_year_pubmed(article_elem),
                    'journal': journal.findtext('.//Title', ''),
                    'volume': journal.findtext('.//JournalIssue/Volume', ''),
                    'issue': journal.findtext('.//JournalIssue/Issue', ''),
                    'pages': article_elem.findtext('.//Pagination/MedlinePgn', ''),
                    'doi': doi
                }

                return metadata
            else:
                print(f'Error: PubMed API returned status {response.status_code} for PMID: {pmid}', file=sys.stderr)
                return None

        except Exception as e:
            print(f'Error extracting metadata from PMID {pmid}: {e}', file=sys.stderr)
            return None
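
    # efetch returns XML shaped roughly like this (abridged sketch of the
    # elements queried above):
    #   <PubmedArticle>
    #     <MedlineCitation><Article>
    #       <Journal><Title>...</Title>
    #         <JournalIssue><Volume/><Issue/><PubDate><Year/></PubDate></JournalIssue>
    #       </Journal>
    #       <ArticleTitle>...</ArticleTitle><AuthorList>...</AuthorList>
    #     </Article></MedlineCitation>
    #     <PubmedData><ArticleIdList><ArticleId IdType="doi">...</ArticleId></ArticleIdList></PubmedData>
    #   </PubmedArticle>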

    def extract_from_arxiv(self, arxiv_id: str) -> Optional[Dict]:
        """
        Extract metadata from arXiv ID using the arXiv API.

        Args:
            arxiv_id: arXiv identifier

        Returns:
            Metadata dictionary or None
        """
        url = 'http://export.arxiv.org/api/query'
        params = {
            'id_list': arxiv_id,
            'max_results': 1
        }

        try:
            response = self.session.get(url, params=params, timeout=15)

            if response.status_code == 200:
                # Parse Atom XML
                root = ET.fromstring(response.content)
                ns = {'atom': 'http://www.w3.org/2005/Atom', 'arxiv': 'http://arxiv.org/schemas/atom'}

                entry = root.find('atom:entry', ns)
                if entry is None:
                    print(f'Error: No entry found for arXiv ID: {arxiv_id}', file=sys.stderr)
                    return None

                # Extract DOI if published
                doi_elem = entry.find('arxiv:doi', ns)
                doi = doi_elem.text if doi_elem is not None else None

                # Extract journal reference if published
                journal_ref_elem = entry.find('arxiv:journal_ref', ns)
                journal_ref = journal_ref_elem.text if journal_ref_elem is not None else None

                # Get publication date
                published = entry.findtext('atom:published', '', ns)
                year = published[:4] if published else ''

                # Get authors
                authors = []
                for author in entry.findall('atom:author', ns):
                    name = author.findtext('atom:name', '', ns)
                    if name:
                        authors.append(name)

                metadata = {
                    'type': 'arxiv',
                    'entry_type': 'misc' if not doi else 'article',
                    'arxiv_id': arxiv_id,
                    'title': entry.findtext('atom:title', '', ns).strip().replace('\n', ' '),
                    'authors': ' and '.join(authors),
                    'year': year,
                    'doi': doi,
                    'journal_ref': journal_ref,
                    'abstract': entry.findtext('atom:summary', '', ns).strip().replace('\n', ' '),
                    'url': f'https://arxiv.org/abs/{arxiv_id}'
                }

                return metadata
            else:
                print(f'Error: arXiv API returned status {response.status_code} for ID: {arxiv_id}', file=sys.stderr)
                return None

        except Exception as e:
            print(f'Error extracting metadata from arXiv {arxiv_id}: {e}', file=sys.stderr)
            return None
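
    # Note: Atom <name> elements arrive as "First Last", so arXiv authors are
    # joined in that order, while the CrossRef/PubMed formatters build
    # "Last, First"; _generate_citation_key accepts both shapes.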

    def metadata_to_bibtex(self, metadata: Dict, citation_key: Optional[str] = None) -> str:
        """
        Convert metadata dictionary to BibTeX format.

        Args:
            metadata: Metadata dictionary
            citation_key: Optional custom citation key

        Returns:
            BibTeX string
        """
        if not citation_key:
            citation_key = self._generate_citation_key(metadata)

        entry_type = metadata.get('entry_type', 'misc')

        # Build BibTeX entry
        lines = [f'@{entry_type}{{{citation_key},']

        # Add fields
        if metadata.get('authors'):
            lines.append(f'  author = {{{metadata["authors"]}}},')

        if metadata.get('title'):
            # Protect capitalization
            title = self._protect_title(metadata['title'])
            lines.append(f'  title = {{{title}}},')

        if entry_type == 'article' and metadata.get('journal'):
            lines.append(f'  journal = {{{metadata["journal"]}}},')
        elif entry_type == 'misc' and metadata.get('type') == 'arxiv':
            lines.append('  howpublished = {arXiv},')

        if metadata.get('year'):
            lines.append(f'  year = {{{metadata["year"]}}},')

        if metadata.get('volume'):
            lines.append(f'  volume = {{{metadata["volume"]}}},')

        if metadata.get('issue'):
            lines.append(f'  number = {{{metadata["issue"]}}},')

        if metadata.get('pages'):
            pages = metadata['pages'].replace('-', '--')  # -- renders as an en-dash in LaTeX
            lines.append(f'  pages = {{{pages}}},')

        if metadata.get('doi'):
            lines.append(f'  doi = {{{metadata["doi"]}}},')
        elif metadata.get('url'):
            lines.append(f'  url = {{{metadata["url"]}}},')

        if metadata.get('pmid'):
            lines.append(f'  note = {{PMID: {metadata["pmid"]}}},')

        if metadata.get('type') == 'arxiv' and not metadata.get('doi'):
            lines.append('  note = {Preprint},')

        # Remove trailing comma from last field
        if lines[-1].endswith(','):
            lines[-1] = lines[-1][:-1]

        lines.append('}')

        return '\n'.join(lines)
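
    # For a typical journal article the assembled entry looks roughly like
    # this (illustrative values):
    #   @article{Smith2021deep,
    #     author = {Smith, Jane},
    #     title = {Deep Learning for {DNA} Sequence Analysis},
    #     journal = {Journal Name},
    #     year = {2021},
    #     volume = {12},
    #     pages = {100--110},
    #     doi = {10.1234/example}
    #   }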

    def _crossref_type_to_bibtex(self, crossref_type: str) -> str:
        """Map CrossRef type to BibTeX entry type."""
        type_map = {
            'journal-article': 'article',
            'book': 'book',
            'book-chapter': 'incollection',
            'proceedings-article': 'inproceedings',
            'posted-content': 'misc',
            'dataset': 'misc',
            'report': 'techreport'
        }
        return type_map.get(crossref_type, 'misc')

    def _format_authors_crossref(self, authors: List[Dict]) -> str:
        """Format author list from CrossRef data."""
        if not authors:
            return ''

        formatted = []
        for author in authors:
            given = author.get('given', '')
            family = author.get('family', '')
            if family:
                if given:
                    formatted.append(f'{family}, {given}')
                else:
                    formatted.append(family)

        return ' and '.join(formatted)

    def _format_authors_pubmed(self, authors: List) -> str:
        """Format author list from PubMed XML."""
        formatted = []
        for author in authors:
            last_name = author.findtext('.//LastName', '')
            fore_name = author.findtext('.//ForeName', '')
            if last_name:
                if fore_name:
                    formatted.append(f'{last_name}, {fore_name}')
                else:
                    formatted.append(last_name)

        return ' and '.join(formatted)

    def _extract_year_crossref(self, message: Dict) -> str:
        """Extract year from CrossRef message."""
        # Try published-print first, then published-online
        date_parts = message.get('published-print', {}).get('date-parts', [[]])
        if not date_parts or not date_parts[0]:
            date_parts = message.get('published-online', {}).get('date-parts', [[]])

        if date_parts and date_parts[0]:
            return str(date_parts[0][0])
        return ''
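
    # CrossRef dates are nested lists, e.g. {'date-parts': [[2021, 7, 15]]},
    # so date_parts[0][0] above is the year.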

    def _extract_year_pubmed(self, article: ET.Element) -> str:
        """Extract year from PubMed XML."""
        year = article.findtext('.//Journal/JournalIssue/PubDate/Year', '')
        if not year:
            # Some records only carry a free-text MedlineDate, e.g. "2021 Jul-Aug"
            medline_date = article.findtext('.//Journal/JournalIssue/PubDate/MedlineDate', '')
            if medline_date:
                year_match = re.search(r'\d{4}', medline_date)
                if year_match:
                    year = year_match.group()
        return year

    def _generate_citation_key(self, metadata: Dict) -> str:
        """Generate a citation key from metadata."""
        # Get first author last name
        authors = metadata.get('authors', '')
        if authors:
            first_author = authors.split(' and ')[0]
            if ',' in first_author:
                last_name = first_author.split(',')[0].strip()
            else:
                last_name = first_author.split()[-1] if first_author else 'Unknown'
        else:
            last_name = 'Unknown'

        # Get year
        year = metadata.get('year', '').strip()
        if not year:
            year = 'XXXX'

        # Clean last name (remove special characters)
        last_name = re.sub(r'[^a-zA-Z]', '', last_name)

        # Get keyword from title
        title = metadata.get('title', '')
        words = re.findall(r'\b[a-zA-Z]{4,}\b', title)
        keyword = words[0].lower() if words else 'paper'

        return f'{last_name}{year}{keyword}'
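
    # Example (illustrative): authors 'Garcia, Luis and Chen, Wei', year
    # '2019', and title 'A Survey of Methods' yield 'Garcia2019survey'.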

    def _protect_title(self, title: str) -> str:
        """Protect capitalization in title for BibTeX."""
        # Protect common acronyms and proper nouns
        protected_words = [
            'DNA', 'RNA', 'CRISPR', 'COVID', 'HIV', 'AIDS', 'AlphaFold',
            'Python', 'AI', 'ML', 'GPU', 'CPU', 'USA', 'UK', 'EU'
        ]

        for word in protected_words:
            # Case-insensitive match, so any casing in the title is normalized
            # to the canonical form above before being brace-protected
            title = re.sub(rf'\b{word}\b', f'{{{word}}}', title, flags=re.IGNORECASE)

        return title
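
    # Example: 'CRISPR screens of DNA repair' -> '{CRISPR} screens of {DNA} repair'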

    def extract(self, identifier: str) -> Optional[str]:
        """
        Extract metadata and return BibTeX.

        Args:
            identifier: DOI, PMID, arXiv ID, or URL

        Returns:
            BibTeX string or None
        """
        id_type, clean_id = self.identify_type(identifier)

        print(f'Identified as {id_type}: {clean_id}', file=sys.stderr)

        metadata = None

        if id_type == 'doi':
            metadata = self.extract_from_doi(clean_id)
        elif id_type == 'pmid':
            metadata = self.extract_from_pmid(clean_id)
        elif id_type == 'arxiv':
            metadata = self.extract_from_arxiv(clean_id)
        else:
            # 'pmcid', 'url', and 'unknown' identifiers have no extractor here
            print(f'Error: Unsupported identifier type ({id_type}) for: {identifier}', file=sys.stderr)
            return None

        if metadata:
            return self.metadata_to_bibtex(metadata)
        else:
            return None
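
# Library-style usage (illustrative sketch):
#   extractor = MetadataExtractor(email='you@example.org')
#   bibtex = extractor.extract('10.1234/example')  # BibTeX string, or None on failure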


def main():
    """Command-line interface."""
    parser = argparse.ArgumentParser(
        description='Extract citation metadata from DOI, PMID, arXiv ID, or URL',
        epilog='Example: python extract_metadata.py --doi 10.1038/s41586-021-03819-2'
    )

    parser.add_argument('--doi', help='Digital Object Identifier')
    parser.add_argument('--pmid', help='PubMed ID')
    parser.add_argument('--arxiv', help='arXiv ID')
    parser.add_argument('--url', help='URL to article')
    parser.add_argument('-i', '--input', help='Input file with identifiers (one per line)')
    parser.add_argument('-o', '--output', help='Output file for BibTeX (default: stdout)')
    parser.add_argument('--format', choices=['bibtex', 'json'], default='bibtex', help='Output format')
    parser.add_argument('--email', help='Email for NCBI E-utilities (recommended)')

    args = parser.parse_args()

    # Collect identifiers
    identifiers = []
    if args.doi:
        identifiers.append(args.doi)
    if args.pmid:
        identifiers.append(args.pmid)
    if args.arxiv:
        identifiers.append(args.arxiv)
    if args.url:
        identifiers.append(args.url)

    if args.input:
        try:
            with open(args.input, 'r', encoding='utf-8') as f:
                file_ids = [line.strip() for line in f if line.strip()]
                identifiers.extend(file_ids)
        except Exception as e:
            print(f'Error reading input file: {e}', file=sys.stderr)
            sys.exit(1)

    if not identifiers:
        parser.print_help()
        sys.exit(1)

    # Extract metadata
    extractor = MetadataExtractor(email=args.email)
    bibtex_entries = []

    for i, identifier in enumerate(identifiers):
        print(f'\nProcessing {i+1}/{len(identifiers)}...', file=sys.stderr)
        bibtex = extractor.extract(identifier)
        if bibtex:
            bibtex_entries.append(bibtex)

        # Rate limiting
        if i < len(identifiers) - 1:
            time.sleep(0.5)

    if not bibtex_entries:
        print('Error: No successful extractions', file=sys.stderr)
        sys.exit(1)

    # Format output
    if args.format == 'bibtex':
        output = '\n\n'.join(bibtex_entries) + '\n'
    else:  # json
        output = json.dumps({
            'count': len(bibtex_entries),
            'entries': bibtex_entries
        }, indent=2)

    # Write output
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(output)
        print(f'\nSuccessfully wrote {len(bibtex_entries)} entries to {args.output}', file=sys.stderr)
    else:
        print(output)

    print(f'\nExtracted {len(bibtex_entries)}/{len(identifiers)} entries', file=sys.stderr)


if __name__ == '__main__':
    main()