256 lines
8.4 KiB
Python
256 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Python SMTP configuration for MailHog
|
|
|
|
This module provides easy email sending functionality that works
|
|
with both MailHog in development and real SMTP servers in production.
|
|
"""
|
|
|
|
import smtplib
|
|
import os
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
from typing import Optional, List, Dict, Any
|
|
import logging
|
|
|
|
# Configure logging: set up the root logger at INFO level at import time
# (basicConfig does nothing if the root logger already has handlers), then
# create the module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmailSender:
    """Email sender that works with MailHog and real SMTP servers.

    Settings are read from environment variables when an instance is created:

    * ``SMTP_HOST`` -- server host (default ``localhost``).
    * ``SMTP_PORT`` -- server port (default ``1025``; an empty value also
      falls back to the default).
    * ``SMTP_USER`` / ``SMTP_PASS`` -- credentials; login is attempted only
      when both are set.
    * ``SMTP_USE_TLS`` -- ``true`` (case-insensitive) enables STARTTLS.
    * ``EMAIL_FROM`` -- sender address (default ``test@sender.local``).

    ``NODE_ENV`` is consulted on every :meth:`create_connection` call; only
    the value ``production`` selects the configured server.

    The ``send_*`` methods never raise on delivery problems: they log the
    error and return ``False``.
    """

    def __init__(self):
        self.smtp_host = os.getenv('SMTP_HOST', 'localhost')
        # ``or`` also covers SMTP_PORT being set but empty, which would
        # otherwise crash in int('').
        self.smtp_port = int(os.getenv('SMTP_PORT') or 1025)
        self.smtp_user = os.getenv('SMTP_USER')
        self.smtp_pass = os.getenv('SMTP_PASS')
        self.use_tls = os.getenv('SMTP_USE_TLS', 'false').lower() == 'true'
        self.from_email = os.getenv('EMAIL_FROM', 'test@sender.local')

    def create_connection(self):
        """Create an SMTP connection based on the environment.

        Returns:
            A connected ``smtplib.SMTP`` object. It can be used as a context
            manager, which is how the ``send_*`` methods use it.

        Raises:
            OSError, smtplib.SMTPException: if connecting, STARTTLS or login
                fails. The socket is closed before the error propagates.
        """
        if os.getenv('NODE_ENV') != 'production':
            # Development with MailHog: no authentication or TLS. The address
            # is fixed; SMTP_HOST / SMTP_PORT are not consulted in this branch.
            return smtplib.SMTP('localhost', 1025)

        # Production SMTP configuration
        server = smtplib.SMTP(self.smtp_host, self.smtp_port)
        try:
            if self.use_tls:
                server.starttls()
            if self.smtp_user and self.smtp_pass:
                server.login(self.smtp_user, self.smtp_pass)
        except Exception:
            # Do not leak the open socket when the handshake or login fails.
            server.close()
            raise
        return server

    def _set_headers(self, msg, to_email: str, subject: str):
        """Set Subject, From and To on ``msg`` and return it."""
        msg['Subject'] = subject
        msg['From'] = self.from_email
        msg['To'] = to_email
        return msg

    @staticmethod
    def _html_to_text(html_body: str) -> str:
        """Derive a basic plain-text fallback from an HTML body."""
        import html
        import re

        # Drop <script>/<style> elements with their content, so CSS and
        # JavaScript source do not end up in the readable text.
        text = re.sub(
            r'(?is)<(script|style)\b[^>]*>.*?</\1\s*>', '', html_body
        )
        text = re.sub(r'<[^>]+>', '', text)
        # Decode entities only after tags are gone, so "&lt;b&gt;" is kept as
        # literal text instead of being stripped as a tag.
        text = html.unescape(text)
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def _build_attachment_part(file_path: str) -> MIMEBase:
        """Read ``file_path`` and wrap it as a base64 attachment part."""
        with open(file_path, 'rb') as attachment:
            payload = attachment.read()

        part = MIMEBase('application', 'octet-stream')
        part.set_payload(payload)
        encoders.encode_base64(part)
        # Passing the filename as a parameter lets the email package quote
        # names with spaces and RFC 2231-encode non-ASCII names.
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(file_path),
        )
        return part

    def send_text_email(self, to_email: str, subject: str, body: str) -> bool:
        """Send a plain text email.

        Returns:
            ``True`` if the message was handed to the server, ``False`` if any
            error occurred (the error is logged).
        """
        try:
            msg = self._set_headers(
                MIMEText(body, 'plain', 'utf-8'), to_email, subject
            )

            with self.create_connection() as server:
                server.send_message(msg)
            logger.info("Text email sent successfully to %s", to_email)
            return True
        except Exception as e:
            logger.error("Failed to send text email: %s", e)
            return False

    def send_html_email(self, to_email: str, subject: str, html_body: str,
                        text_body: Optional[str] = None) -> bool:
        """Send an HTML email with a plain text fallback.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            html_body: HTML version of the message.
            text_body: Plain text version. When missing or empty, one is
                derived from ``html_body``.

        Returns:
            ``True`` if the message was handed to the server, ``False`` if any
            error occurred (the error is logged).
        """
        try:
            msg = self._set_headers(
                MIMEMultipart('alternative'), to_email, subject
            )

            # In multipart/alternative the plain part goes first and the HTML
            # part last.
            plain_text = text_body if text_body else self._html_to_text(html_body)
            msg.attach(MIMEText(plain_text, 'plain', 'utf-8'))
            msg.attach(MIMEText(html_body, 'html', 'utf-8'))

            with self.create_connection() as server:
                server.send_message(msg)
            logger.info("HTML email sent successfully to %s", to_email)
            return True
        except Exception as e:
            logger.error("Failed to send HTML email: %s", e)
            return False

    def send_email_with_attachments(self, to_email: str, subject: str,
                                    body: str, attachments: List[str]) -> bool:
        """Send a plain text email with file attachments.

        Paths that are not existing regular files are skipped with a warning;
        the email is still sent with the remaining attachments.

        Returns:
            ``True`` if the message was handed to the server, ``False`` if any
            error occurred (the error is logged).
        """
        try:
            msg = self._set_headers(MIMEMultipart(), to_email, subject)

            # Add body
            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            # Add attachments
            for file_path in attachments:
                # isfile, not exists: a directory cannot be attached and
                # would otherwise abort the whole send.
                if os.path.isfile(file_path):
                    msg.attach(self._build_attachment_part(file_path))
                else:
                    logger.warning("Attachment file not found: %s", file_path)

            with self.create_connection() as server:
                server.send_message(msg)
            logger.info(
                "Email with attachments sent successfully to %s", to_email
            )
            return True
        except Exception as e:
            logger.error("Failed to send email with attachments: %s", e)
            return False

    def send_bulk_emails(self, recipients: List[str], subject: str, body: str,
                         is_html: bool = False) -> Dict[str, Any]:
        """Send the same email to each recipient individually.

        Args:
            recipients: Addresses to send to; each gets its own message.
            subject: Subject line shared by all messages.
            body: Message body; treated as HTML when ``is_html`` is true.
            is_html: Send via :meth:`send_html_email` instead of
                :meth:`send_text_email`.

        Returns:
            A dict with ``total``, ``successful`` and ``failed`` counts and an
            ``errors`` list holding one description per failed recipient.
        """
        results = {
            'total': len(recipients),
            'successful': 0,
            'failed': 0,
            'errors': [],
        }
        send = self.send_html_email if is_html else self.send_text_email

        for recipient in recipients:
            try:
                success = send(recipient, subject, body)
            except Exception as e:
                results['failed'] += 1
                results['errors'].append(
                    f"Error sending to {recipient}: {str(e)}"
                )
                continue

            if success:
                results['successful'] += 1
            else:
                results['failed'] += 1
                results['errors'].append(f"Failed to send to {recipient}")

        return results
|
|
|
|
|
|
# Example usage and test functions
|
|
def test_basic_email():
    """Send one plain text message and print whether it went through."""
    message = {
        'to_email': 'test@recipient.local',
        'subject': 'Python Test Email',
        'body': 'This is a test email sent from Python using MailHog.',
    }
    delivered = EmailSender().send_text_email(**message)
    verdict = 'PASSED' if delivered else 'FAILED'
    print(f"Basic email test: {verdict}")
|
|
|
|
|
|
def test_html_email():
    """Send one HTML message with an explicit text fallback and print the result."""
    markup = '''
<html>
<body>
<h1>HTML Test Email</h1>
<p>This is an <strong>HTML email</strong> sent from Python.</p>
<ul>
<li>Feature 1</li>
<li>Feature 2</li>
<li>Feature 3</li>
</ul>
<p style="color: blue;">Styled text example</p>
</body>
</html>
'''
    fallback = 'This is the plain text version of the HTML email.'

    delivered = EmailSender().send_html_email(
        to_email='html@test.local',
        subject='HTML Email Test',
        html_body=markup,
        text_body=fallback,
    )
    verdict = 'PASSED' if delivered else 'FAILED'
    print(f"HTML email test: {verdict}")
|
|
|
|
|
|
def test_bulk_emails():
    """Send one text message to three addresses and print the success count."""
    outcome = EmailSender().send_bulk_emails(
        recipients=['bulk1@test.local', 'bulk2@test.local', 'bulk3@test.local'],
        subject='Bulk Email Test',
        body='This is a bulk email test sent to multiple recipients.',
    )

    print(
        f"Bulk email test: {outcome['successful']}/{outcome['total']} successful"
    )
    failures = outcome['errors']
    if failures:
        print("Errors:", failures)
|
|
|
|
|
|
def test_connection():
    """Open an SMTP connection, issue NOOP, and report whether that worked.

    Returns True when the connection and NOOP succeed, False on any exception.
    """
    passed = False
    try:
        connection = EmailSender().create_connection()
        with connection:
            # NOOP has no effect on the server; it only proves the session is alive.
            connection.noop()
        print("SMTP connection test: PASSED")
        passed = True
    except Exception as exc:
        print(f"SMTP connection test: FAILED - {exc}")
    return passed
|
|
|
|
|
|
if __name__ == '__main__':
    divider = "=" * 40
    print("Running MailHog Python email tests...")
    print(divider)

    # Run every check in order; each one prints its own result.
    for check in (
        test_connection,
        test_basic_email,
        test_html_email,
        test_bulk_emails,
    ):
        check()

    print(divider)
    print("Email tests completed. Check MailHog UI at http://localhost:8025")
|