Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)

CSDN 2024-06-20 12:05:07 阅读 50

下载安装 wcferry 库

通过 pip 快速安装 wcferry

pip install wcferry

免责声明:仅供学习和技术研究使用,不得用于任何商业或非法行为,否则后果自负。

基本原理

当微信收到消息时,抢在微信处理(显示到页面)前,先让工具处理,处理完之后再交还给原来的处理模块。需要发送消息时,模拟微信发送消息,组装好消息体,调用微信发送消息的模块。获取联系人,则是遍历一块特定的内存空间。通过好友验证,则是组装好验证信息,调用微信的验证模块。数据库相关功能,则是通过获取到数据库句柄,基于 sqlite3 的接口来执行。

在这里插入图片描述

博客地址:微信机器人 DIY 从 0 到 1

Wcf 函数说明

WeChatFerry:一个玩微信的工具(函数说明)

函数名称 描述 返回类型
cleanup 关闭连接 回收资源 None
keep_running 阻塞进程(让 RPC 一直维持连接)
is_receiving_msg 是否已启动接收消息功能 bool
get_qrcode 获取登录二维码(已经登录则返回空字符串) str
is_login 检查登录状态 bool
get_self_wxid 获取登录账号的 wxid str
get_msg_types 获取所有消息类型 Dict
get_contacts 获取所有联系人 List[Dict]
get_friends 获取所有好友 List[Dict]
get_dbs 获取数据库 List[str]
get_tables 获取某数据库下的表 List[Dict]
get_user_info 获取登录账号个人信息 Dict
get_audio_msg 取语音消息并转成 MP3 str
send_text 发送文本消息(可 @) int
_download_file 下载文件 str
_process_path 处理路径(如果是网络路径则下载文件) str
send_image 发送图片(非线程安全) int
send_file 发送文件(非线程安全) int
send_xml 发送 XML int
send_emotion 发送表情 int
send_rich_text 发送富文本消息 int
send_pat_msg 拍一拍群友 int
forward_msg 转发消息 int
get_msg 从消息队列中获取消息 WxMsg
enable_receiving_msg 允许接收消息 bool
enable_recv_msg 允许接收消息(旧接口) bool
disable_recv_msg 停止接收消息 bool
query_sql 执行 SQL 查询 List[Dict]
accept_new_friend 接受好友申请 int
refresh_pyq 刷新朋友圈 int
download_attach 下载附件 int
get_info_by_wxid 通过 wxid 查询微信号昵称等信息 dict
revoke_msg 撤回消息 int
decrypt_image 解密图片 str
get_ocr_result 获取 OCR 结果 str
download_image 下载图片 str
add_chatroom_members 添加群成员 int
del_chatroom_members 删除群成员 int
invite_chatroom_members 邀请群成员 int
get_chatroom_members 获取群成员 Dict
get_alias_in_chatroom 获取群名片 str
receive_transfer 接收转账 int

电脑端检测登录微信

from wcferry import Wcfwcf = Wcf()

检测微信登陆状态

检查当前 PC 端微信登陆状态?

from wcferry import Wcfwcf = Wcf()print(wcf.is_login())

获取登录账号信息

获取当前 PC 端微信账号信息?

from wcferry import Wcfwcf = Wcf()print(wcf.get_user_info())

运行结果

{ 'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\\Users\\Administrator\\Documents\\WeChat Files\\'}

开辟线程监听群消息

开启线程监听消息:判断是否是群消息?

from queue import Emptyfrom threading import Threadfrom wcferry import Wcf, WxMsgwcf = Wcf()def processMsg(msg: WxMsg): if msg.from_group(): print(msg.content)def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: { e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()enableReceivingMsg()wcf.keep_running()

微信消息属性说明

class WxMsg() 微信消息属性说明

属性说明

字段名 类型 描述
type int 消息类型 可通过 get_msg_types 获取
id str 消息 id
xml str 消息 xml 部分
sender str 消息发送人
roomid str (仅群消息有)群 id
content str 消息内容
thumb str 视频或图片消息的缩略图路径
extra str 视频或图片消息的路径

消息类型

from wcferry import Wcfwcf = Wcf()print(wcf.get_msg_types())

消息类型编号 消息类型描述 属性
0 朋友圈消息 int
1 文字 int
3 图片 int
34 语音 int
37 好友确认 int
40 POSSIBLEFRIEND_MSG int
42 名片 int
43 视频 int
47 石头剪刀布 表情图片
48 位置 int
49 共享实时位置、文件、转账、链接 int
50 VOIPMSG int
51 微信初始化 int
52 VOIPNOTIFY int
53 VOIPINVITE int
62 小视频 int
66 微信红包 int
9999 SYSNOTICE int
10000 红包、系统消息 int
10002 撤回消息 int
1048625 搜狗表情 int
16777265 链接 int
436207665 微信红包 int
536936497 红包封面 int
754974769 视频号视频 int
771751985 视频号名片 int
822083633 引用消息 int
922746929 拍一拍 int
973078577 视频号直播 int
974127153 商品链接 int
975175729 视频号直播 int
1040187441 音乐链接 int
1090519089 文件 int

根据群名称查询群 wxid

特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。

from wcferry import Wcf, WxMsgwcf = Wcf()wcf_rooms = []for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact)def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return Noneroom_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群")

定时发送群文件

如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。

from apscheduler.schedulers.background import BackgroundSchedulerfrom datetime import datetimefrom wcferry import Wcfwcf = Wcf()wcf_rooms = []for contact in wcf.get_contacts(): if contact['wxid'].endswith("chatroom"): wcf_rooms.append(contact)def get_chatroom_roomid(wcf_rooms: list, room_name: str): for room in wcf_rooms: if room['name'] == room_name: return room['wxid'] return Nonedef schedule_task_job(room_id: str, wcf: Wcf): wcf.send_file(path="test.txt", receiver=room_id)customize_time = "2024-05-09 09:10:10"customize_room = "唤醒手腕测试群"run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S")room_id = get_chatroom_roomid(wcf_rooms, customize_room)scheduler = BackgroundScheduler()scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date)scheduler.start()wcf.keep_running()

运行结果

在这里插入图片描述

监听保存语音消息

from queue import Emptyfrom threading import Threadfrom wcferry import Wcf, WxMsgwcf = Wcf()def processMsg(msg: WxMsg): if msg.from_group(): response = wcf.get_audio_msg(id=msg.id, dir=f"audio") print("语音地址:" + response)def enableReceivingMsg(): def innerWcFerryProcessMsg(): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() processMsg(msg) except Empty: continue except Exception as e: print(f"ERROR: { e}") wcf.enable_receiving_msg() Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()enableReceivingMsg()wcf.keep_running()

更新中······



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。