feat: support coze bot

master
Han Fangyuan 2024-03-31 00:36:46 +08:00
parent 5563b0ec2a
commit 4bc2d4da81
6 changed files with 149 additions and 3 deletions

View File

@@ -60,5 +60,8 @@ def create_bot(bot_type):
from bot.zhipuai.zhipuai_bot import ZHIPUAIBot
return ZHIPUAIBot()
elif bot_type == const.COZE:
from bot.bytedance.bytedance_coze_bot import ByteDanceCozeBot
return ByteDanceCozeBot()
raise RuntimeError

View File

@@ -0,0 +1,136 @@
# encoding:utf-8
import time
from typing import List, Tuple
import requests
from requests import Response
from bot.bot import Bot
from bot.chatgpt.chat_gpt_session import ChatGPTSession
from bot.session_manager import SessionManager
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
from config import conf
class ByteDanceCozeBot(Bot):
    """Chat bot backed by ByteDance's Coze open API (v2 ``/chat`` endpoint).

    Local session history (ChatGPT-style message dicts) is converted into
    Coze's ``chat_history`` format; the latest user message becomes the
    ``query``, and the ``answer``-type message is extracted from the response.
    """

    def __init__(self):
        super().__init__()
        # Reuse ChatGPTSession so session/history management matches the other bots.
        self.sessions = SessionManager(ChatGPTSession, model=conf().get("model") or "coze")

    def reply(self, query, context=None):
        """Handle one incoming message and return a Reply.

        Only TEXT contexts are supported; any other context type yields an
        ERROR reply.
        """
        # acquire reply content
        if context.type == ContextType.TEXT:
            logger.info("[COZE] query={}".format(query))
            session_id = context["session_id"]
            session = self.sessions.session_query(query, session_id)
            logger.debug("[COZE] session query={}".format(session.messages))
            reply_content, err = self._reply_text(session_id, session)
            if err is not None:
                logger.error("[COZE] reply error={}".format(err))
                return Reply(ReplyType.ERROR, "我暂时遇到了一些问题,请您稍后重试~")
            logger.debug(
                "[COZE] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(
                    session.messages,
                    session_id,
                    reply_content["content"],
                    reply_content["completion_tokens"],
                )
            )
            # NOTE(review): the assistant answer is never written back via
            # self.sessions.session_reply(), so the chat_history sent to Coze
            # contains only user turns — confirm whether that is intended.
            return Reply(ReplyType.TEXT, reply_content["content"])
        else:
            reply = Reply(ReplyType.ERROR, "Bot不支持处理{}类型的消息".format(context.type))
            return reply

    def _get_api_base_url(self):
        # Configurable so regional / self-hosted endpoints can be used.
        return conf().get("coze_api_base", "https://api.coze.cn/open_api/v2")

    def _get_headers(self):
        return {
            'Authorization': f"Bearer {conf().get('coze_api_key', '')}"
        }

    def _get_payload(self, user: str, query: str, chat_history: List[dict]):
        """Build the JSON body for a non-streaming Coze /chat request."""
        return {
            'bot_id': conf().get('coze_bot_id'),
            "user": user,
            "query": query,
            "chat_history": chat_history,
            "stream": False
        }

    def _reply_text(self, session_id: str, session: ChatGPTSession, retry_count=0):
        """Call the Coze chat API.

        Returns ``(reply_dict, None)`` on success, where ``reply_dict`` carries
        ``total_tokens``, ``completion_tokens`` and ``content``; or
        ``(None, error_message)`` on failure. Retries up to 2 times on
        exceptions with a 3-second back-off.
        """
        try:
            query, chat_history = self._convert_messages_format(session.messages)
            base_url = self._get_api_base_url()
            chat_url = f'{base_url}/chat'
            headers = self._get_headers()
            payload = self._get_payload(session.session_id, query, chat_history)
            response = requests.post(chat_url, headers=headers, json=payload)
            if response.status_code != 200:
                error_info = f"[COZE] response text={response.text} status_code={response.status_code}"
                logger.warning(error_info)
                return None, error_info
            answer, err = self._get_completion_content(response)
            if err is not None:
                return None, err
            completion_tokens, total_tokens = self._calc_tokens(session.messages, answer)
            return {
                "total_tokens": total_tokens,
                "completion_tokens": completion_tokens,
                "content": answer
            }, None
        except Exception as e:
            if retry_count < 2:
                time.sleep(3)
                # Fixed log message: the original omitted "第" before the retry count.
                logger.warning(f"[COZE] Exception: {repr(e)} 第{retry_count + 1}次重试")
                return self._reply_text(session_id, session, retry_count + 1)
            else:
                return None, f"[COZE] Exception: {repr(e)} 超过最大重试次数"

    def _convert_messages_format(self, messages) -> Tuple[str, List[dict]]:
        """Convert ChatGPT-style messages into ``(query, coze_chat_history)``.

        Target chat_history format, e.g.:
        [
            {"role": "user", "content": "你好", "content_type": "text"},
            {"role": "assistant", "type": "answer", "content": "你好,请问有什么可以帮助你的吗?", "content_type": "text"}
        ]
        The trailing user message is popped off and returned as the query.

        Raises:
            Exception: if there is no trailing, non-empty user message.
        """
        chat_history = []
        for message in messages:
            role = message.get('role')
            if role == 'user':
                content = message.get('content')
                chat_history.append({"role": "user", "content": content, "content_type": "text"})
            elif role == 'assistant':
                content = message.get('content')
                chat_history.append({"role": "assistant", "type": "answer", "content": content, "content_type": "text"})
            elif role == 'system':
                # TODO: deal system message
                pass
        # Guard the pop(): an empty history would otherwise raise a bare
        # IndexError instead of the intended error below.
        if not chat_history:
            raise Exception('no user message')
        user_message = chat_history.pop()
        if user_message.get('role') != 'user' or user_message.get('content', '') == '':
            raise Exception('no user message')
        query = user_message.get('content')
        logger.debug("[COZE] converted coze messages: {}".format(chat_history))
        logger.debug("[COZE] user content as query: {}".format(query))
        return query, chat_history

    def _get_completion_content(self, response: Response):
        """Extract the 'answer'-type message content from a 200 response.

        Returns ``(answer, None)`` or ``(None, error_message)``.
        """
        json_response = response.json()
        if json_response['msg'] != 'success':
            return None, f"[COZE] Error: {json_response['msg']}"
        answer = None
        for message in json_response['messages']:
            if message.get('type') == 'answer':
                answer = message.get('content')
                break
        if not answer:
            return None, "[COZE] Error: empty answer"
        return answer, None

    def _calc_tokens(self, messages, answer):
        """Rough token accounting: one character counts as one token."""
        completion_tokens = len(answer)
        prompt_tokens = 0
        for message in messages:
            prompt_tokens += len(message["content"])
        return completion_tokens, prompt_tokens + completion_tokens

