下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1155
这个邮件批量发送工具由配置管理、模板渲染、连接池和多线程发送四个模块组成(约330行代码)。支持 Outlook/Hotmail、Gmail、163、新浪以及网易 126/yeah.net 邮箱,可以通过 CSV 文件导入收件人列表,并支持基于 Jinja2 的 HTML 模板邮件。下面依次给出 config、template、connection 三个模块和主程序的代码。
import configparser
import os
from typing import Dict, List
class EmailConfig:
    """Load SMTP host and port settings for the supported mail providers.

    Settings are read from an INI file (``email_config.ini`` relative to the
    current working directory unless ``config_file`` is reassigned). The file
    is created with the built-in defaults the first time ``load_config`` runs
    and finds it missing.
    """

    # Built-in defaults: provider key -> (SMTP host, port).
    _DEFAULTS = {
        'outlook': ('smtp.office365.com', 587),
        'gmail': ('smtp.gmail.com', 587),
        '163': ('smtp.163.com', 465),
        'sina': ('smtp.sina.com', 465),
        'netease': ('smtp.163.com', 465),
    }

    def __init__(self):
        self.config = configparser.ConfigParser()
        self.config_file = 'email_config.ini'

    def load_config(self) -> Dict:
        """Return the SMTP settings, creating the INI file if it is missing.

        Returns:
            A dict with two keys: ``'smtp_servers'`` maps each provider key to
            its host name (str) and ``'ports'`` maps each provider key to its
            port (int).

        Raises:
            ValueError: if a port value in the file is not an integer.
        """
        if not os.path.exists(self.config_file):
            self._create_default_config()
        self.config.read(self.config_file)

        servers = {}
        ports = {}
        for provider, (host, port) in self._DEFAULTS.items():
            # ``fallback`` keeps a hand-edited file that dropped a section or
            # an option usable instead of raising NoSectionError/NoOptionError.
            servers[provider] = self.config.get(
                'SMTP_SERVERS', provider, fallback=host
            )
            ports[provider] = self.config.getint(
                'PORTS', provider, fallback=port
            )
        return {'smtp_servers': servers, 'ports': ports}

    def _create_default_config(self):
        """Write the built-in defaults to ``self.config_file``."""
        self.config['SMTP_SERVERS'] = {
            provider: host for provider, (host, _) in self._DEFAULTS.items()
        }
        # configparser stores values as strings.
        self.config['PORTS'] = {
            provider: str(port) for provider, (_, port) in self._DEFAULTS.items()
        }
        with open(self.config_file, 'w') as f:
            self.config.write(f)
from jinja2 import Environment, FileSystemLoader
import os
from typing import Optional
class EmailTemplate:
    """Produce e-mail bodies from Jinja2 template files or from plain strings."""

    def __init__(self, template_dir: str = 'templates'):
        """Create a Jinja2 environment that loads templates from ``template_dir``.

        Autoescaping is switched on for the environment.
        """
        loader = FileSystemLoader(template_dir)
        self.env = Environment(loader=loader, autoescape=True)

    def render_template(self, template_name: str, context: dict) -> Optional[str]:
        """Render ``template_name`` with ``context``.

        Returns:
            The rendered text, or ``None`` when loading or rendering raised;
            the error is printed rather than propagated.
        """
        try:
            return self.env.get_template(template_name).render(context)
        except Exception as e:
            print(f"Error rendering template: {e}")
        return None

    def render_plain_text(self, subject: str, body: str, footer: str = None) -> str:
        """Build a plain-text message: a subject line, the body, then a footer.

        The footer, when non-empty, is appended after a ``-- `` separator line.
        """
        pieces = [f"Subject: {subject}\n\n{body}"]
        if footer:
            pieces.append(f"\n\n-- \n{footer}")
        return "".join(pieces)
import smtplib
from typing import Dict, Optional
import ssl
from dataclasses import dataclass
import time
@dataclass
class SMTPConnection:
    """One pooled SMTP session together with the credentials used to open it."""

    server: str  # SMTP host name
    port: int  # SMTP port
    username: str  # login name
    password: str  # login password (kept in memory in clear text)
    # Live session object; None until a session has been attached.
    connection: Optional[smtplib.SMTP] = None
    # Timestamp of the most recent use; 0.0 means "never used".
    last_used: float = 0.0
