diff --git a/cmd/agent/main.go b/cmd/agent/main.go
index f1ab0d26..f564ca26 100644
--- a/cmd/agent/main.go
+++ b/cmd/agent/main.go
@@ -799,6 +799,22 @@ func (a *mediaAssetResolverAdapter) GetByStorageKey(ctx context.Context, botID,
 	}, nil
 }
 
+func (a *mediaAssetResolverAdapter) IngestContainerFile(ctx context.Context, botID, containerPath string) (mcpmessage.AssetMeta, error) {
+	if a == nil || a.media == nil {
+		return mcpmessage.AssetMeta{}, fmt.Errorf("media service not configured")
+	}
+	asset, err := a.media.IngestContainerFile(ctx, botID, containerPath)
+	if err != nil {
+		return mcpmessage.AssetMeta{}, err
+	}
+	return mcpmessage.AssetMeta{
+		ContentHash: asset.ContentHash,
+		Mime:        asset.Mime,
+		SizeBytes:   asset.SizeBytes,
+		StorageKey:  asset.StorageKey,
+	}, nil
+}
+
 // gatewayAssetLoaderAdapter bridges media service to flow gateway asset loader.
 type gatewayAssetLoaderAdapter struct {
 	media *media.Service
diff --git a/internal/channel/inbound/channel.go b/internal/channel/inbound/channel.go
index a02eafac..433a7009 100644
--- a/internal/channel/inbound/channel.go
+++ b/internal/channel/inbound/channel.go
@@ -44,6 +44,8 @@ type mediaIngestor interface {
 	GetByStorageKey(ctx context.Context, botID, storageKey string) (media.Asset, error)
 	// AccessPath returns a consumer-accessible reference for a persisted asset.
 	AccessPath(asset media.Asset) string
+	// IngestContainerFile reads a file from /data/ and ingests it into media store.
+	IngestContainerFile(ctx context.Context, botID, containerPath string) (media.Asset, error)
 }
 
 // ChannelInboundProcessor routes channel inbound messages to the chat gateway.
@@ -1589,17 +1591,38 @@ func isHTTPURL(raw string) bool {
 
 // resolveContainerPathAsset attempts to match a container-internal file path
 // to an existing media asset by extracting the storage key from the path.
+// For non-/data/media/ paths, it ingests the file into the media store first.
 // Returns true if the asset was resolved and item was updated.
 func (p *ChannelInboundProcessor) resolveContainerPathAsset(ctx context.Context, botID, accessPath string, item *channel.Attachment) bool {
+	// Try /data/media/ lookup first.
 	storageKey := extractStorageKey(accessPath, botID)
-	if storageKey == "" {
-		return false
+	if storageKey != "" {
+		asset, err := p.mediaService.GetByStorageKey(ctx, botID, storageKey)
+		if err == nil {
+			applyAssetToAttachment(asset, botID, item)
+			return true
+		}
 	}
-	asset, err := p.mediaService.GetByStorageKey(ctx, botID, storageKey)
-	if err != nil {
-		return false
+
+	// For any /data/ path, ingest the file into media store.
+	if strings.HasPrefix(accessPath, "/data/") {
+		asset, err := p.mediaService.IngestContainerFile(ctx, botID, accessPath)
+		if err != nil {
+			if p.logger != nil {
+				p.logger.Warn("ingest container file for stream failed", slog.String("path", accessPath), slog.Any("error", err))
+			}
+			return false
+		}
+		applyAssetToAttachment(asset, botID, item)
+		return true
 	}
+
+	return false
+}
+
+func applyAssetToAttachment(asset media.Asset, botID string, item *channel.Attachment) {
 	item.ContentHash = asset.ContentHash
+	item.URL = ""
 	if item.Metadata == nil {
 		item.Metadata = make(map[string]any)
 	}
@@ -1611,7 +1634,6 @@ func (p *ChannelInboundProcessor) resolveContainerPathAsset(ctx context.Context,
 	if item.Size == 0 && asset.SizeBytes > 0 {
 		item.Size = asset.SizeBytes
 	}
-	return true
 }
 
 // extractStorageKey derives the media storage key from a container-internal
diff --git a/internal/channel/inbound/channel_test.go b/internal/channel/inbound/channel_test.go
index 3961b1f4..3e75386f 100644
--- a/internal/channel/inbound/channel_test.go
+++ b/internal/channel/inbound/channel_test.go
@@ -5,6 +5,7 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io"
 	"log/slog"
 	"net/http"
@@ -216,6 +217,10 @@ func (f *fakeMediaIngestor) GetByStorageKey(_ context.Context, _, _ string) (med
 	return f.storageKeyAsset, f.storageKeyErr
 }
 
+func (f *fakeMediaIngestor) IngestContainerFile(_ context.Context, _, _ string) (media.Asset, error) {
+	return media.Asset{}, fmt.Errorf("not implemented in test")
+}
+
 func (f *fakeMediaIngestor) AccessPath(asset media.Asset) string {
 	return "/data/media/" + asset.StorageKey
 }
diff --git a/internal/mcp/providers/message/provider.go b/internal/mcp/providers/message/provider.go
index 324ae391..85ffbef2 100644
--- a/internal/mcp/providers/message/provider.go
+++ b/internal/mcp/providers/message/provider.go
@@ -43,6 +43,9 @@ type AssetMeta struct {
 // AssetResolver looks up persisted media assets by storage key.
 type AssetResolver interface {
 	GetByStorageKey(ctx context.Context, botID, storageKey string) (AssetMeta, error)
+	// IngestContainerFile reads a file from the container's /data directory,
+	// ingests it into the media store, and returns the resulting asset metadata.
+	IngestContainerFile(ctx context.Context, botID, containerPath string) (AssetMeta, error)
 }
 
 // Executor exposes send and react as MCP tools.
@@ -389,29 +392,27 @@ func (p *Executor) resolveAttachmentRef(ctx context.Context, botID, ref, attType
 		storageKey := ref[idx+len(mediaMarker):]
 		asset, err := p.assetResolver.GetByStorageKey(ctx, botID, storageKey)
 		if err == nil {
-			t := channel.AttachmentType(attType)
-			if t == "" {
-				t = inferAttachmentTypeFromMime(asset.Mime)
-			}
-			att := channel.Attachment{
-				Type:        t,
-				ContentHash: asset.ContentHash,
-				Mime:        asset.Mime,
-				Size:        asset.SizeBytes,
-				Name:        name,
-				Metadata: map[string]any{
-					"bot_id":      botID,
-					"storage_key": asset.StorageKey,
-				},
-			}
-			return &att
+			return assetMetaToAttachment(asset, botID, attType, name)
 		}
 		if p.logger != nil {
 			p.logger.Warn("resolve media path failed", slog.String("path", ref), slog.Any("error", err))
 		}
 	}
 
-	// Unknown container path — pass through with the path as URL.
+	// Other container /data/ path — ingest into media store first.
+	const dataPrefix = "/data/"
+	if strings.HasPrefix(ref, dataPrefix) && p.assetResolver != nil {
+		asset, err := p.assetResolver.IngestContainerFile(ctx, botID, ref)
+		if err == nil {
+			return assetMetaToAttachment(asset, botID, attType, name)
+		}
+		if p.logger != nil {
+			p.logger.Warn("ingest container file failed", slog.String("path", ref), slog.Any("error", err))
+		}
+		return nil
+	}
+
+	// Unknown path — pass through as URL (may fail for non-HTTP paths).
 	t := channel.AttachmentType(attType)
 	if t == "" {
 		t = inferAttachmentTypeFromExt(ref)
@@ -423,6 +424,24 @@ func (p *Executor) resolveAttachmentRef(ctx context.Context, botID, ref, attType
 	}
 }
 
+func assetMetaToAttachment(asset AssetMeta, botID, attType, name string) *channel.Attachment {
+	t := channel.AttachmentType(attType)
+	if t == "" {
+		t = inferAttachmentTypeFromMime(asset.Mime)
+	}
+	return &channel.Attachment{
+		Type:        t,
+		ContentHash: asset.ContentHash,
+		Mime:        asset.Mime,
+		Size:        asset.SizeBytes,
+		Name:        name,
+		Metadata: map[string]any{
+			"bot_id":      botID,
+			"storage_key": asset.StorageKey,
+		},
+	}
+}
+
 func inferAttachmentTypeFromMime(mime string) channel.AttachmentType {
 	mime = strings.ToLower(strings.TrimSpace(mime))
 	switch {
diff --git a/internal/media/service.go b/internal/media/service.go
index d61b61cc..897b547f 100644
--- a/internal/media/service.go
+++ b/internal/media/service.go
@@ -142,6 +142,25 @@ func (s *Service) AccessPath(asset Asset) string {
 	return s.provider.AccessPath(routingKey)
 }
 
+// IngestContainerFile reads an arbitrary file from a bot's /data/ directory
+// and ingests it into the media store. The provider must implement ContainerFileOpener.
+func (s *Service) IngestContainerFile(ctx context.Context, botID, containerPath string) (Asset, error) {
+	if s.provider == nil {
+		return Asset{}, ErrProviderUnavailable
+	}
+	opener, ok := s.provider.(storage.ContainerFileOpener)
+	if !ok {
+		return Asset{}, fmt.Errorf("provider does not support container file reading")
+	}
+	f, err := opener.OpenContainerFile(botID, containerPath)
+	if err != nil {
+		return Asset{}, fmt.Errorf("open container file: %w", err)
+	}
+	defer f.Close()
+	mime := mimeFromExtension(path.Ext(containerPath))
+	return s.Ingest(ctx, IngestInput{BotID: botID, Mime: mime, Reader: f})
+}
+
 // resolveByContentHash scans hash-prefix directory by extension to find the file.
 func (s *Service) resolveByContentHash(ctx context.Context, botID, contentHash string) (Asset, error) {
 	if strings.TrimSpace(contentHash) == "" || len(contentHash) < 2 {
diff --git a/internal/storage/providers/containerfs/provider.go b/internal/storage/providers/containerfs/provider.go
index 5042eada..2a34ae1a 100644
--- a/internal/storage/providers/containerfs/provider.go
+++ b/internal/storage/providers/containerfs/provider.go
@@ -103,6 +103,53 @@ func (p *Provider) hostPath(key string) (string, error) {
 	return joined, nil
 }
 
+// OpenContainerFile opens a regular file from a bot's /data/ directory on the host.
+// containerPath must start with "/data/". The resolved file has to stay inside the
+// bot's own directory (dataRoot/bots/botID) even after symlinks are followed.
+func (p *Provider) OpenContainerFile(botID, containerPath string) (io.ReadCloser, error) {
+	const dataPrefix = "/data/"
+	if !strings.HasPrefix(containerPath, dataPrefix) {
+		return nil, fmt.Errorf("path must start with /data/")
+	}
+	// botID becomes a single path element; separators or dot segments in it
+	// would address a directory other than this bot's.
+	if botID == "" || botID == "." || botID == ".." || strings.ContainsAny(botID, `/\`) {
+		return nil, fmt.Errorf("invalid bot id")
+	}
+	// Clean before checking so ".." is judged per path element ("a..b" is a
+	// legal file name) and an empty or "." remainder is rejected.
+	subPath := filepath.Clean(strings.TrimLeft(containerPath[len(dataPrefix):], "/"))
+	if subPath == "." || !filepath.IsLocal(subPath) {
+		return nil, fmt.Errorf("invalid container path")
+	}
+	botRoot := filepath.Join(p.dataRoot, "bots", botID)
+
+	// Content under this directory is presumably writable from inside the
+	// container, so a symlink there could point anywhere on the host. Compare
+	// resolved locations; this narrows, but cannot fully close, the window
+	// between the check and the open.
+	realRoot, err := filepath.EvalSymlinks(botRoot)
+	if err != nil {
+		return nil, fmt.Errorf("resolve bot data directory: %w", err)
+	}
+	realPath, err := filepath.EvalSymlinks(filepath.Join(botRoot, subPath))
+	if err != nil {
+		return nil, fmt.Errorf("resolve container path: %w", err)
+	}
+	rel, err := filepath.Rel(realRoot, realPath)
+	if err != nil || !filepath.IsLocal(rel) {
+		return nil, fmt.Errorf("path escapes data root")
+	}
+	// Directories, FIFOs and devices cannot be ingested as media.
+	info, err := os.Stat(realPath)
+	if err != nil {
+		return nil, err
+	}
+	if !info.Mode().IsRegular() {
+		return nil, fmt.Errorf("container path is not a regular file")
+	}
+	return os.Open(realPath)
+}
+
 // splitRoutingKey splits a routing key "/" into its parts.
 func splitRoutingKey(key string) (botID, storageKey string) {
 	idx := strings.IndexByte(key, filepath.Separator)
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index 28c1b053..cdd7199c 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -18,3 +18,9 @@ type Provider interface {
 	// The format depends on the backend (e.g. container path, signed URL).
 	AccessPath(key string) string
 }
+
+// ContainerFileOpener is an optional interface that providers can implement
+// to open arbitrary files from a bot's container data directory.
+type ContainerFileOpener interface {
+	OpenContainerFile(botID, containerPath string) (io.ReadCloser, error)
+}