merge wechatservice codes from github user lei195827 (#14)

master
sisuer 2024-04-16 21:59:50 +08:00 committed by GitHub
parent 66482fa779
commit 7cfa37a2e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 547 additions and 1 deletions

2
app.py
View File

@ -28,7 +28,7 @@ def sigterm_handler_wrap(_signo):
def start_channel(channel_name: str):
channel = channel_factory.create_channel(channel_name)
if channel_name in ["wx", "wxy", "terminal", "wechatmp", "wechatmp_service", "wechatcom_app", "wework",
const.FEISHU, const.DINGTALK]:
"wechatcom_service", const.FEISHU, const.DINGTALK]:
PluginManager().load_plugins()
if conf().get("use_linkai"):

View File

@ -30,6 +30,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == "wechatcom_app":
from channel.wechatcom.wechatcomapp_channel import WechatComAppChannel
ch = WechatComAppChannel()
elif channel_type == "wechatcom_service":
from channel.wechatcs.wechatcomservice_channel import WechatComServiceChannel
ch = WechatComServiceChannel()
elif channel_type == "wework":
from channel.wework.wework_channel import WeworkChannel
ch = WeworkChannel()

View File

@ -0,0 +1,79 @@
# 企业微信客服号channel
企业微信官方提供了客服、应用等API本channel将原本的企业微信应用修改为企业客服应用目前还没做兼容此channel仅能在企业微信客服应用中回复原本的应用不会回复。
本channel的类型名叫作`wechatcom_service`,用于企业微信客服应用,此channel大部分配置流程和`wechatcom_app`一样。
同样的,`wechatcom_service` channel支持插件系统和图片声音交互等能力除了无法加入群聊作为个人使用的私人助理已绰绰有余。
## 开始之前
- 在企业中确认自己拥有在企业内自建应用的权限。
- 如果没有权限或者是个人用户,也可创建未认证的企业。操作方式:登录手机企业微信,选择`创建/加入企业`来创建企业,类型请选择企业,企业名称可随意填写。
未认证的企业有100人的服务人数上限其他功能与认证企业没有差异。
本channel需安装的依赖与公众号一致需要安装`wechatpy`和`web.py`,它们包含在`requirements-optional.txt`中。
此外,如果你是`Linux`系统,除了`ffmpeg`还需要安装`amr`编码器,否则会出现找不到编码器的错误,无法正常使用语音功能。
- Ubuntu/Debian
```bash
apt-get install libavcodec-extra
```
- Alpine
需自行编译`ffmpeg`,在编译参数里加入`amr`编码器的支持
## 使用方法
1.查看企业ID
- 扫码登陆[企业微信后台](https://work.weixin.qq.com)
- 选择`我的企业`,点击`企业信息`,记住该`企业ID`
2.创建自建应用
- 选择应用管理, 在自建区选创建应用来创建企业自建应用
- 上传应用logo填写应用名称等项
- 创建应用后进入应用详情页面,记住`AgentId`和`Secert`
3.配置应用
- 在详情页点击`企业可信IP`的配置(没看到可以不管)填入你服务器的公网IP如果不知道可以先不填
- 点击`接收消息`下的启用API接收消息
- `URL`填写格式为`http://url:port/wxcomapp``port`是程序监听的端口默认是9898
如果是未认证的企业url可直接使用服务器的IP。如果是认证企业需要使用备案的域名可使用二级域名。
- `Token`可随意填写,停留在这个页面
- 在程序根目录`config.json`中增加配置(**去掉注释**`wechatcomapp_aes_key`是当前页面的`wechatcomapp_aes_key`
```python
"channel_type": "wechatcom_service",
"wechatcom_corp_id": "", # 企业微信公司的corpID
"wechatcomapp_token": "", # 企业微信app的token
"wechatcomapp_port": 9898, # 企业微信app的服务端口, 不需要端口转发
"wechatcomapp_secret": "", # 企业微信app的secret
"wechatcomapp_agent_id": "", # 企业微信app的agent_id
"wechatcomapp_aes_key": "", # 企业微信app的aes_key
```
- 运行程序,在页面中点击保存,保存成功说明验证成功
4.将微信客服接入程序
- 点击应用中的`微信客服`,在`客服账号`栏下创建一个新的账号,输入名称后点击创建。
- 点击`微信客服`应用详情中的`API`(默认状态下是折叠的),点击`可调用接口的应用`栏中的`修改`,勾选刚才创建的应用并点击确认。
- 点击`微信客服`应用详情中的`API`(默认状态下是折叠的),点击`可调用接口的应用`栏中的`前往配置`。点击`操作选项`中的`更换客服账号`,将之前配置好的应用指定你刚才创建的客服号。
- 顺利的情况下,回到`微信客服`应用详情中,点击客服账号,生成二维码让用户扫码即可跟AI客服对话。
向机器人发送消息,如果日志里出现报错:
```bash
Error code: 60020, message: "not allow to access from your ip, ...from ip: xx.xx.xx.xx"
```
意思是IP不可信需要参考上一步的`企业可信IP`配置把这里的IP加进去。

View File

@ -0,0 +1,348 @@
# -*- coding=utf-8 -*-
import io
import json
import os
import time
import requests
import web
from wechatpy.enterprise import create_reply, parse_message
from wechatpy.enterprise.crypto import WeChatCrypto
from wechatpy.enterprise.exceptions import InvalidCorpIdException
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from bridge.context import Context
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel
from channel.wechatcom.wechatcomapp_client import WechatComAppClient
from channel.wechatcs.wechatcomservice_message import WechatComServiceMessage
from common.log import logger
from common.singleton import singleton
from common.utils import compress_imgfile, fsize, split_string_by_utf8_length
from config import conf, subscribe_msg
from voice.audio_convert import any_to_amr, split_audio
import web
import json
import requests
import xml.etree.ElementTree as ET
from wechatpy.enterprise.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException
from wechatpy.enterprise.exceptions import InvalidCorpIdException
MAX_UTF8_LEN = 2048
@singleton
class WechatComServiceChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = []
def __init__(self):
super().__init__()
self.corp_id = conf().get("wechatcom_corp_id")
self.secret = conf().get("wechatcomapp_secret")
self.agent_id = conf().get("wechatcomapp_agent_id")
self.token = conf().get("wechatcomapp_token")
self.aes_key = conf().get("wechatcomapp_aes_key")
print(self.corp_id, self.secret, self.agent_id, self.token, self.aes_key)
logger.info(
"[wechatcs] init: corp_id: {}, secret: {}, agent_id: {}, token: {}, aes_key: {}".format(self.corp_id,
self.secret,
self.agent_id,
self.token,
self.aes_key)
)
self.crypto = WeChatCrypto(self.token, self.aes_key, self.corp_id)
self.client = WechatComAppClient(self.corp_id, self.secret)
def startup(self):
# start message listener
# wechatcomservice_channel.py
urls = ("/wxcomapp", "channel.wechatcs.wechatcomservice_channel.Query")
app = web.application(urls, globals(), autoreload=False)
port = conf().get("wechatcomapp_port", 9898)
web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port))
def send(self, reply: Reply, context: Context):
receiver = context["receiver"]
external_userid = context.kwargs['msg'].external_userid # from_user_id
open_kfid = context.kwargs['msg'].open_kfid # to_user_id,也就是客服id
if reply.type in [ReplyType.TEXT, ReplyType.ERROR, ReplyType.INFO]:
reply_text = reply.content
texts = split_string_by_utf8_length(reply_text, MAX_UTF8_LEN)
if len(texts) > 1:
logger.info("[wechatcs] text too long, split into {} parts".format(len(texts)))
# self.send_text_message(external_userid, open_kfid,
# content, msgid=None)
content = reply.content
self.send_text_message(external_userid=external_userid, open_kfid=open_kfid, content=content)
logger.info("[wechatcs] Do send text to {}: {}".format(receiver, reply_text))
elif reply.type == ReplyType.VOICE:
try:
media_ids = []
file_path = reply.content
amr_file = os.path.splitext(file_path)[0] + ".amr"
any_to_amr(file_path, amr_file)
duration, files = split_audio(amr_file, 60 * 1000)
if len(files) > 1:
logger.info(
"[wechatcs] voice too long {}s > 60s , split into {} parts".format(duration / 1000.0,
len(files)))
for path in files:
response = self.client.media.upload("voice", open(path, "rb"))
logger.debug("[wechatcs] upload voice response: {}".format(response))
media_ids.append(response["media_id"])
except WeChatClientException as e:
logger.error("[wechatcs] upload voice failed: {}".format(e))
return
try:
os.remove(file_path)
if amr_file != file_path:
os.remove(amr_file)
except Exception:
pass
for media_id in media_ids:
# self.client.message.send_voice(self.agent_id, receiver, media_id)
self.send_voice_message(external_userid=external_userid, open_kfid=open_kfid,
media_id=media_id)
time.sleep(1)
logger.info("[wechatcs] sendVoice={}, receiver={}".format(reply.content, receiver))
elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
img_url = reply.content
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechatcs] image too large, ready to compress, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
logger.info("[wechatcs] image compressed, sz={}".format(fsize(image_storage)))
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
logger.debug("[wechatcs] upload image response: {}".format(response))
except WeChatClientException as e:
logger.error("[wechatcs] upload image failed: {}".format(e))
return
# self.client.message.send_image(self.agent_id, receiver, response["media_id"])
self.send_image_message(external_userid=external_userid, open_kfid=open_kfid, media_id=response["media_id"])
logger.info("[wechatcs] sendImage url={}, receiver={}".format(img_url, receiver))
elif reply.type == ReplyType.IMAGE: # 从文件读取图片
image_storage = reply.content
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechatcs] image too large, ready to compress, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
logger.info("[wechatcs] image compressed, sz={}".format(fsize(image_storage)))
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
logger.debug("[wechatcs] upload image response: {}".format(response))
except WeChatClientException as e:
logger.error("[wechatcs] upload image failed: {}".format(e))
return
# self.client.message.send_image(self.agent_id, receiver, response["media_id"])
self.send_image_message(external_userid=external_userid, open_kfid=open_kfid, media_id=response["media_id"])
logger.info("[wechatcs] sendImage, receiver={}".format(receiver))
elif reply.type == ReplyType.LINK:
# 解析 reply.content 中的 JSON 数据
try:
# link_data = json.loads(reply.content)
link_data = reply.content
image_storage = link_data["image"]
sz = fsize(image_storage)
if sz >= 10 * 1024 * 1024:
logger.info("[wechatcs] image too large, ready to compress, sz={}".format(sz))
image_storage = compress_imgfile(image_storage, 10 * 1024 * 1024 - 1)
logger.info("[wechatcs] image compressed, sz={}".format(fsize(image_storage)))
image_storage.seek(0)
try:
response = self.client.media.upload("image", image_storage)
logger.debug("[wechatcs] upload image response: {}".format(response))
except WeChatClientException as e:
logger.error("[wechatcs] upload image failed: {}".format(e))
return
link_data["thumb_media_id"] = response["media_id"]
# 此时已经不需要图片数据了
link_data.pop("image")
self.send_link_message(
external_userid=external_userid, open_kfid=open_kfid, link_data=link_data
)
logger.info("[WX] sendLinkCard, receiver={}".format(receiver))
except json.JSONDecodeError:
logger.error("Invalid JSON format in reply.content")
def send_text_message(self, external_userid, open_kfid, content, msgid=None):
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.client.fetch_access_token()}"
data = {
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "text",
"text": {"content": content}
}
if msgid:
data["msgid"] = msgid
response = requests.post(url, json=data)
return response.json()
def send_image_message(self, external_userid, open_kfid, msgid=None, media_id=None):
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.client.fetch_access_token()}"
data = {
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "image",
"image": {"media_id": media_id}
}
if msgid:
data["msgid"] = msgid
response = requests.post(url, json=data).json()
if response['errmsg'] == 'ok':
print(f"Send IMAGE Message Success")
else:
print(f"Something error:{response}")
return response
def send_voice_message(self, external_userid, open_kfid, media_id, msgid=None):
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.client.fetch_access_token()}"
data = {
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "voice",
"voice": {"media_id": media_id}
}
if msgid:
data["msgid"] = msgid
response = requests.post(url, json=data).json()
if response['errmsg'] == 'ok':
print(f"Send VOICE Message Success")
else:
print(f"Something error:{response}")
return response
def send_link_message(self, external_userid, open_kfid, link_data, msgid=None):
# 从 link_data 中提取信息
# 构造发送图文链接消息的数据
data = {
"touser": external_userid,
"open_kfid": open_kfid,
"msgtype": "link",
"link": link_data
}
if msgid:
data["msgid"] = msgid
# 发送图文链接消息
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.client.fetch_access_token()}"
response = requests.post(url, json=data).json()
if response['errmsg'] == 'ok':
print("Send LINK Message Success")
else:
print(f"Something error: {response}")
return response
def get_latest_message(self, token, open_kfid, next_cursor=""):
logger.debug(f"self.client.fetch_access_token():{self.client.fetch_access_token()}")
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/sync_msg?access_token={self.client.fetch_access_token()}"
data = {
"token": token,
"open_kfid": open_kfid,
"limit": 1000
}
if next_cursor:
data["cursor"] = next_cursor
response = requests.post(url, json=data)
response_data = response.json()
# if response_data["errcode"] == 0 and response_data["msg_list"]:
# return response_data["msg_list"][-1] # 返回最新的一条消息
# else:
# return None
# 检查是否有错误码并打印相关错误信息
if response_data.get("errcode") != 0:
logger.error(
f"[ERROR][{response_data.get('errcode')}][{response_data.get('errmsg')}] - Failed to fetch messages, more info at {response_data.get('more_info') or 'https://open.work.weixin.qq.com/devtool/query?e=' + str(response_data.get('errcode'))}")
return None
logger.debug(f"response_data:{response_data}")
if response_data.get("msg_list"):
return response_data["msg_list"][-1] # 返回最新的一条消息
else:
return None
class Query:
def GET(self):
channel = WechatComServiceChannel()
params = web.input()
logger.info("[wechatcom] receive GET params: {}".format(params))
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
echostr = params.echostr
echostr = channel.crypto.check_signature(signature, timestamp, nonce, echostr)
except InvalidSignatureException:
logger.error("[wechatcs] Invalid signature in GET request")
raise web.Forbidden()
return echostr
def POST(self):
channel = WechatComServiceChannel()
params = web.input()
raw_data = web.data()
logger.debug("[wechatcs] receive POST params: {}".format(params))
logger.debug("[wechatcs] raw data: {}".format(raw_data))
try:
signature = params.msg_signature
timestamp = params.timestamp
nonce = params.nonce
encrypted_message = channel.crypto.decrypt_message(raw_data, signature, timestamp, nonce)
# 解析XML格式的消息
xml_tree = ET.fromstring(encrypted_message)
msg_type = xml_tree.find("MsgType").text
event = xml_tree.find("Event").text if xml_tree.find("Event") is not None else ""
if msg_type == "event" and event == "kf_msg_or_event":
# 在这里处理特定事件
# 示例代码,根据实际情况修改
token = xml_tree.find("Token").text
open_kfid = xml_tree.find("OpenKfId").text
next_cursor = "" # 第一次请求时不需要提供 cursor
latest_message = channel.get_latest_message(token, open_kfid, next_cursor)
logger.debug(f"[wechatcs] latest_message: {latest_message}")
try:
wechatcom_copy_msg = WechatComServiceMessage(msg=latest_message, client=channel.client)
logger.debug(f"[wechatcs] wechatcom_copy_msg: {wechatcom_copy_msg}")
except NotImplementedError as e:
logger.debug("[wechatcs] " + str(e))
return "success"
context = channel._compose_context(
wechatcom_copy_msg.ctype,
wechatcom_copy_msg.content,
isgroup=False,
msg=wechatcom_copy_msg,
)
logger.debug(f"[wechatcs] context: {context}")
if context:
channel.produce(context)
logger.debug(f"[wechatcs] get latest message: {latest_message}")
return json.dumps({"status": "success"})
else:
return "Unsupported event type"
except (InvalidSignatureException, InvalidCorpIdException) as e:
logger.error(f"[wechatcs] Error: {e}")
raise web.Forbidden()
except ET.ParseError as e:
logger.error(f"[wechatcs] XML Parse Error: {e}")
return "Invalid XML format"