class SMTPConnectionPool:
    """Cache of logged-in SMTP sessions keyed by ``username@server:port``.

    At most ``max_connections`` sessions are kept. ``connections_in_use``
    tracks how many pool slots are occupied. The pool does no locking of its
    own; callers that share it between threads must serialize access.
    """

    def __init__(self, max_connections: int = 5):
        self.max_connections = max_connections
        self.pool: Dict[str, "SMTPConnection"] = {}
        self.connections_in_use = 0

    def get_connection(self, server: str, port: int, username: str, password: str) -> Optional[smtplib.SMTP]:
        """Return a live session for the given account, opening one if needed.

        Sessions idle for more than 30 minutes are closed first. A cached
        session is reused only if it still answers NOOP; otherwise it is
        evicted and replaced.

        Returns:
            The SMTP session, or ``None`` when the pool is full or the
            connection/login failed (the error is printed).
        """
        key = f"{username}@{server}:{port}"
        # Drop sessions that have been idle for more than 30 minutes.
        self._clean_idle_connections(1800)

        entry = self.pool.get(key)
        if entry is not None:
            if entry.connection is not None and self._test_connection(entry.connection):
                entry.last_used = time.time()
                return entry.connection
            # The cached session is dead: free its slot before reconnecting so
            # the slot is not counted twice and cannot block the pool forever.
            self._evict(key, graceful=False)

        if self.connections_in_use >= self.max_connections:
            return None

        try:
            smtp_conn = self._open_connection(server, port, username, password)
            self.pool[key] = SMTPConnection(
                server=server,
                port=port,
                username=username,
                password=password,
                connection=smtp_conn,
                last_used=time.time()
            )
        except Exception as e:
            print(f"Failed to create SMTP connection: {e}")
            return None
        self.connections_in_use += 1
        return smtp_conn

    def _open_connection(self, server: str, port: int, username: str, password: str) -> smtplib.SMTP:
        """Connect and log in; STARTTLS on port 587, implicit TLS otherwise.

        The socket is closed again if the TLS upgrade or the login fails, and
        the original exception is re-raised.
        """
        context = ssl.create_default_context()
        use_starttls = port == 587
        if use_starttls:
            smtp_conn = smtplib.SMTP(server, port)
        else:
            smtp_conn = smtplib.SMTP_SSL(server, port, context=context)
        try:
            if use_starttls:
                smtp_conn.starttls(context=context)
            smtp_conn.login(username, password)
        except Exception:
            smtp_conn.close()
            raise
        return smtp_conn

    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        """Return True if the server answers NOOP with status 250."""
        try:
            return conn.noop()[0] == 250
        except (smtplib.SMTPException, OSError):
            return False

    def _evict(self, key: str, graceful: bool = True) -> None:
        """Remove ``key`` from the pool, free its slot and close its session.

        ``graceful`` sends QUIT; otherwise the socket is just closed, which is
        what a session already known to be dead needs.
        """
        entry = self.pool.pop(key, None)
        if entry is None:
            return
        self.connections_in_use = max(0, self.connections_in_use - 1)
        conn = entry.connection
        if conn is None:
            return
        try:
            if graceful:
                conn.quit()
            else:
                conn.close()
        except (smtplib.SMTPException, OSError):
            # Best effort: the peer may already have dropped the session.
            pass

    def _clean_idle_connections(self, idle_time: float):
        """Close and remove every session unused for more than ``idle_time`` seconds."""
        now = time.time()
        expired = [
            key for key, entry in self.pool.items()
            if now - entry.last_used > idle_time
        ]
        for key in expired:
            self._evict(key)

    def release_all(self):
        """Close every pooled session and reset the pool to empty."""
        for key in list(self.pool):
            self._evict(key)
        self.pool.clear()
        self.connections_in_use = 0
