首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >微软邮箱outlook批量群发工具,谷歌163新浪网易邮件批量发送软件,python版开源

微软邮箱outlook批量群发工具,谷歌163新浪网易邮件批量发送软件,python版开源

原创
作者头像
用户11744395
发布2025-07-15 10:21:06
发布2025-07-15 10:21:06
4830
举报

下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1155

这个邮件批量发送工具包含完整的配置管理、模板渲染、连接池和多线程发送功能。支持Outlook、Gmail、163、新浪和网易邮箱,可以通过CSV文件导入收件人列表,并支持HTML模板邮件。代码总长度超过500行,实现了完整的邮件批量发送功能。

代码语言:txt
复制

import configparser
import os
from typing import Dict, List

class EmailConfig:
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.config_file = 'email_config.ini'
        
    def load_config(self) -> Dict:
        if not os.path.exists(self.config_file):
            self._create_default_config()
        self.config.read(self.config_file)
        return {
            'smtp_servers': {
                'outlook': self.config.get('SMTP_SERVERS', 'outlook'),
                'gmail': self.config.get('SMTP_SERVERS', 'gmail'),
                '163': self.config.get('SMTP_SERVERS', '163'),
                'sina': self.config.get('SMTP_SERVERS', 'sina'),
                'netease': self.config.get('SMTP_SERVERS', 'netease')
            },
            'ports': {
                'outlook': self.config.getint('PORTS', 'outlook'),
                'gmail': self.config.getint('PORTS', 'gmail'),
                '163': self.config.getint('PORTS', '163'),
                'sina': self.config.getint('PORTS', 'sina'),
                'netease': self.config.getint('PORTS', 'netease')
            }
        }
    
    def _create_default_config(self):
        self.config['SMTP_SERVERS'] = {
            'outlook': 'smtp.office365.com',
            'gmail': 'smtp.gmail.com',
            '163': 'smtp.163.com',
            'sina': 'smtp.sina.com',
            'netease': 'smtp.163.com'
        }
        self.config['PORTS'] = {
            'outlook': '587',
            'gmail': '587',
            '163': '465',
            'sina': '465',
            'netease': '465'
        }
        with open(self.config_file, 'w') as f:
            self.config.write(f)
代码语言:txt
复制
from jinja2 import Environment, FileSystemLoader
import os
from typing import Optional

class EmailTemplate:
    def __init__(self, template_dir: str = 'templates'):
        self.env = Environment(
            loader=FileSystemLoader(template_dir),
            autoescape=True
        )
        
    def render_template(self, template_name: str, context: dict) -> Optional[str]:
        try:
            template = self.env.get_template(template_name)
            return template.render(context)
        except Exception as e:
            print(f"Error rendering template: {e}")
            return None
            
    def render_plain_text(self, subject: str, body: str, footer: str = None) -> str:
        content = f"Subject: {subject}\n\n{body}"
        if footer:
            content += f"\n\n-- \n{footer}"
        return content
代码语言:txt
复制
import smtplib
from typing import Dict, Optional
import ssl
from dataclasses import dataclass
import time

@dataclass
class SMTPConnection:
    server: str
    port: int
    username: str
    password: str
    connection: smtplib.SMTP = None
    last_used: float = 0.0

class SMTPConnectionPool:
    def __init__(self, max_connections: int = 5):
        self.max_connections = max_connections
        self.pool: Dict[str, SMTPConnection] = {}
        self.connections_in_use = 0
        
    def get_connection(self, server: str, port: int, username: str, password: str) -> Optional[smtplib.SMTP]:
        key = f"{username}@{server}:{port}"
        
        # 清理空闲超过30分钟的连接
        self._clean_idle_connections(1800)
        
        if key in self.pool:
            conn = self.pool[key]
            if conn.connection and self._test_connection(conn.connection):
                conn.last_used = time.time()
                return conn.connection
        
        if self.connections_in_use >= self.max_connections:
            return None
            
        try:
            context = ssl.create_default_context()
            if port == 587:
                smtp_conn = smtplib.SMTP(server, port)
                smtp_conn.starttls(context=context)
            else:
                smtp_conn = smtplib.SMTP_SSL(server, port, context=context)
                
            smtp_conn.login(username, password)
            
            self.pool[key] = SMTPConnection(
                server=server,
                port=port,
                username=username,
                password=password,
                connection=smtp_conn,
                last_used=time.time()
            )
            self.connections_in_use += 1
            return smtp_conn
        except Exception as e:
            print(f"Failed to create SMTP connection: {e}")
            return None
            
    def _test_connection(self, conn: smtplib.SMTP) -> bool:
        try:
            status = conn.noop()[0]
            return status == 250
        except:
            return False
            
    def _clean_idle_connections(self, idle_time: float):
        current_time = time.time()
        to_remove = []
        
        for key, conn in self.pool.items():
            if current_time - conn.last_used > idle_time:
                try:
                    if conn.connection:
                        conn.connection.quit()
                except:
                    pass
                to_remove.append(key)
                
        for key in to_remove:
            self.pool.pop(key, None)
            self.connections_in_use -= 1
            
    def release_all(self):
        for conn in self.pool.values():
            try:
                if conn.connection:
                    conn.connection.quit()
            except:
                pass
        self.pool.clear()
        self.connections_in_use = 0
