Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)
CSDN 2024-06-20 12:05:07 阅读 50
下载安装 wcferry 库
通过 pip 快速安装 wcferry
pip install wcferry
免责声明:仅供学习和技术研究使用,不得用于任何商业或非法行为,否则后果自负。
基本原理
当微信收到消息时,抢在微信处理(显示到页面)前,先让工具处理,处理完之后再交还给原来的处理模块。需要发送消息时,模拟微信发送消息,组装好消息体,调用微信发送消息的模块。获取联系人,则是遍历一块特定的内存空间。通过好友验证,则是组装好验证信息,调用微信的验证模块。数据库相关功能,则是通过获取到数据库句柄,基于 sqlite3 的接口来执行。
博客地址:微信机器人 DIY 从 0 到 1
Wcf 函数说明
WeChatFerry:一个玩微信的工具(函数说明)
函数名称 | 描述 | 返回类型 |
---|---|---|
cleanup | 关闭连接 回收资源 | None |
keep_running | 阻塞进程(让 RPC 一直维持连接) | |
is_receiving_msg | 是否已启动接收消息功能 | bool |
get_qrcode | 获取登录二维码(已经登录则返回空字符串) | str |
is_login | 检查登录状态 | bool |
get_self_wxid | 获取登录账号的 wxid | str |
get_msg_types | 获取所有消息类型 | Dict |
get_contacts | 获取所有联系人 | List[Dict] |
get_friends | 获取所有好友 | List[Dict] |
get_dbs | 获取数据库 | List[str] |
get_tables | 获取某数据库下的表 | List[Dict] |
get_user_info | 获取登录账号个人信息 | Dict |
get_audio_msg | 取语音消息并转成 MP3 | str |
send_text | 发送文本消息(可 @) | int |
_download_file | 下载文件 | str |
_process_path | 处理路径(如果是网络路径则下载文件) | str |
send_image | 发送图片(非线程安全) | int |
send_file | 发送文件(非线程安全) | int |
send_xml | 发送 XML | int |
send_emotion | 发送表情 | int |
send_rich_text | 发送富文本消息 | int |
send_pat_msg | 拍一拍群友 | int |
forward_msg | 转发消息 | int |
get_msg | 从消息队列中获取消息 | WxMsg |
enable_receiving_msg | 允许接收消息 | bool |
enable_recv_msg | 允许接收消息(旧接口) | bool |
disable_recv_msg | 停止接收消息 | bool |
query_sql | 执行 SQL 查询 | List[Dict] |
accept_new_friend | 接受好友申请 | int |
refresh_pyq | 刷新朋友圈 | int |
download_attach | 下载附件 | int |
get_info_by_wxid | 通过 wxid 查询微信号昵称等信息 | dict |
revoke_msg | 撤回消息 | int |
decrypt_image | 解密图片 | str |
get_ocr_result | 获取 OCR 结果 | str |
download_image | 下载图片 | str |
add_chatroom_members | 添加群成员 | int |
del_chatroom_members | 删除群成员 | int |
invite_chatroom_members | 邀请群成员 | int |
get_chatroom_members | 获取群成员 | Dict |
get_alias_in_chatroom | 获取群名片 | str |
receive_transfer | 接收转账 | int |
电脑端检测登录微信
from wcferry import Wcfwcf = Wcf()
检测微信登陆状态
检查当前 PC 端微信登陆状态?
from wcferry import Wcfwcf = Wcf()print(wcf.is_login())
获取登录账号信息
获取当前 PC 端微信账号信息?
from wcferry import Wcfwcf = Wcf()print(wcf.get_user_info())
运行结果
{ 'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\\Users\\Administrator\\Documents\\WeChat Files\\'}
开辟线程监听群消息
开启线程监听消息:判断是否是群消息?
from queue import Emptyfrom threading import Threadfrom wcferry import Wcf, WxMsgwcf = Wcf()def processMsg(msg: WxMsg): if msg.from_group(): print(msg.content)def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: { e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()enableReceivingMsg()wcf.keep_running()
微信消息属性说明
class WxMsg() 微信消息属性说明
属性说明
字段名 | 类型 | 描述 |
---|---|---|
type | int | 消息类型 可通过 get_msg_types 获取 |
id | str | 消息 id |
xml | str | 消息 xml 部分 |
sender | str | 消息发送人 |
roomid | str | (仅群消息有)群 id |
content | str | 消息内容 |
thumb | str | 视频或图片消息的缩略图路径 |
extra | str | 视频或图片消息的路径 |
消息类型
from wcferry import Wcfwcf = Wcf()print(wcf.get_msg_types())
消息类型编号 | 消息类型描述 | 属性 |
---|---|---|
0 | 朋友圈消息 | int |
1 | 文字 | int |
3 | 图片 | int |
34 | 语音 | int |
37 | 好友确认 | int |
40 | POSSIBLEFRIEND_MSG | int |
42 | 名片 | int |
43 | 视频 | int |
47 | 石头剪刀布 | 表情图片 |
48 | 位置 | int |
49 | 共享实时位置、文件、转账、链接 | int |
50 | VOIPMSG | int |
51 | 微信初始化 | int |
52 | VOIPNOTIFY | int |
53 | VOIPINVITE | int |
62 | 小视频 | int |
66 | 微信红包 | int |
9999 | SYSNOTICE | int |
10000 | 红包、系统消息 | int |
10002 | 撤回消息 | int |
1048625 | 搜狗表情 | int |
16777265 | 链接 | int |
436207665 | 微信红包 | int |
536936497 | 红包封面 | int |
754974769 | 视频号视频 | int |
771751985 | 视频号名片 | int |
822083633 | 引用消息 | int |
922746929 | 拍一拍 | int |
973078577 | 视频号直播 | int |
974127153 | 商品链接 | int |
975175729 | 视频号直播 | int |
1040187441 | 音乐链接 | int |
1090519089 | 文件 | int |
根据群名称查询群 wxid
特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。
from wcferry import Wcf, WxMsgwcf = Wcf()wcf_rooms = []for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact)def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return Noneroom_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群")
定时发送群文件
如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。
from apscheduler.schedulers.background import BackgroundSchedulerfrom datetime import datetimefrom wcferry import Wcfwcf = Wcf()wcf_rooms = []for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact)def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return Nonedef schedule_task_job(room_id: str, wcf: Wcf): wcf.send_file(path="test.txt", receiver=room_id)customize_time = "2024-05-09 09:10:10"customize_room = "唤醒手腕测试群"run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S")room_id = get_chatroom_roomid(wcf_rooms, customize_room)scheduler = BackgroundScheduler()scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date)scheduler.start()wcf.keep_running()
运行结果
监听保存语音消息
from queue import Emptyfrom threading import Threadfrom wcferry import Wcf, WxMsgwcf = Wcf()def processMsg(msg: WxMsg): if msg.from_group(): response = wcf.get_audio_msg(id=msg.id, dir=f"audio") print("语音地址:" + response)def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: { e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()enableReceivingMsg()wcf.keep_running()
更新中······
上一篇: 【Matlab算法】牛顿法(Newton‘s Method)(附MATLAB完整代码)
下一篇: 关于idea-Java-servlet-Tomcat-Web开发中出现404NOT FOUND问题的解决
本文标签
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。