View File

@ -0,0 +1,59 @@
import threading
import time
import requests
import time
from wechatpy.enterprise import WeChatClient
from config import conf
class WeChatTokenManager:
def __init__(self):
self.access_token = None
self.expires_at = 0
def get_token(self):
current_time = time.time()
if self.access_token and self.expires_at - current_time > 60:
return self.access_token
corpid = conf().get("wechatcom_corp_id")
corpsecret = conf().get("wechatcomapp_secret")
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
response = requests.get(url).json()
if 'access_token' in response:
self.access_token = response['access_token']
self.expires_at = current_time + response['expires_in'] - 60
print(f'access_token:{self.access_token}')
return self.access_token
else:
raise Exception("Failed to retrieve access token")
# class WechatComAppClient(WeChatClient):
# def __init__(self, corp_id, secret, access_token=None, session=None, timeout=None, auto_retry=True):
# super(WechatComAppClient, self).__init__(corp_id, secret, access_token, session, timeout, auto_retry)
# self.fetch_access_token_lock = threading.Lock()
#
# def fetch_access_token(self): # 重载父类方法加锁避免多线程重复获取access_token
# with self.fetch_access_token_lock:
# access_token = self.session.get(self.access_token_key)
# if access_token:
# if not self.expires_at:
# return access_token
# timestamp = time.time()
# if self.expires_at - timestamp > 60:
# return access_token
# return super().fetch_access_token()
class WechatComServiceClient(WeChatClient):
def __init__(self, corp_id, secret, access_token=None, session=None, timeout=None, auto_retry=True):
super(WechatComServiceClient, self).__init__(corp_id, secret, access_token, session, timeout, auto_retry)
self.token_manager = WeChatTokenManager()
self.fetch_access_token_lock = threading.Lock()
def fetch_access_token(self):
with self.fetch_access_token_lock:
return self.token_manager.get_token()

