Initial commit
This commit is contained in:
255
skills/mailhog/examples/app-configs/python-smtp.py
Normal file
255
skills/mailhog/examples/app-configs/python-smtp.py
Normal file
@@ -0,0 +1,255 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Python SMTP configuration for MailHog
|
||||
|
||||
This module provides easy email sending functionality that works
|
||||
with both MailHog in development and real SMTP servers in production.
|
||||
"""
|
||||
|
||||
import smtplib
|
||||
import os
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.base import MIMEBase
|
||||
from email import encoders
|
||||
from typing import Optional, List, Dict, Any
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmailSender:
|
||||
"""Email sender class that works with MailHog and real SMTP servers"""
|
||||
|
||||
def __init__(self):
|
||||
self.smtp_host = os.getenv('SMTP_HOST', 'localhost')
|
||||
self.smtp_port = int(os.getenv('SMTP_PORT', 1025))
|
||||
self.smtp_user = os.getenv('SMTP_USER')
|
||||
self.smtp_pass = os.getenv('SMTP_PASS')
|
||||
self.use_tls = os.getenv('SMTP_USE_TLS', 'false').lower() == 'true'
|
||||
self.from_email = os.getenv('EMAIL_FROM', 'test@sender.local')
|
||||
|
||||
def create_connection(self):
|
||||
"""Create SMTP connection based on environment"""
|
||||
if os.getenv('NODE_ENV') == 'production':
|
||||
# Production SMTP configuration
|
||||
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
|
||||
if self.use_tls:
|
||||
server.starttls()
|
||||
if self.smtp_user and self.smtp_pass:
|
||||
server.login(self.smtp_user, self.smtp_pass)
|
||||
else:
|
||||
# Development with MailHog
|
||||
server = smtplib.SMTP('localhost', 1025)
|
||||
# MailHog doesn't require authentication or TLS
|
||||
|
||||
return server
|
||||
|
||||
def send_text_email(self, to_email: str, subject: str, body: str) -> bool:
|
||||
"""Send plain text email"""
|
||||
try:
|
||||
msg = MIMEText(body, 'plain', 'utf-8')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = self.from_email
|
||||
msg['To'] = to_email
|
||||
|
||||
with self.create_connection() as server:
|
||||
server.send_message(msg)
|
||||
logger.info(f"Text email sent successfully to {to_email}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send text email: {e}")
|
||||
return False
|
||||
|
||||
def send_html_email(self, to_email: str, subject: str, html_body: str,
|
||||
text_body: Optional[str] = None) -> bool:
|
||||
"""Send HTML email with optional plain text fallback"""
|
||||
try:
|
||||
msg = MIMEMultipart('alternative')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = self.from_email
|
||||
msg['To'] = to_email
|
||||
|
||||
# Add plain text part
|
||||
if text_body:
|
||||
text_part = MIMEText(text_body, 'plain', 'utf-8')
|
||||
msg.attach(text_part)
|
||||
else:
|
||||
# Convert HTML to plain text (basic conversion)
|
||||
import re
|
||||
plain_text = re.sub(r'<[^>]+>', '', html_body)
|
||||
plain_text = re.sub(r'\s+', ' ', plain_text).strip()
|
||||
text_part = MIMEText(plain_text, 'plain', 'utf-8')
|
||||
msg.attach(text_part)
|
||||
|
||||
# Add HTML part
|
||||
html_part = MIMEText(html_body, 'html', 'utf-8')
|
||||
msg.attach(html_part)
|
||||
|
||||
with self.create_connection() as server:
|
||||
server.send_message(msg)
|
||||
logger.info(f"HTML email sent successfully to {to_email}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send HTML email: {e}")
|
||||
return False
|
||||
|
||||
def send_email_with_attachments(self, to_email: str, subject: str,
|
||||
body: str, attachments: List[str]) -> bool:
|
||||
"""Send email with file attachments"""
|
||||
try:
|
||||
msg = MIMEMultipart()
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = self.from_email
|
||||
msg['To'] = to_email
|
||||
|
||||
# Add body
|
||||
msg.attach(MIMEText(body, 'plain', 'utf-8'))
|
||||
|
||||
# Add attachments
|
||||
for file_path in attachments:
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, 'rb') as attachment:
|
||||
part = MIMEBase('application', 'octet-stream')
|
||||
part.set_payload(attachment.read())
|
||||
encoders.encode_base64(part)
|
||||
part.add_header(
|
||||
'Content-Disposition',
|
||||
('attachment; filename=' +
|
||||
f'{os.path.basename(file_path)}')
|
||||
)
|
||||
msg.attach(part)
|
||||
else:
|
||||
logger.warning(f"Attachment file not found: {file_path}")
|
||||
|
||||
with self.create_connection() as server:
|
||||
server.send_message(msg)
|
||||
logger.info(
|
||||
f"Email with attachments sent successfully to {to_email}"
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send email with attachments: {e}")
|
||||
return False
|
||||
|
||||
def send_bulk_emails(self, recipients: List[str], subject: str, body: str,
|
||||
is_html: bool = False) -> Dict[str, Any]:
|
||||
"""Send bulk emails to multiple recipients"""
|
||||
results = {
|
||||
'total': len(recipients),
|
||||
'successful': 0,
|
||||
'failed': 0,
|
||||
'errors': []
|
||||
}
|
||||
|
||||
for recipient in recipients:
|
||||
try:
|
||||
if is_html:
|
||||
success = self.send_html_email(recipient, subject, body)
|
||||
else:
|
||||
success = self.send_text_email(recipient, subject, body)
|
||||
|
||||
if success:
|
||||
results['successful'] += 1
|
||||
else:
|
||||
results['failed'] += 1
|
||||
results['errors'].append(f"Failed to send to {recipient}")
|
||||
except Exception as e:
|
||||
results['failed'] += 1
|
||||
results['errors'].append(
|
||||
f"Error sending to {recipient}: {str(e)}"
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# Example usage and test functions
|
||||
def test_basic_email():
|
||||
"""Test sending a basic text email"""
|
||||
sender = EmailSender()
|
||||
success = sender.send_text_email(
|
||||
to_email='test@recipient.local',
|
||||
subject='Python Test Email',
|
||||
body='This is a test email sent from Python using MailHog.'
|
||||
)
|
||||
print(f"Basic email test: {'PASSED' if success else 'FAILED'}")
|
||||
|
||||
|
||||
def test_html_email():
|
||||
"""Test sending an HTML email"""
|
||||
sender = EmailSender()
|
||||
html_content = '''
|
||||
<html>
|
||||
<body>
|
||||
<h1>HTML Test Email</h1>
|
||||
<p>This is an <strong>HTML email</strong> sent from Python.</p>
|
||||
<ul>
|
||||
<li>Feature 1</li>
|
||||
<li>Feature 2</li>
|
||||
<li>Feature 3</li>
|
||||
</ul>
|
||||
<p style="color: blue;">Styled text example</p>
|
||||
</body>
|
||||
</html>
|
||||
'''
|
||||
|
||||
success = sender.send_html_email(
|
||||
to_email='html@test.local',
|
||||
subject='HTML Email Test',
|
||||
html_body=html_content,
|
||||
text_body='This is the plain text version of the HTML email.'
|
||||
)
|
||||
print(f"HTML email test: {'PASSED' if success else 'FAILED'}")
|
||||
|
||||
|
||||
def test_bulk_emails():
|
||||
"""Test sending bulk emails"""
|
||||
sender = EmailSender()
|
||||
recipients = [
|
||||
'bulk1@test.local',
|
||||
'bulk2@test.local',
|
||||
'bulk3@test.local'
|
||||
]
|
||||
|
||||
results = sender.send_bulk_emails(
|
||||
recipients=recipients,
|
||||
subject='Bulk Email Test',
|
||||
body='This is a bulk email test sent to multiple recipients.'
|
||||
)
|
||||
|
||||
print(
|
||||
"Bulk email test: " +
|
||||
f"{results['successful']}/{results['total']} successful"
|
||||
)
|
||||
if results['errors']:
|
||||
print("Errors:", results['errors'])
|
||||
|
||||
|
||||
def test_connection():
|
||||
"""Test SMTP connection"""
|
||||
try:
|
||||
sender = EmailSender()
|
||||
with sender.create_connection() as server:
|
||||
server.noop() # No-operation command
|
||||
print("SMTP connection test: PASSED")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"SMTP connection test: FAILED - {e}")
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("Running MailHog Python email tests...")
|
||||
print("=" * 40)
|
||||
|
||||
# Run all tests
|
||||
test_connection()
|
||||
test_basic_email()
|
||||
test_html_email()
|
||||
test_bulk_emails()
|
||||
print("=" * 40)
|
||||
print(
|
||||
"Email tests completed. Check MailHog UI at http://localhost:8025"
|
||||
)
|
||||
Reference in New Issue
Block a user