Files
Memoh/internal/messaging/executor.go
T
BBQ d3bf6bc90a fix(channel,attachment): channel quality refactor & attachment pipeline fixes (#349)
* feat(channel): add DingTalk channel adapter

- Add DingTalk channel adapter (`internal/channel/adapters/dingtalk/`) using dingtalk-stream-sdk-go, supporting inbound message receiving and outbound text/markdown reply
- Register DingTalk adapter in cmd/agent and cmd/memoh
- Add go.mod dependency: github.com/memohai/dingtalk-stream-sdk-go
- Add Dingtalk and Wecom SVG icons and Vue components to @memohai/icon
- Refactor existing icon components to remove redundant inline wrappers
- Add `channelTypeDisplayName` util for consistent channel label resolution
- Add DingTalk/WeCom i18n entries (en/zh) for types and typesShort
- Extend channel-icon, bot-channels, channel-settings-panel to support dingtalk/wecom
- Use channelTypeDisplayName in profile page to replace ad-hoc i18n lookup

* fix(channel,attachment): channel quality refactor & attachment pipeline fixes

Channel module:
- Fix RemoveAdapter not cleaning connectionMeta (stale status leak)
- Fix preparedAttachmentTypeFromMime misclassifying image/gif
- Fix sleepWithContext time.After goroutine/timer leak
- Export IsDataURL/IsHTTPURL/IsDataPath, dedup across packages
- Cache OutboundPolicy in managerOutboundStream to avoid repeated lookups
- Split OutboundAttachmentStore: extract ContainerAttachmentIngester interface
- Add ManagerOption funcs (WithInboundQueueSize, WithInboundWorkers, WithRefreshInterval)
- Add thread-safety docs on OutboundStream / managerOutboundStream
- Add debug logs on successful send/edit paths
- Expand outbound_prepare_test.go with 21 new cases
- Convert no-receiver adapter helpers to package-level funcs; drop unused params

DingTalk adapter:
- Implement AttachmentResolver: download inbound media via /v1.0/robot/messageFiles/download
- Fix pure-image inbound messages failing due to missing resolver

Attachment pipeline:
- Fix images invisible to LLM in pipeline (DCP) path: inject InlineImages into
  last user message when cfg.Query is empty
- Fix public_url fallback: skip direct URL-to-LLM when ContentHash is set,
  always prefer inlined persisted asset
- Inject path: carry ImageParts through agent.InjectMessage; inline persisted
  attachments in resolver inject goroutine so mid-stream images reach the model
- Fix ResolveMime for images: prefer content-sniffed MIME over platform-declared
  MIME (fixes Feishu sending image/png header for actual JPEG content → API 400)
2026-04-09 14:36:11 +08:00

632 lines
18 KiB
Go

package messaging
import (
"context"
"encoding/json"
"errors"
"log/slog"
"path/filepath"
"strings"
"github.com/memohai/memoh/internal/channel"
"github.com/memohai/memoh/internal/media"
)
// SessionContext carries request-scoped identity for tool execution.
type SessionContext struct {
BotID string
ChatID string
CurrentPlatform string
ReplyTarget string
}
// Sender sends outbound messages through a channel manager.
type Sender interface {
Send(ctx context.Context, botID string, channelType channel.ChannelType, req channel.SendRequest) error
}
// Reactor adds or removes emoji reactions through a channel manager.
type Reactor interface {
React(ctx context.Context, botID string, channelType channel.ChannelType, req channel.ReactRequest) error
}
// ChannelTypeResolver parses a platform name to a channel type.
type ChannelTypeResolver interface {
ParseChannelType(raw string) (channel.ChannelType, error)
}
// AssetMeta holds resolved metadata for a media asset.
type AssetMeta = media.Asset
// AssetResolver looks up persisted media assets by storage key and can
// optionally ingest files from a bot's container filesystem.
type AssetResolver interface {
channel.OutboundAttachmentStore
channel.ContainerAttachmentIngester
}
// Executor provides send and react operations for channel messaging.
type Executor struct {
Sender Sender
Reactor Reactor
Resolver ChannelTypeResolver
AssetResolver AssetResolver
Logger *slog.Logger
}
// SendResult is the success payload returned after sending a message.
type SendResult struct {
BotID string
Platform string
Target string
MessageID string
// Local is true when the message targets the current conversation.
// The caller should emit the resolved attachments as stream events.
Local bool
LocalAttachments []channel.Attachment
}
// ReactResult is the success payload returned after reacting.
type ReactResult struct {
BotID string
Platform string
Target string
MessageID string
Emoji string
Action string // "added" or "removed"
}
type sendMode struct {
name string
allowLocalShortcut bool
requireTarget bool
promoteDataAttachments bool
}
type sendPlan struct {
botID string
channelType channel.ChannelType
target string
sameConv bool
message channel.Message
}
// Send executes a send-message action. args are the tool call arguments.
func (e *Executor) Send(ctx context.Context, session SessionContext, args map[string]any) (*SendResult, error) {
return e.sendWithMode(ctx, session, "", args, sendMode{
name: "send",
allowLocalShortcut: true,
})
}
// SendDirect sends a message via the channel adapter without the same-conversation
// local shortcut. Used by discuss mode where there is no active stream emitter.
func (e *Executor) SendDirect(ctx context.Context, session SessionContext, target string, args map[string]any) (*SendResult, error) {
return e.sendWithMode(ctx, session, target, args, sendMode{
name: "send direct",
requireTarget: true,
promoteDataAttachments: true,
})
}
func (e *Executor) sendWithMode(
ctx context.Context,
session SessionContext,
explicitTarget string,
args map[string]any,
mode sendMode,
) (*SendResult, error) {
if e.Sender == nil || e.Resolver == nil {
return nil, errors.New("message service not available")
}
plan, err := e.prepareSendPlan(ctx, session, explicitTarget, args, mode)
if err != nil {
return nil, err
}
if mode.allowLocalShortcut && plan.sameConv {
return &SendResult{
BotID: plan.botID,
Platform: plan.channelType.String(),
Target: plan.target,
Local: true,
LocalAttachments: plan.message.Attachments,
}, nil
}
if mode.promoteDataAttachments {
e.promoteDataPathAttachmentsToAssets(ctx, plan.botID, plan.channelType, &plan.message)
}
if err := e.Sender.Send(ctx, plan.botID, plan.channelType, channel.SendRequest{
Target: plan.target,
Message: plan.message,
}); err != nil {
if e.Logger != nil {
e.Logger.Warn("outbound send failed",
slog.String("mode", mode.name),
slog.Any("error", err),
slog.String("bot_id", plan.botID),
slog.String("platform", string(plan.channelType)),
)
}
return nil, err
}
return &SendResult{
BotID: plan.botID,
Platform: plan.channelType.String(),
Target: plan.target,
}, nil
}
func (e *Executor) prepareSendPlan(
ctx context.Context,
session SessionContext,
explicitTarget string,
args map[string]any,
mode sendMode,
) (*sendPlan, error) {
botID, err := e.resolveBotID(args, session)
if err != nil {
return nil, err
}
channelType, err := e.resolvePlatform(args, session)
if err != nil {
return nil, err
}
target := strings.TrimSpace(explicitTarget)
if target == "" {
target = firstStringArg(args, "target")
}
if target == "" {
target = strings.TrimSpace(session.ReplyTarget)
}
sameConv := target == "" || IsSameConversation(session, channelType.String(), target)
if mode.requireTarget && target == "" {
return nil, errors.New("target is required")
}
if !mode.allowLocalShortcut && target == "" {
return nil, errors.New("target is required for cross-conversation send")
}
msg, err := e.buildOutboundMessage(ctx, botID, session, channelType, target, args, sameConv)
if err != nil {
return nil, err
}
return &sendPlan{
botID: botID,
channelType: channelType,
target: target,
sameConv: sameConv,
message: msg,
}, nil
}
func (e *Executor) buildOutboundMessage(
ctx context.Context,
botID string,
session SessionContext,
channelType channel.ChannelType,
target string,
args map[string]any,
isSameConv bool,
) (channel.Message, error) {
messageText := firstStringArg(args, "text")
outboundMessage, parseErr := ParseOutboundMessage(args, messageText)
if parseErr != nil {
if rawAtt, ok := args["attachments"]; !ok || rawAtt == nil {
return channel.Message{}, parseErr
}
outboundMessage = channel.Message{Text: strings.TrimSpace(messageText)}
}
if rawAttachments, ok := args["attachments"]; ok && rawAttachments != nil {
attachments, err := e.resolveOutboundAttachments(ctx, botID, session, channelType, target, rawAttachments, isSameConv)
if err != nil {
return channel.Message{}, err
}
outboundMessage.Attachments = append(outboundMessage.Attachments, attachments...)
}
if outboundMessage.IsEmpty() {
return channel.Message{}, errors.New("message or attachments required")
}
if replyTo := firstStringArg(args, "reply_to"); replyTo != "" {
outboundMessage.Reply = &channel.ReplyRef{MessageID: replyTo}
}
if outboundMessage.Format == "" && channel.ContainsMarkdown(outboundMessage.Text) {
outboundMessage.Format = channel.MessageFormatMarkdown
}
return outboundMessage, nil
}
func (e *Executor) resolveOutboundAttachments(
ctx context.Context,
botID string,
session SessionContext,
channelType channel.ChannelType,
target string,
rawAttachments any,
isSameConv bool,
) ([]channel.Attachment, error) {
if isSameConv || IsSameConversation(session, channelType.String(), target) {
return resolveSameConversationAttachments(rawAttachments), nil
}
items := NormalizeAttachmentInputs(rawAttachments)
if items == nil {
return nil, errors.New("attachments must be a string, object, or array")
}
if len(items) == 0 {
return nil, nil
}
resolved := e.ResolveAttachments(ctx, botID, items)
if len(resolved) == 0 {
return nil, errors.New("attachments could not be resolved")
}
return resolved, nil
}
func resolveSameConversationAttachments(rawAttachments any) []channel.Attachment {
items := NormalizeAttachmentInputs(rawAttachments)
if items == nil {
return nil
}
result := make([]channel.Attachment, 0, len(items))
for _, item := range items {
ref := ""
name := ""
attType := ""
switch v := item.(type) {
case string:
ref = strings.TrimSpace(v)
case map[string]any:
ref = firstStringArg(v, "path", "url")
name = firstStringArg(v, "name")
attType = firstStringArg(v, "type")
}
if ref == "" {
continue
}
lower := strings.ToLower(ref)
if !strings.HasPrefix(ref, "/") &&
!strings.HasPrefix(lower, "http://") &&
!strings.HasPrefix(lower, "https://") &&
!strings.HasPrefix(lower, "data:") {
ref = "/data/" + ref
}
if name == "" {
name = filepath.Base(ref)
}
t := channel.AttachmentType(attType)
if t == "" {
t = InferAttachmentTypeFromExt(ref)
}
result = append(result, channel.Attachment{Type: t, URL: ref, Name: name})
}
return result
}
// promoteDataPathAttachmentsToAssets converts /data/* attachment references
// into content_hash-backed attachments before channel send.
// This avoids adapters treating local container paths as HTTP URLs.
func (e *Executor) promoteDataPathAttachmentsToAssets(ctx context.Context, botID string, channelType channel.ChannelType, msg *channel.Message) {
if e.AssetResolver == nil || msg == nil || len(msg.Attachments) == 0 {
return
}
prepared, err := channel.PrepareOutboundMessage(ctx, e.AssetResolver, channel.ChannelConfig{
BotID: botID,
ChannelType: channelType,
}, channel.OutboundMessage{
Message: *msg,
})
if err != nil {
return
}
msg.Attachments = prepared.Message.Message.Attachments
}
// React executes a react action. args are the tool call arguments.
func (e *Executor) React(ctx context.Context, session SessionContext, args map[string]any) (*ReactResult, error) {
if e.Reactor == nil || e.Resolver == nil {
return nil, errors.New("reaction service not available")
}
botID, err := e.resolveBotID(args, session)
if err != nil {
return nil, err
}
channelType, err := e.resolvePlatform(args, session)
if err != nil {
return nil, err
}
target := firstStringArg(args, "target")
if target == "" {
target = strings.TrimSpace(session.ReplyTarget)
}
if target == "" {
return nil, errors.New("target is required")
}
messageID := firstStringArg(args, "message_id")
if messageID == "" {
return nil, errors.New("message_id is required")
}
emoji := firstStringArg(args, "emoji")
remove, _, _ := boolArg(args, "remove")
if err := e.Reactor.React(ctx, botID, channelType, channel.ReactRequest{
Target: target, MessageID: messageID, Emoji: emoji, Remove: remove,
}); err != nil {
if e.Logger != nil {
e.Logger.Warn("react failed", slog.Any("error", err), slog.String("bot_id", botID), slog.String("platform", string(channelType)))
}
return nil, err
}
action := "added"
if remove {
action = "removed"
}
return &ReactResult{
BotID: botID, Platform: channelType.String(), Target: target,
MessageID: messageID, Emoji: emoji, Action: action,
}, nil
}
// CanSend returns true if the executor has a sender and resolver configured.
func (e *Executor) CanSend() bool { return e.Sender != nil && e.Resolver != nil }
// CanReact returns true if the executor has a reactor and resolver configured.
func (e *Executor) CanReact() bool { return e.Reactor != nil && e.Resolver != nil }
// IsSameConversation reports whether platform+target matches the session's
// current conversation.
func IsSameConversation(session SessionContext, platform, target string) bool {
replyTarget := strings.TrimSpace(session.ReplyTarget)
if replyTarget == "" {
return false
}
if platform == "" {
platform = strings.TrimSpace(session.CurrentPlatform)
}
if target == "" {
target = replyTarget
}
return strings.EqualFold(platform, strings.TrimSpace(session.CurrentPlatform)) &&
target == replyTarget
}
func (*Executor) resolveBotID(args map[string]any, session SessionContext) (string, error) {
botID := firstStringArg(args, "bot_id")
if botID == "" {
botID = strings.TrimSpace(session.BotID)
}
if botID == "" {
return "", errors.New("bot_id is required")
}
if strings.TrimSpace(session.BotID) != "" && botID != strings.TrimSpace(session.BotID) {
return "", errors.New("bot_id mismatch")
}
return botID, nil
}
func (e *Executor) resolvePlatform(args map[string]any, session SessionContext) (channel.ChannelType, error) {
platform := firstStringArg(args, "platform")
if platform == "" {
platform = strings.TrimSpace(session.CurrentPlatform)
}
if platform == "" {
return "", errors.New("platform is required")
}
return e.Resolver.ParseChannelType(platform)
}
// ResolveAttachments converts raw attachment arguments into channel.Attachment values.
func (e *Executor) ResolveAttachments(ctx context.Context, botID string, items []any) []channel.Attachment {
var result []channel.Attachment
for _, item := range items {
switch v := item.(type) {
case string:
if att := e.resolveAttachmentRef(ctx, botID, strings.TrimSpace(v), "", ""); att != nil {
result = append(result, *att)
}
case map[string]any:
path := firstStringArg(v, "path")
urlVal := firstStringArg(v, "url")
attType := firstStringArg(v, "type")
name := firstStringArg(v, "name")
ref := path
if ref == "" {
ref = urlVal
}
if ref == "" {
continue
}
if att := e.resolveAttachmentRef(ctx, botID, ref, attType, name); att != nil {
result = append(result, *att)
}
}
}
return result
}
func (e *Executor) resolveAttachmentRef(ctx context.Context, botID, ref, attType, name string) *channel.Attachment {
ref = strings.TrimSpace(ref)
if ref == "" {
return nil
}
lower := strings.ToLower(ref)
if strings.HasPrefix(lower, "http://") || strings.HasPrefix(lower, "https://") {
t := channel.AttachmentType(attType)
if t == "" {
t = InferAttachmentTypeFromExt(ref)
}
return &channel.Attachment{Type: t, URL: ref, Name: name}
}
if strings.HasPrefix(lower, "data:") {
t := channel.AttachmentType(attType)
if t == "" {
t = channel.AttachmentImage
}
return &channel.Attachment{Type: t, Base64: ref, Name: name}
}
// Resolve relative paths against the container's data mount.
// LLMs often pass bare filenames like "IDENTITY.md" instead of "/data/IDENTITY.md".
if !strings.HasPrefix(ref, "/") {
ref = "/data/" + ref
}
if name == "" {
name = filepath.Base(ref)
}
mediaMarker := filepath.Join("/data", "media")
if !strings.HasSuffix(mediaMarker, "/") {
mediaMarker += "/"
}
if idx := strings.Index(ref, mediaMarker); idx >= 0 && e.AssetResolver != nil {
storageKey := ref[idx+len(mediaMarker):]
asset, err := e.AssetResolver.GetByStorageKey(ctx, botID, storageKey)
if err == nil {
return AssetMetaToAttachment(asset, botID, attType, name)
}
}
dataPrefix := "/data/"
if strings.HasPrefix(ref, dataPrefix) && e.AssetResolver != nil {
asset, err := e.AssetResolver.IngestContainerFile(ctx, botID, ref)
if err == nil {
return AssetMetaToAttachment(asset, botID, attType, name)
}
return nil
}
t := channel.AttachmentType(attType)
if t == "" {
t = InferAttachmentTypeFromExt(ref)
}
return &channel.Attachment{Type: t, URL: ref, Name: name}
}
// NormalizeAttachmentInputs normalizes raw attachment argument into a slice.
func NormalizeAttachmentInputs(raw any) []any {
switch v := raw.(type) {
case nil:
return nil
case []any:
if v == nil {
return []any{}
}
return v
case []string:
items := make([]any, 0, len(v))
for _, item := range v {
items = append(items, item)
}
return items
case string, map[string]any:
return []any{v}
default:
return nil
}
}
// ParseOutboundMessage parses a message from tool call arguments.
func ParseOutboundMessage(arguments map[string]any, fallbackText string) (channel.Message, error) {
var msg channel.Message
if raw, ok := arguments["message"]; ok && raw != nil {
switch value := raw.(type) {
case string:
msg.Text = strings.TrimSpace(value)
case map[string]any:
data, err := json.Marshal(value)
if err != nil {
return channel.Message{}, err
}
if err := json.Unmarshal(data, &msg); err != nil {
return channel.Message{}, err
}
default:
return channel.Message{}, errors.New("message must be object or string")
}
}
if msg.IsEmpty() && strings.TrimSpace(fallbackText) != "" {
msg.Text = strings.TrimSpace(fallbackText)
}
if msg.IsEmpty() {
return channel.Message{}, errors.New("message is required")
}
return msg, nil
}
// AssetMetaToAttachment converts an AssetMeta to a channel.Attachment.
func AssetMetaToAttachment(asset media.Asset, botID, attType, name string) *channel.Attachment {
t := channel.AttachmentType(attType)
if t == "" {
t = InferAttachmentTypeFromMime(asset.Mime)
}
return &channel.Attachment{
Type: t, ContentHash: asset.ContentHash, Mime: asset.Mime, Size: asset.SizeBytes, Name: name,
Metadata: map[string]any{"bot_id": botID, "storage_key": asset.StorageKey},
}
}
// InferAttachmentTypeFromMime infers attachment type from MIME type.
func InferAttachmentTypeFromMime(mime string) channel.AttachmentType {
mime = strings.ToLower(strings.TrimSpace(mime))
switch {
case strings.HasPrefix(mime, "image/"):
return channel.AttachmentImage
case strings.HasPrefix(mime, "audio/"):
return channel.AttachmentAudio
case strings.HasPrefix(mime, "video/"):
return channel.AttachmentVideo
default:
return channel.AttachmentFile
}
}
// InferAttachmentTypeFromExt infers attachment type from file extension.
func InferAttachmentTypeFromExt(path string) channel.AttachmentType {
ext := strings.ToLower(filepath.Ext(path))
switch ext {
case ".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg":
return channel.AttachmentImage
case ".mp3", ".wav", ".ogg", ".flac", ".aac":
return channel.AttachmentAudio
case ".mp4", ".webm", ".avi", ".mov":
return channel.AttachmentVideo
default:
return channel.AttachmentFile
}
}
func firstStringArg(args map[string]any, keys ...string) string {
for _, key := range keys {
if args == nil {
continue
}
raw, ok := args[key]
if !ok {
continue
}
if s, ok := raw.(string); ok {
s = strings.TrimSpace(s)
if s != "" {
return s
}
}
}
return ""
}
func boolArg(args map[string]any, key string) (bool, bool, error) {
if args == nil {
return false, false, nil
}
raw, ok := args[key]
if !ok || raw == nil {
return false, false, nil
}
value, ok := raw.(bool)
if !ok {
return false, true, errors.New(key + " must be a boolean")
}
return value, true, nil
}