View File

@@ -35,6 +35,8 @@ class Bridge(object):
self.btype["chat"] = const.DIFY
if model_type in [const.ZHIPU_AI]:
self.btype["chat"] = const.ZHIPU_AI
if model_type in [const.COZE]:
self.btype["chat"] = const.COZE
if conf().get("use_linkai") and conf().get("linkai_api_key"):
self.btype["chat"] = const.LINKAI

View File

@@ -10,6 +10,7 @@ QWEN = "qwen"
GEMINI = "gemini"
DIFY = "dify"
ZHIPU_AI = "glm-4"
COZE = "coze"
# model
@@ -22,7 +23,7 @@ TTS_1 = "tts-1"
TTS_1_HD = "tts-1-hd"
MODEL_LIST = ["gpt-3.5-turbo", "gpt-3.5-turbo-16k", "gpt-4", "wenxin", "wenxin-4", "xunfei", "claude", "gpt-4-turbo",
"gpt-4-turbo-preview", "gpt-4-1106-preview", GPT4_TURBO_PREVIEW, QWEN, GEMINI, DIFY, ZHIPU_AI]
"gpt-4-turbo-preview", "gpt-4-1106-preview", GPT4_TURBO_PREVIEW, QWEN, GEMINI, DIFY, ZHIPU_AI, COZE]
# channel
FEISHU = "feishu"

View File

@@ -80,6 +80,10 @@ available_setting = {
"dify_api_key": "app-xxx",
"dify_agent": True, # dify助手类型如果是基础助手请设置为False智能助手请设置为True默认为True
"dify_convsersation_max_messages": 5, # dify目前不支持设置历史消息长度暂时使用超过最大消息数清空会话的策略缺点是没有滑动窗口会突然丢失历史消息
# coze配置
"coze_api_base": "https://api.coze.cn/open_api/v2",
"coze_api_key": "xxx",
"coze_bot_id": "xxx",
# wework的通用配置
"wework_smart": True, # 配置wework是否使用已登录的企业微信False为多开
# 语音设置

View File

@@ -313,7 +313,7 @@ class Godcmd(Plugin):
except Exception as e:
ok, result = False, "你没有设置私有GPT模型"
elif cmd == "reset":
if bottype in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI, const.BAIDU, const.XUNFEI, const.QWEN, const.GEMINI, const.DIFY]:
if bottype in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI, const.BAIDU, const.XUNFEI, const.QWEN, const.GEMINI, const.DIFY, const.COZE]:
bot.sessions.clear_session(session_id)
if Bridge().chat_bots.get(bottype):
Bridge().chat_bots.get(bottype).sessions.clear_session(session_id)
@@ -339,7 +339,7 @@ class Godcmd(Plugin):
ok, result = True, "配置已重载"
elif cmd == "resetall":
if bottype in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI,
const.BAIDU, const.XUNFEI, const.QWEN, const.GEMINI, const.DIFY]:
const.BAIDU, const.XUNFEI, const.QWEN, const.GEMINI, const.DIFY, const.COZE]:
channel.cancel_all_session()
bot.sessions.clear_all_session()
ok, result = True, "重置所有会话成功"