Files
Memoh/internal/channel/adapters/matrix/stream.go
T
BBQ d3bf6bc90a fix(channel,attachment): channel quality refactor & attachment pipeline fixes (#349)
* feat(channel): add DingTalk channel adapter

- Add DingTalk channel adapter (`internal/channel/adapters/dingtalk/`) using dingtalk-stream-sdk-go, supporting inbound message receiving and outbound text/markdown reply
- Register DingTalk adapter in cmd/agent and cmd/memoh
- Add go.mod dependency: github.com/memohai/dingtalk-stream-sdk-go
- Add Dingtalk and Wecom SVG icons and Vue components to @memohai/icon
- Refactor existing icon components to remove redundant inline wrappers
- Add `channelTypeDisplayName` util for consistent channel label resolution
- Add DingTalk/WeCom i18n entries (en/zh) for types and typesShort
- Extend channel-icon, bot-channels, channel-settings-panel to support dingtalk/wecom
- Use channelTypeDisplayName in profile page to replace ad-hoc i18n lookup

* fix(channel,attachment): channel quality refactor & attachment pipeline fixes

Channel module:
- Fix RemoveAdapter not cleaning connectionMeta (stale status leak)
- Fix preparedAttachmentTypeFromMime misclassifying image/gif
- Fix sleepWithContext time.After goroutine/timer leak
- Export IsDataURL/IsHTTPURL/IsDataPath, dedup across packages
- Cache OutboundPolicy in managerOutboundStream to avoid repeated lookups
- Split OutboundAttachmentStore: extract ContainerAttachmentIngester interface
- Add ManagerOption funcs (WithInboundQueueSize, WithInboundWorkers, WithRefreshInterval)
- Add thread-safety docs on OutboundStream / managerOutboundStream
- Add debug logs on successful send/edit paths
- Expand outbound_prepare_test.go with 21 new cases
- Convert no-receiver adapter helpers to package-level funcs; drop unused params

DingTalk adapter:
- Implement AttachmentResolver: download inbound media via /v1.0/robot/messageFiles/download
- Fix pure-image inbound messages failing due to missing resolver

Attachment pipeline:
- Fix images invisible to LLM in pipeline (DCP) path: inject InlineImages into
  last user message when cfg.Query is empty
- Fix public_url fallback: skip direct URL-to-LLM when ContentHash is set,
  always prefer inlined persisted asset
- Inject path: carry ImageParts through agent.InjectMessage; inline persisted
  attachments in resolver inject goroutine so mid-stream images reach the model
- Fix ResolveMime for images: prefer content-sniffed MIME over platform-declared
  MIME (fixes Feishu sending image/png header for actual JPEG content → API 400)
2026-04-09 14:36:11 +08:00

232 lines
5.1 KiB
Go

package matrix
import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/memohai/memoh/internal/channel"
)
// matrixOutboundStream is the per-conversation outbound stream of the Matrix
// adapter. It buffers streamed text deltas and turns them into text events
// (and follow-up updates of the same event) in a single room.
type matrixOutboundStream struct {
	// Wiring that the methods in this file only read.
	adapter *MatrixAdapter
	cfg     Config
	target  string // passed to adapter.resolveRoomTarget to obtain the room ID
	reply   *channel.ReplyRef

	// closed is set by Close; Push rejects events once it is true.
	closed atomic.Bool

	// mu is held around every access to the mutable message state below.
	mu sync.Mutex

	// roomID caches the resolved room; originalEventID is the ID of the text
	// event already sent for the current message ("" until the first send).
	roomID, originalEventID string

	// rawBuffer accumulates non-reasoning text deltas until they are flushed.
	rawBuffer strings.Builder

	// lastText/lastFormat/lastEditedAt record the most recent successful send
	// so identical updates can be skipped and non-forced ones throttled.
	lastText     string
	lastFormat   channel.MessageFormat
	lastEditedAt time.Time
}
// Push handles one prepared stream event.
//
// It returns an error when the stream is nil/unconfigured or already closed,
// and returns ctx's error when ctx is already done. Otherwise the event type
// decides what happens:
//
//   - Delta: non-empty deltas outside the reasoning phase are appended to the
//     internal buffer; nothing is sent yet.
//   - PhaseEnd (text phase only): the buffered text is sent as plain text.
//   - ToolCallStart: the per-message state is reset, so text that follows is
//     sent as a fresh event instead of updating the previous one.
//   - Error: a non-blank error is sent as "Error: <text>".
//   - Attachment: the attachments are forwarded to pushAttachments.
//   - Final: the final text (or, if blank, the buffered text) is sent, then
//     the final attachments, then the per-message state is reset. A nil Final
//     payload is an error.
//
// Every other event type is accepted and ignored.
func (s *matrixOutboundStream) Push(ctx context.Context, event channel.PreparedStreamEvent) error {
	if s == nil || s.adapter == nil {
		return errors.New("matrix stream not configured")
	}
	if s.closed.Load() {
		return errors.New("matrix stream is closed")
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	// buffered snapshots the trimmed text accumulated from delta events.
	buffered := func() string {
		s.mu.Lock()
		defer s.mu.Unlock()
		return strings.TrimSpace(s.rawBuffer.String())
	}

	switch event.Type {
	case channel.StreamEventDelta:
		if event.Delta != "" && event.Phase != channel.StreamPhaseReasoning {
			s.mu.Lock()
			s.rawBuffer.WriteString(event.Delta)
			s.mu.Unlock()
		}
		return nil

	case channel.StreamEventPhaseEnd:
		if event.Phase != channel.StreamPhaseText {
			return nil
		}
		return s.upsertText(ctx, buffered(), channel.MessageFormatPlain, true)

	case channel.StreamEventToolCallStart:
		s.resetMessageState()
		return nil

	case channel.StreamEventError:
		if reason := strings.TrimSpace(event.Error); reason != "" {
			return s.upsertText(ctx, "Error: "+reason, channel.MessageFormatPlain, true)
		}
		return nil

	case channel.StreamEventAttachment:
		return s.pushAttachments(ctx, event.Attachments)

	case channel.StreamEventFinal:
		final := event.Final
		if final == nil {
			return errors.New("matrix stream final payload is required")
		}
		body := strings.TrimSpace(final.Message.Message.PlainText())
		format := final.Message.Message.Format
		if format == "" {
			format = channel.MessageFormatPlain
		}
		if body == "" {
			// The final payload carries no text; fall back to what was streamed.
			body = buffered()
		}
		if err := s.upsertText(ctx, body, format, true); err != nil {
			return err
		}
		if err := s.pushAttachments(ctx, final.Message.Attachments); err != nil {
			return err
		}
		s.resetMessageState()
		return nil

	case channel.StreamEventStatus,
		channel.StreamEventPhaseStart,
		channel.StreamEventToolCallEnd,
		channel.StreamEventAgentStart,
		channel.StreamEventAgentEnd,
		channel.StreamEventProcessingStarted,
		channel.StreamEventProcessingCompleted,
		channel.StreamEventProcessingFailed:
		// Lifecycle notifications produce no output on this stream.
		return nil

	default:
		return nil
	}
}
// Close marks the stream as closed so that later Push calls are rejected.
//
// The closed flag is set before ctx is inspected: a cancelled or expired ctx
// is still reported to the caller through the returned error, but it no
// longer leaves the stream open to further events. Close on a nil stream is a
// no-op, and calling Close repeatedly is harmless.
func (s *matrixOutboundStream) Close(ctx context.Context) error {
	if s == nil {
		return nil
	}
	// Always flip the flag first; returning early on a done context would
	// otherwise leave the stream permanently "open".
	s.closed.Store(true)
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}
// upsertText sends text for the current message, either as a new event or as
// an update of the event already recorded in originalEventID.
//
// Blank text (after trimming) is a no-op and an empty format defaults to
// plain. The room ID is resolved through the adapter on first use and cached.
// When no event has been recorded yet, the text is sent with the stream's
// reply reference and the returned event ID is stored. Otherwise the call is
// skipped if text and format match the last successful send, or if force is
// false and the last send happened less than matrixEditThrottle ago; when it
// is not skipped, the content is built with the edit flag set and the recorded
// event ID (presumably a Matrix replacement event — confirm against
// buildMatrixMessageContent).
//
// Errors from room resolution or sending are returned unchanged, and the
// stored state is only updated after a successful send.
func (s *matrixOutboundStream) upsertText(ctx context.Context, text string, format channel.MessageFormat, force bool) error {
	body := strings.TrimSpace(text)
	if body == "" {
		return nil
	}
	if format == "" {
		format = channel.MessageFormatPlain
	}

	// Snapshot the shared state so no lock is held across adapter calls.
	s.mu.Lock()
	var (
		room       = s.roomID
		anchorID   = s.originalEventID
		prevText   = s.lastText
		prevFormat = s.lastFormat
		prevSentAt = s.lastEditedAt
		replyRef   = s.reply
	)
	s.mu.Unlock()

	if room == "" {
		resolved, err := s.adapter.resolveRoomTarget(ctx, s.cfg, s.target)
		if err != nil {
			return err
		}
		room = resolved
		s.mu.Lock()
		s.roomID = resolved
		s.mu.Unlock()
	}

	isEdit := anchorID != ""
	if isEdit {
		if body == prevText && format == prevFormat {
			return nil // nothing changed since the last send
		}
		if !force && time.Since(prevSentAt) < matrixEditThrottle {
			return nil // too soon for a non-forced update
		}
	}

	outgoing := channel.Message{Text: body, Format: format}
	if !isEdit {
		// Only the first event of a message carries the reply reference.
		outgoing.Reply = replyRef
	}

	// anchorID is "" on the first send, matching the non-edit call shape.
	eventID, err := s.adapter.sendTextEvent(ctx, s.cfg, room, buildMatrixMessageContent(outgoing, isEdit, anchorID))
	if err != nil {
		return err
	}

	s.mu.Lock()
	if !isEdit {
		s.originalEventID = eventID
	}
	s.lastText = body
	s.lastFormat = format
	s.lastEditedAt = time.Now()
	s.mu.Unlock()
	return nil
}
// resetMessageState clears everything tied to the current message: the
// buffered text, the recorded event ID and the last-send bookkeeping. The
// cached room ID and the reply reference are left untouched.
func (s *matrixOutboundStream) resetMessageState() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.rawBuffer.Reset()
	s.originalEventID = ""
	s.lastText, s.lastFormat = "", ""
	s.lastEditedAt = time.Time{}
}
// pushAttachments sends each attachment to the stream's room, in order,
// through adapter.sendMediaAttachment.
//
// An empty slice is a no-op. The room ID is resolved through the adapter on
// first use and cached. The stream's reply reference is attached only to the
// first attachment, and only when no text event has been recorded for the
// current message. The first send error aborts the loop and is returned;
// attachments sent before it are not rolled back.
func (s *matrixOutboundStream) pushAttachments(ctx context.Context, attachments []channel.PreparedAttachment) error {
	if len(attachments) == 0 {
		return nil
	}

	s.mu.Lock()
	room, anchorID, replyRef := s.roomID, s.originalEventID, s.reply
	s.mu.Unlock()

	if room == "" {
		resolved, err := s.adapter.resolveRoomTarget(ctx, s.cfg, s.target)
		if err != nil {
			return err
		}
		room = resolved
		s.mu.Lock()
		s.roomID = resolved
		s.mu.Unlock()
	}

	// With a text event already sent, that event carried the reply.
	replyOnFirst := anchorID == ""
	for i := range attachments {
		var envelope channel.Message
		if replyOnFirst && i == 0 {
			envelope.Reply = replyRef
		}
		if err := s.adapter.sendMediaAttachment(ctx, s.cfg, room, envelope, attachments[i]); err != nil {
			return err
		}
	}
	return nil
}