import csv
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import EmailConfig
from template import EmailTemplate
from connection import SMTPConnectionPool
class BulkEmailSender:
    """Send one message per recipient through a pooled SMTP session.

    The SMTP host and port are chosen from the sender address's mail domain.
    Messages are dispatched from a thread pool, but the actual SMTP exchange
    is serialized with a lock because all workers share the same pooled
    session and an SMTP session cannot carry two conversations at once.
    """

    # Mail domain -> provider key used to look up host/port in the config.
    _PROVIDER_DOMAINS = {
        'outlook.com': 'outlook',
        'hotmail.com': 'outlook',
        'gmail.com': 'gmail',
        '163.com': '163',
        'sina.com': 'sina',
        'sina.cn': 'sina',
        '126.com': 'netease',
        'yeah.net': 'netease',
    }

    def __init__(self, max_workers: int = 5):
        # Local import: keeps this class self-contained without touching the
        # module's import list.
        import threading

        self._send_lock = threading.Lock()
        self.max_workers = max_workers
        self.config = EmailConfig().load_config()
        self.template = EmailTemplate()
        self.connection_pool = SMTPConnectionPool(max_connections=max_workers)

    def send_email(
        self,
        sender: str,
        password: str,
        recipient: str,
        subject: str,
        body: str,
        is_html: bool = False,
        cc: Optional[List[str]] = None,
        bcc: Optional[List[str]] = None
    ) -> bool:
        """Send a single message.

        Args:
            sender: From address; also the SMTP login name.
            password: SMTP password for ``sender``.
            recipient: Primary To address.
            subject: Subject header.
            body: Message text, HTML when ``is_html`` is true.
            is_html: Send ``body`` as ``text/html`` instead of ``text/plain``.
            cc: Addresses listed in the Cc header and delivered to.
            bcc: Addresses delivered to but never written into the message.

        Returns:
            True on success; False (after printing the reason) otherwise.
        """
        email_provider = self._detect_email_provider(sender)
        if not email_provider:
            print(f"Unsupported email provider for {sender}")
            return False

        smtp_server = self.config['smtp_servers'][email_provider]
        smtp_port = self.config['ports'][email_provider]
        cc_list = list(cc or [])
        bcc_list = list(bcc or [])

        try:
            msg = MIMEMultipart()
            msg['From'] = sender
            msg['To'] = recipient
            msg['Subject'] = subject
            if cc_list:
                msg['Cc'] = ', '.join(cc_list)
            # Deliberately no Bcc header: sendmail() transmits the message
            # verbatim, so a Bcc header would reveal the blind-copy addresses
            # to every recipient. They are added to the envelope only.
            msg.attach(MIMEText(body, 'html' if is_html else 'plain'))
            payload = msg.as_string()
            envelope = [recipient] + cc_list + bcc_list

            # One conversation at a time on the shared pooled session.
            with self._send_lock:
                conn = self.connection_pool.get_connection(
                    smtp_server, smtp_port, sender, password
                )
                if conn is None:
                    print(f"Failed to get SMTP connection for {sender}")
                    return False
                # The session stays open; the pool owns its lifetime.
                conn.sendmail(sender, envelope, payload)
            return True
        except Exception as e:
            print(f"Failed to send email to {recipient}: {e}")
            return False

    def bulk_send(
        self,
        sender: str,
        password: str,
        recipients: List[Dict],
        template_name: Optional[str] = None,
        template_context: Optional[Dict] = None,
        subject: str = None,
        body: str = None,
        is_html: bool = False,
        delay: float = 1.0,
        max_retries: int = 3
    ) -> Dict[str, bool]:
        """Send one message to every entry of ``recipients``.

        Each entry is a dict with an ``'email'`` key and optional
        ``'context'``, ``'cc'`` and ``'bcc'`` keys. When ``template_name`` is
        given the body is rendered per recipient from ``template_context``
        overlaid with the recipient's context; otherwise ``body`` is used
        as is. A ``'subject'`` entry in the recipient's context overrides
        ``subject``.

        Returns:
            Mapping of recipient address to True (sent) or False (failed).
        """
        results = {}
        shared_context = template_context or {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            pending = {}
            for recipient in recipients:
                email = recipient['email']
                context = recipient.get('context', {})
                if template_name:
                    rendered_body = self.template.render_template(
                        template_name, {**shared_context, **context}
                    )
                    if not rendered_body:
                        results[email] = False
                        continue
                else:
                    rendered_body = body
                future = executor.submit(
                    self._send_with_retry,
                    sender=sender,
                    password=password,
                    recipient=email,
                    subject=context.get('subject', subject),
                    body=rendered_body,
                    is_html=is_html,
                    cc=recipient.get('cc'),
                    bcc=recipient.get('bcc'),
                    max_retries=max_retries,
                    delay=delay
                )
                pending[future] = email

            for future in as_completed(pending):
                try:
                    email, success = future.result()
                except Exception as e:
                    # One failing worker must not discard the other results.
                    email, success = pending[future], False
                    print(f"Failed to send email to {email}: {e}")
                results[email] = success
        return results

    def _send_with_retry(
        self,
        sender: str,
        password: str,
        recipient: str,
        subject: str,
        body: str,
        is_html: bool,
        cc: List[str],
        bcc: List[str],
        max_retries: int,
        delay: float
    ) -> tuple:
        """Call ``send_email`` up to ``max_retries`` times.

        Sleeps ``delay * 2 ** attempt`` seconds between attempts.

        Returns:
            ``(recipient, True)`` on the first success, else
            ``(recipient, False)``.
        """
        for attempt in range(1, max_retries + 1):
            if self.send_email(
                sender=sender,
                password=password,
                recipient=recipient,
                subject=subject,
                body=body,
                is_html=is_html,
                cc=cc,
                bcc=bcc
            ):
                return (recipient, True)
            if attempt != max_retries:
                time.sleep(delay * (2 ** attempt))
        return (recipient, False)

    def _detect_email_provider(self, email: str) -> Optional[str]:
        """Map an address to its provider key, or None if unsupported.

        The domain must equal a known mail domain or be a subdomain of one;
        a string without ``@`` is never matched.
        """
        _, at, domain = email.rpartition('@')
        if not at:
            return None
        domain = domain.strip().lower()
        for known, provider in BulkEmailSender._PROVIDER_DOMAINS.items():
            if domain == known or domain.endswith('.' + known):
                return provider
        return None

    def load_recipients_from_csv(self, csv_file: str) -> List[Dict]:
        """Read recipients from a UTF-8 CSV file that has an ``email`` column.

        Every other column becomes an entry of the recipient's ``'context'``.
        Rows whose ``email`` cell is blank are skipped.

        Raises:
            KeyError: if the file has no ``email`` column.
        """
        recipients = []
        # newline='' lets the csv module handle line endings itself, which it
        # needs for quoted fields that contain line breaks.
        with open(csv_file, mode='r', encoding='utf-8-sig', newline='') as file:
            for row in csv.DictReader(file):
                email = (row['email'] or '').strip()
                if not email:
                    continue
                recipients.append({
                    'email': email,
                    'context': {k: v for k, v in row.items() if k != 'email'}
                })
        return recipients

    def __del__(self):
        # __init__ may have failed before the pool was created.
        pool = getattr(self, 'connection_pool', None)
        if pool is not None:
            pool.release_all()
if __name__ == '__main__':
    # Example usage.
    mailer = BulkEmailSender(max_workers=5)

    # Load the recipient list from a CSV file.
    recipients = mailer.load_recipients_from_csv('recipients.csv')

    # Send the templated message to every recipient.
    send_options = dict(
        sender='your_email@example.com',
        password='your_password',
        recipients=recipients,
        template_name='welcome_email.html',
        subject='欢迎加入我们',
        is_html=True,
        delay=1.0,
    )
    results = mailer.bulk_send(**send_options)

    # Print a summary of successes and failures.
    success_count = sum(bool(ok) for ok in results.values())
    print(f"发送完成: 成功 {success_count}, 失败 {len(results) - success_count}")
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。