@@ -215,18 +215,21 @@ func (r *pipeSSERecorder) Write(p []byte) (int, error) {
215215
216216func (r * pipeSSERecorder ) Flush () {}
217217
218+ var sseDelimiter = []byte ("\n \n " )
219+
218220func (r * pipeSSERecorder ) consumeBody () {
219221 for {
220- body := r .body .String ()
221- idx := strings .Index (body , " \n \n " )
222+ buf := r .body .Bytes ()
223+ idx := bytes .Index (buf , sseDelimiter )
222224 if idx == - 1 {
223225 return
224226 }
225227
226- chunk := body [:idx ]
227- rest := body [idx + 2 :]
228+ chunk := string (buf [:idx ])
229+ remaining := make ([]byte , len (buf )- idx - 2 )
230+ copy (remaining , buf [idx + 2 :])
228231 r .body .Reset ()
229- _ , _ = r .body .WriteString ( rest )
232+ _ , _ = r .body .Write ( remaining )
230233
231234 r .consumeChunk (chunk )
232235 }
@@ -304,11 +307,7 @@ func ensurePipeChat(pipeObj *object.Pipe, incoming *pipepkg.IncomingMessage) (*o
304307 Store : storeName ,
305308 ModelProvider : "" ,
306309 Category : "Pipe" ,
307- Type : "AI" ,
308310 User : chatName ,
309- User1 : "" ,
310- User2 : "" ,
311- Users : []string {},
312311 ClientIp : "" ,
313312 UserAgent : fmt .Sprintf ("pipe/%s" , pipeObj .Type ),
314313 MessageCount : 0 ,
@@ -422,10 +421,18 @@ func (s *defaultPipeAnswerSender) WriteError(text string) error {
422421}
423422
424423func (s * defaultPipeAnswerSender ) CloseMessage (text string ) error {
425- if text == "" {
426- return nil
424+ finalText := text
425+ if finalText == "" {
426+ switch {
427+ case s .text != "" :
428+ finalText = s .text
429+ case s .errorText != "" :
430+ finalText = s .errorText
431+ default :
432+ return nil
433+ }
427434 }
428- return s .provider .SendMessage (s .chatId , text )
435+ return s .provider .SendMessage (s .chatId , finalText )
429436}
430437
431438func newStreamPipeAnswerSender (writer pipepkg.PipeMessageWriter ) * streamPipeAnswerSender {
0 commit comments