View File

@ -0,0 +1,56 @@
from wechatpy.enterprise import WeChatClient
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from common.tmp_dir import TmpDir
class WechatComServiceMessage(ChatMessage):
def __init__(self, msg, client: WeChatClient = None, is_group=False):
self.is_group = is_group
self.msg_id = msg['msgid']
self.external_userid = msg['external_userid']
self.create_time = msg['send_time']
self.origin = msg['origin']
self.msgtype = msg['msgtype']
self.open_kfid = msg['open_kfid']
if self.msgtype == "text":
self.content = msg['text']['content']
self.ctype = ContextType.TEXT
elif self.msgtype == "image":
self.ctype = ContextType.IMAGE
# 实现图像消息的处理逻辑
self.content = TmpDir().path() + msg.get("image", {}).get("media_id", "") + "." + 'jpg' # 假设图片格式为jpg
def download_image():
# 下载图片逻辑
response = client.media.download(msg.get("image", {}).get("media_id", ""))
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechatcom_copy] Failed to download image file, {response.content}")
# download_image()
self._prepare_fn = download_image
elif self.msgtype == "voice":
self.ctype = ContextType.VOICE
self.content = TmpDir().path() + msg.get("voice", {}).get("media_id", "") + "." + 'mp3' # content直接存临时目录路径
def download_voice():
# 如果响应状态码是200则将响应内容写入本地文件
response = client.media.download(msg.get("voice", {}).get("media_id", ""))
if response.status_code == 200:
with open(self.content, "wb") as f:
f.write(response.content)
else:
logger.info(f"[wechatcom_copy] Failed to download voice file, {response.content}")
# download_voice()
self._prepare_fn = download_voice
# 可以根据需要添加更多消息类型的处理
self.from_user_id = self.external_userid
self.to_user_id = self.open_kfid
self.other_user_id = self.external_userid

View File

@ -16,6 +16,7 @@ import datetime, random
class OpenaiVoice(Voice):
def __init__(self):
openai.api_key = conf().get("open_ai_api_key")
openai.api_base = conf().get("open_ai_api_base") or "https://api.openai.com/v1"
def voiceToText(self, voice_file):
logger.debug("[Openai] voice file name={}".format(voice_file))