下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1138
代码说明:这个微信自动化群发工具包含登录模块、群组管理、消息发送、定时任务调度等功能。使用itchat库实现微信自动化操作,通过schedule库实现定时任务功能。配置文件支持黑名单管理和发送间隔设置。使用时需先扫码登录微信网页版。
import time
import schedule
from datetime import datetime
import itchat
from itchat.content import TEXT
import threading
import json
import os
class WeChatAutoSender:
def __init__(self):
self.config_file = "config.json"
self.message_queue = []
self.is_logged_in = False
self.load_config()
def load_config(self):
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
self.config = json.load(f)
else:
self.config = {
"scheduled_tasks": [],
"blacklist": [],
"send_interval": 1,
"max_retry": 3
}
def save_config(self):
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=4)
def login(self):
itchat.auto_login(hotReload=True, statusStorageDir='itchat.pkl')
self.is_logged_in = True
print(f"[{datetime.now()}] 登录成功")
def get_all_groups(self):
if not self.is_logged_in:
self.login()
return itchat.get_chatrooms(update=True)
def send_to_group(self, group_name, message):
if not self.is_logged_in:
self.login()
group = itchat.search_chatrooms(name=group_name)
if group:
retry = 0
while retry < self.config['max_retry']:
try:
itchat.send_msg(msg=message, toUserName=group[0]['UserName'])
print(f"[{datetime.now()}] 发送成功到群: {group_name}")
time.sleep(self.config['send_interval'])
return True
except Exception as e:
print(f"[{datetime.now()}] 发送失败: {str(e)}")
retry += 1
time.sleep(2)
return False
def send_to_all_groups(self, message, exclude_list=None):
if exclude_list is None:
exclude_list = self.config['blacklist']
groups = self.get_all_groups()
success_count = 0
for group in groups:
if group['NickName'] not in exclude_list:
if self.send_to_group(group['NickName'], message):
success_count += 1
return success_count
def add_scheduled_task(self, task_time, message, repeat_days=None):
task = {
"time": task_time,
"message": message,
"repeat_days": repeat_days or []
}
self.config['scheduled_tasks'].append(task)
self.save_config()
def job():
print(f"[{datetime.now()}] 执行定时任务: {task_time}")
self.send_to_all_groups(message)
schedule.every().day.at(task_time).do(job)
if repeat_days:
for day in repeat_days:
getattr(schedule.every(), day.lower()).at(task_time).do(job)
def start_scheduler(self):
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()
print(f"[{datetime.now()}] 定时任务调度器已启动")
def run(self):
self.login()
self.start_scheduler()
itchat.run()
"scheduled_tasks": [
{
"time": "09:00",
"message": "早安问候!祝大家今天工作顺利!",
"repeat_days": ["monday", "tuesday", "wednesday", "thursday", "friday"]
},
{
"time": "18:00",
"message": "今日工作结束提醒,记得填写日报哦!",
"repeat_days": ["monday", "tuesday", "wednesday", "thursday", "friday"]
}
],
"blacklist": ["家人群", "测试群"],
"send_interval": 1,
"max_retry": 3
}
from wechat_auto_sender import WeChatAutoSender
sender = WeChatAutoSender()
# 立即发送消息到所有群(黑名单除外)
sender.send_to_all_groups("重要通知:明天公司停电,全体居家办公!")
# 添加定时任务(每天09:30发送)
sender.add_scheduled_task("09:30", "每日晨会提醒,请准时参加!")
# 添加每周特定日期的定时任务(每周一三五的12:00发送)
sender.add_scheduled_task("12:00", "午休时间到啦!", ["monday", "wednesday", "friday"])
# 启动主程序(保持运行状态)
sender.run()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。