代码语言:txt
复制
import csv
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import EmailConfig
from template import EmailTemplate
from connection import SMTPConnectionPool

class BulkEmailSender:
    def __init__(self, max_workers: int = 5):
        self.config = EmailConfig().load_config()
        self.template = EmailTemplate()
        self.connection_pool = SMTPConnectionPool(max_connections=max_workers)
        self.max_workers = max_workers
        
    def send_email(
        self,
        sender: str,
        password: str,
        recipient: str,
        subject: str,
        body: str,
        is_html: bool = False,
        cc: List[str] = None,
        bcc: List[str] = None
    ) -> bool:
        email_provider = self._detect_email_provider(sender)
        if not email_provider:
            print(f"Unsupported email provider for {sender}")
            return False
            
        smtp_server = self.config['smtp_servers'][email_provider]
        smtp_port = self.config['ports'][email_provider]
        
        conn = self.connection_pool.get_connection(smtp_server, smtp_port, sender, password)
        if not conn:
            print(f"Failed to get SMTP connection for {sender}")
            return False
            
        try:
            msg = MIMEMultipart()
            msg['From'] = sender
            msg['To'] = recipient
            msg['Subject'] = subject
            
            if cc:
                msg['Cc'] = ', '.join(cc)
            if bcc:
                msg['Bcc'] = ', '.join(bcc)
                
            if is_html:
                msg.attach(MIMEText(body, 'html'))
            else:
                msg.attach(MIMEText(body, 'plain'))
                
            recipients = [recipient]
            if cc:
                recipients.extend(cc)
            if bcc:
                recipients.extend(bcc)
                
            conn.sendmail(sender, recipients, msg.as_string())
            return True
        except Exception as e:
            print(f"Failed to send email to {recipient}: {e}")
            return False
        finally:
            # 连接由连接池管理,不需要手动关闭
            pass
            
    def bulk_send(
        self,
        sender: str,
        password: str,
        recipients: List[Dict],
        template_name: Optional[str] = None,
        template_context: Optional[Dict] = None,
        subject: str = None,
        body: str = None,
        is_html: bool = False,
        delay: float = 1.0,
        max_retries: int = 3
    ) -> Dict[str, bool]:
        results = {}
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for recipient in recipients:
                email = recipient['email']
                context = recipient.get('context', {})
                
                if template_name:
                    rendered_body = self.template.render_template(template_name, {**(template_context or {}), **context})
                    if not rendered_body:
                        results[email] = False
                        continue
                else:
                    rendered_body = body
                    
                current_subject = subject
                if 'subject' in context:
                    current_subject = context['subject']
                    
                futures.append(
                    executor.submit(
                        self._send_with_retry,
                        sender=sender,
                        password=password,
                        recipient=email,
                        subject=current_subject,
                        body=rendered_body,
                        is_html=is_html,
                        cc=recipient.get('cc'),
                        bcc=recipient.get('bcc'),
                        max_retries=max_retries,
                        delay=delay
                    )
                )
                
            for future in as_completed(futures):
                email, success = future.result()
                results[email] = success
                
        return results
        
    def _send_with_retry(
        self,
        sender: str,
        password: str,
        recipient: str,
        subject: str,
        body: str,
        is_html: bool,
        cc: List[str],
        bcc: List[str],
        max_retries: int,
        delay: float
    ) -> tuple:
        retries = 0
        while retries < max_retries:
            success = self.send_email(
                sender=sender,
                password=password,
                recipient=recipient,
                subject=subject,
                body=body,
                is_html=is_html,
                cc=cc,
                bcc=bcc
            )
            if success:
                return (recipient, True)
                
            retries += 1
            if retries < max_retries:
                time.sleep(delay * (2 ** retries))
                
        return (recipient, False)
        
    def _detect_email_provider(self, email: str) -> Optional[str]:
        domain = email.split('@')[-1].lower()
        if 'outlook.com' in domain or 'hotmail.com' in domain:
            return 'outlook'
        elif 'gmail.com' in domain:
            return 'gmail'
        elif '163.com' in domain:
            return '163'
        elif 'sina.com' in domain or 'sina.cn' in domain:
            return 'sina'
        elif '126.com' in domain or 'yeah.net' in domain:
            return 'netease'
        return None
        
    def load_recipients_from_csv(self, csv_file: str) -> List[Dict]:
        recipients = []
        with open(csv_file, mode='r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            for row in reader:
                recipient = {
                    'email': row['email'],
                    'context': {k: v for k, v in row.items() if k != 'email'}
                }
                recipients.append(recipient)
        return recipients
        
    def __del__(self):
        self.connection_pool.release_all()

if __name__ == '__main__':
    # 示例用法
    sender = BulkEmailSender(max_workers=5)
    
    # 从CSV加载收件人列表
    recipients = sender.load_recipients_from_csv('recipients.csv')
    
    # 批量发送邮件
    results = sender.bulk_send(
        sender='your_email@example.com',
        password='your_password',
        recipients=recipients,
        template_name='welcome_email.html',
        subject='欢迎加入我们',
        is_html=True,
        delay=1.0
    )
    
    # 打印结果
    success_count = sum(1 for r in results.values() if r)
    print(f"发送完成: 成功 {success_count}, 失败 {len(results) - success_count}")

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档