disabled ouput parser streaming

pull/1115/head
Henry 2023-10-31 14:07:29 +00:00
parent 8857530f29
commit f57a08f59b
11 changed files with 32 additions and 28 deletions

View File

@ -98,7 +98,7 @@ class LLMChain_Chains implements INode {
verbose: process.env.DEBUG === 'true'
})
const inputVariables = chain.prompt.inputVariables as string[] // ["product"]
const res = await runPrediction(inputVariables, chain, input, promptValues, options, nodeData, this.outputParser)
const res = await runPrediction(inputVariables, chain, input, promptValues, options, nodeData)
// eslint-disable-next-line no-console
console.log('\x1b[92m\x1b[1m\n*****OUTPUT PREDICTION*****\n\x1b[0m\x1b[0m')
// eslint-disable-next-line no-console
@ -121,7 +121,7 @@ class LLMChain_Chains implements INode {
this.outputParser = outputParser
}
promptValues = injectOutputParser(this.outputParser, chain, promptValues)
const res = await runPrediction(inputVariables, chain, input, promptValues, options, nodeData, this.outputParser)
const res = await runPrediction(inputVariables, chain, input, promptValues, options, nodeData)
// eslint-disable-next-line no-console
console.log('\x1b[93m\x1b[1m\n*****FINAL RESULT*****\n\x1b[0m\x1b[0m')
// eslint-disable-next-line no-console
@ -136,8 +136,7 @@ const runPrediction = async (
input: string,
promptValuesRaw: ICommonObject | undefined,
options: ICommonObject,
nodeData: INodeData,
outputParser: BaseOutputParser
nodeData: INodeData
) => {
const loggerHandler = new ConsoleCallbackHandler(options.logger)
const callbacks = await additionalCallbacks(nodeData, options)
@ -167,7 +166,7 @@ const runPrediction = async (
// All inputVariables have fixed values specified
const options = { ...promptValues }
if (isStreaming) {
const handler = new CustomChainHandler(socketIO, socketIOClientId, undefined, undefined, outputParser ? true : undefined)
const handler = new CustomChainHandler(socketIO, socketIOClientId)
const res = await chain.call(options, [loggerHandler, handler, ...callbacks])
return formatResponse(res?.text)
} else {
@ -183,11 +182,12 @@ const runPrediction = async (
[lastValue]: input
}
if (isStreaming) {
const handler = new CustomChainHandler(socketIO, socketIOClientId, undefined, undefined, outputParser ? true : undefined)
const handler = new CustomChainHandler(socketIO, socketIOClientId)
const res = await chain.call(options, [loggerHandler, handler, ...callbacks])
return formatResponse(res?.text)
} else {
const res = await chain.call(options, [loggerHandler, ...callbacks])
console.log('formatResponse=', formatResponse(res?.text))
return formatResponse(res?.text)
}
} else {
@ -195,7 +195,7 @@ const runPrediction = async (
}
} else {
if (isStreaming) {
const handler = new CustomChainHandler(socketIO, socketIOClientId, undefined, undefined, outputParser ? true : undefined)
const handler = new CustomChainHandler(socketIO, socketIOClientId)
const res = await chain.run(input, [loggerHandler, handler, ...callbacks])
return formatResponse(res)
} else {

View File

@ -1,4 +1,4 @@
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { getBaseClasses, INode, INodeData, INodeParams } from '../../../src'
import { BaseOutputParser } from 'langchain/schema/output_parser'
import { CommaSeparatedListOutputParser } from 'langchain/output_parsers'
import { CATEGORY } from '../OutputParserHelpers'
@ -29,14 +29,13 @@ class CSVListOutputParser implements INode {
label: 'Autofix',
name: 'autofixParser',
type: 'boolean',
rows: 4,
optional: true,
description: 'In the event that the first call fails, will make another call to the model to fix any errors.'
}
]
}
// eslint-disable-next-line unused-imports/no-unused-vars
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
async init(nodeData: INodeData): Promise<any> {
const autoFix = nodeData.inputs?.autofixParser as boolean
const commaSeparatedListOutputParser = new CommaSeparatedListOutputParser()

View File

Before

Width:  |  Height:  |  Size: 8.3 KiB

After

Width:  |  Height:  |  Size: 8.3 KiB

View File

@ -1,4 +1,4 @@
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { getBaseClasses, INode, INodeData, INodeParams } from '../../../src'
import { BaseOutputParser } from 'langchain/schema/output_parser'
import { CustomListOutputParser as LangchainCustomListOutputParser } from 'langchain/output_parsers'
import { CATEGORY } from '../OutputParserHelpers'
@ -44,19 +44,19 @@ class CustomListOutputParser implements INode {
label: 'Autofix',
name: 'autofixParser',
type: 'boolean',
rows: 4,
optional: true,
description: 'In the event that the first call fails, will make another call to the model to fix any errors.'
}
]
}
// eslint-disable-next-line unused-imports/no-unused-vars
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
async init(nodeData: INodeData): Promise<any> {
const separator = nodeData.inputs?.separator as string
const lengthStr = nodeData.inputs?.length as string
const autoFix = nodeData.inputs?.autofixParser as boolean
let length = 5
if (lengthStr) length = parseInt(lengthStr, 10)
const parser = new LangchainCustomListOutputParser({ length: length, separator: separator })
Object.defineProperty(parser, 'autoFix', {
enumerable: true,

View File

Before

Width:  |  Height:  |  Size: 4.9 KiB

After

Width:  |  Height:  |  Size: 4.9 KiB

View File

@ -1,4 +1,4 @@
import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { getBaseClasses, INode, INodeData, INodeParams } from '../../../src'
import { BaseOutputParser } from 'langchain/schema/output_parser'
import { StructuredOutputParser as LangchainStructuredOutputParser } from 'langchain/output_parsers'
import { CATEGORY } from '../OutputParserHelpers'
@ -53,14 +53,13 @@ class StructuredOutputParser implements INode {
label: 'Autofix',
name: 'autofixParser',
type: 'boolean',
rows: 4,
optional: true,
description: 'In the event that the first call fails, will make another call to the model to fix any errors.'
}
]
}
// eslint-disable-next-line unused-imports/no-unused-vars
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
async init(nodeData: INodeData): Promise<any> {
const structureType = nodeData.inputs?.structureType as string
const structure = nodeData.inputs?.structure as string
const autoFix = nodeData.inputs?.autofixParser as boolean

View File

Before

Width:  |  Height:  |  Size: 3.8 KiB

After

Width:  |  Height:  |  Size: 3.8 KiB

View File

@ -152,15 +152,13 @@ export class CustomChainHandler extends BaseCallbackHandler {
skipK = 0 // Skip streaming for first K numbers of handleLLMStart
returnSourceDocuments = false
cachedResponse = true
isOutputParser = false
constructor(socketIO: Server, socketIOClientId: string, skipK?: number, returnSourceDocuments?: boolean, isOutputParser?: boolean) {
constructor(socketIO: Server, socketIOClientId: string, skipK?: number, returnSourceDocuments?: boolean) {
super()
this.socketIO = socketIO
this.socketIOClientId = socketIOClientId
this.skipK = skipK ?? this.skipK
this.returnSourceDocuments = returnSourceDocuments ?? this.returnSourceDocuments
this.isOutputParser = isOutputParser ?? this.isOutputParser
}
handleLLMStart() {
@ -173,7 +171,6 @@ export class CustomChainHandler extends BaseCallbackHandler {
if (!this.isLLMStarted) {
this.isLLMStarted = true
this.socketIO.to(this.socketIOClientId).emit('start', token)
if (this.isOutputParser) this.socketIO.to(this.socketIOClientId).emit('token', '```json')
}
this.socketIO.to(this.socketIOClientId).emit('token', token)
}

View File

@ -998,10 +998,7 @@ export class App {
analytic: chatflow.analytic
})
result = typeof result === 'string' ? { text: result } : result
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
return res.json(result)
} catch (e: any) {
logger.error('[server]: Error:', e)

View File

@ -804,7 +804,16 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name)
}
return isChatOrLLMsExist && isValidChainOrAgent
// If no output parser, flow is available to stream
let isOutputParserExist = false
for (const flowNode of reactFlowNodes) {
const data = flowNode.data
if (data.category.includes('Output Parser')) {
isOutputParserExist = true
}
}
return isChatOrLLMsExist && isValidChainOrAgent && !isOutputParserExist
}
/**

View File

@ -164,17 +164,20 @@ export const ChatMessage = ({ open, chatflowid, isDialog }) => {
{ message: data.text, sourceDocuments: data.sourceDocuments, type: 'apiMessage' }
])
}
console.log('here1=', data.text)
addChatMessage(data.text, 'apiMessage', data.sourceDocuments)
} else if (typeof data === 'object' && data.json) {
const text = '```json' + JSON.stringify(data.json, null, 2)
const text = '```json\n' + JSON.stringify(data.json, null, 2)
if (!isChatFlowAvailableToStream) {
setMessages((prevMessages) => [...prevMessages, { message: text, type: 'apiMessage' }])
}
console.log('here2=', text)
addChatMessage(text, 'apiMessage')
} else {
if (!isChatFlowAvailableToStream) {
setMessages((prevMessages) => [...prevMessages, { message: data, type: 'apiMessage' }])
}
console.log('here3=', data)
addChatMessage(data, 'apiMessage')
}
setLoading(false)