fix: sending a file located outside /data/media failed

This commit is contained in:
Acbox
2026-02-21 00:04:38 +08:00
parent cb5d2c5fab
commit 7b12fb0b0c
7 changed files with 128 additions and 23 deletions
+16
View File
@@ -799,6 +799,22 @@ func (a *mediaAssetResolverAdapter) GetByStorageKey(ctx context.Context, botID,
}, nil
}
// IngestContainerFile forwards the request to the underlying media service and
// converts the resulting asset into the mcpmessage.AssetMeta shape.
//
// It returns an error when the adapter or its media service is nil, and passes
// through any error reported by the media service unchanged.
func (a *mediaAssetResolverAdapter) IngestContainerFile(ctx context.Context, botID, containerPath string) (mcpmessage.AssetMeta, error) {
	var meta mcpmessage.AssetMeta
	if a == nil || a.media == nil {
		return meta, fmt.Errorf("media service not configured")
	}

	ingested, err := a.media.IngestContainerFile(ctx, botID, containerPath)
	if err != nil {
		return meta, err
	}

	// Copy only the fields the MCP message layer exposes.
	meta.ContentHash = ingested.ContentHash
	meta.Mime = ingested.Mime
	meta.SizeBytes = ingested.SizeBytes
	meta.StorageKey = ingested.StorageKey
	return meta, nil
}
// gatewayAssetLoaderAdapter bridges media service to flow gateway asset loader.
type gatewayAssetLoaderAdapter struct {
media *media.Service
+28 -6
View File
@@ -44,6 +44,8 @@ type mediaIngestor interface {
GetByStorageKey(ctx context.Context, botID, storageKey string) (media.Asset, error)
// AccessPath returns a consumer-accessible reference for a persisted asset.
AccessPath(asset media.Asset) string
// IngestContainerFile reads a file from the container's /data/ directory and ingests it into the media store.
IngestContainerFile(ctx context.Context, botID, containerPath string) (media.Asset, error)
}
// ChannelInboundProcessor routes channel inbound messages to the chat gateway.
@@ -1589,17 +1591,38 @@ func isHTTPURL(raw string) bool {
// resolveContainerPathAsset attempts to match a container-internal file path
// to an existing media asset by extracting the storage key from the path.
// For non-/data/media/ paths, it ingests the file into the media store first.
// Returns true if the asset was resolved and item was updated.
func (p *ChannelInboundProcessor) resolveContainerPathAsset(ctx context.Context, botID, accessPath string, item *channel.Attachment) bool {
// Try /data/media/ lookup first.
storageKey := extractStorageKey(accessPath, botID)
if storageKey == "" {
return false
if storageKey != "" {
asset, err := p.mediaService.GetByStorageKey(ctx, botID, storageKey)
if err == nil {
applyAssetToAttachment(asset, botID, item)
return true
}
}
asset, err := p.mediaService.GetByStorageKey(ctx, botID, storageKey)
if err != nil {
return false
// For any /data/ path, ingest the file into media store.
if strings.HasPrefix(accessPath, "/data/") {
asset, err := p.mediaService.IngestContainerFile(ctx, botID, accessPath)
if err != nil {
if p.logger != nil {
p.logger.Warn("ingest container file for stream failed", slog.String("path", accessPath), slog.Any("error", err))
}
return false
}
applyAssetToAttachment(asset, botID, item)
return true
}
return false
}
func applyAssetToAttachment(asset media.Asset, botID string, item *channel.Attachment) {
item.ContentHash = asset.ContentHash
item.URL = ""
if item.Metadata == nil {
item.Metadata = make(map[string]any)
}
@@ -1611,7 +1634,6 @@ func (p *ChannelInboundProcessor) resolveContainerPathAsset(ctx context.Context,
if item.Size == 0 && asset.SizeBytes > 0 {
item.Size = asset.SizeBytes
}
return true
}
// extractStorageKey derives the media storage key from a container-internal
+5
View File
@@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
@@ -216,6 +217,10 @@ func (f *fakeMediaIngestor) GetByStorageKey(_ context.Context, _, _ string) (med
return f.storageKeyAsset, f.storageKeyErr
}
// IngestContainerFile is a stub that satisfies the mediaIngestor interface; it
// always fails because these tests do not exercise container-file ingestion.
func (f *fakeMediaIngestor) IngestContainerFile(_ context.Context, _, _ string) (media.Asset, error) {
	var none media.Asset
	return none, fmt.Errorf("not implemented in test")
}
// AccessPath returns the asset's storage key prefixed with "/data/media/".
func (f *fakeMediaIngestor) AccessPath(asset media.Asset) string {
	const mediaRoot = "/data/media/"
	return mediaRoot + asset.StorageKey
}
+36 -17
View File
@@ -43,6 +43,9 @@ type AssetMeta struct {
// AssetResolver looks up persisted media assets by storage key.
type AssetResolver interface {
GetByStorageKey(ctx context.Context, botID, storageKey string) (AssetMeta, error)
// IngestContainerFile reads a file from the container's /data directory,
// ingests it into the media store, and returns the resulting asset metadata.
IngestContainerFile(ctx context.Context, botID, containerPath string) (AssetMeta, error)
}
// Executor exposes send and react as MCP tools.
@@ -389,29 +392,27 @@ func (p *Executor) resolveAttachmentRef(ctx context.Context, botID, ref, attType
storageKey := ref[idx+len(mediaMarker):]
asset, err := p.assetResolver.GetByStorageKey(ctx, botID, storageKey)
if err == nil {
t := channel.AttachmentType(attType)
if t == "" {
t = inferAttachmentTypeFromMime(asset.Mime)
}
att := channel.Attachment{
Type: t,
ContentHash: asset.ContentHash,
Mime: asset.Mime,
Size: asset.SizeBytes,
Name: name,
Metadata: map[string]any{
"bot_id": botID,
"storage_key": asset.StorageKey,
},
}
return &att
return assetMetaToAttachment(asset, botID, attType, name)
}
if p.logger != nil {
p.logger.Warn("resolve media path failed", slog.String("path", ref), slog.Any("error", err))
}
}
// Unknown container path — pass through with the path as URL.
// Other container /data/ path — ingest into media store first.
const dataPrefix = "/data/"
if strings.HasPrefix(ref, dataPrefix) && p.assetResolver != nil {
asset, err := p.assetResolver.IngestContainerFile(ctx, botID, ref)
if err == nil {
return assetMetaToAttachment(asset, botID, attType, name)
}
if p.logger != nil {
p.logger.Warn("ingest container file failed", slog.String("path", ref), slog.Any("error", err))
}
return nil
}
// Unknown path — pass through as URL (may fail for non-HTTP paths).
t := channel.AttachmentType(attType)
if t == "" {
t = inferAttachmentTypeFromExt(ref)
@@ -423,6 +424,24 @@ func (p *Executor) resolveAttachmentRef(ctx context.Context, botID, ref, attType
}
}
// assetMetaToAttachment builds a channel attachment that refers to a persisted
// media asset by content hash.
//
// attType is used as the attachment type when non-empty; otherwise the type is
// inferred from the asset's MIME type. The bot ID and the asset's storage key
// are recorded in the attachment metadata under "bot_id" and "storage_key".
func assetMetaToAttachment(asset AssetMeta, botID, attType, name string) *channel.Attachment {
	kind := channel.AttachmentType(attType)
	if kind == "" {
		kind = inferAttachmentTypeFromMime(asset.Mime)
	}

	meta := map[string]any{
		"bot_id":      botID,
		"storage_key": asset.StorageKey,
	}

	att := &channel.Attachment{
		Type:        kind,
		ContentHash: asset.ContentHash,
		Mime:        asset.Mime,
		Size:        asset.SizeBytes,
		Name:        name,
		Metadata:    meta,
	}
	return att
}
func inferAttachmentTypeFromMime(mime string) channel.AttachmentType {
mime = strings.ToLower(strings.TrimSpace(mime))
switch {
+19
View File
@@ -142,6 +142,25 @@ func (s *Service) AccessPath(asset Asset) string {
return s.provider.AccessPath(routingKey)
}
// IngestContainerFile opens the file at containerPath for the given bot via the
// storage provider and feeds it through Ingest.
//
// The provider must implement storage.ContainerFileOpener; otherwise an error
// is returned. ErrProviderUnavailable is returned when no provider is set.
// The MIME type passed to Ingest is derived from containerPath's extension.
func (s *Service) IngestContainerFile(ctx context.Context, botID, containerPath string) (Asset, error) {
	if s.provider == nil {
		return Asset{}, ErrProviderUnavailable
	}

	opener, supported := s.provider.(storage.ContainerFileOpener)
	if !supported {
		return Asset{}, fmt.Errorf("provider does not support container file reading")
	}

	src, openErr := opener.OpenContainerFile(botID, containerPath)
	if openErr != nil {
		return Asset{}, fmt.Errorf("open container file: %w", openErr)
	}
	// The reader is only needed for the duration of the ingest call.
	defer src.Close()

	input := IngestInput{
		BotID:  botID,
		Mime:   mimeFromExtension(path.Ext(containerPath)),
		Reader: src,
	}
	return s.Ingest(ctx, input)
}
// resolveByContentHash scans hash-prefix directory by extension to find the file.
func (s *Service) resolveByContentHash(ctx context.Context, botID, contentHash string) (Asset, error) {
if strings.TrimSpace(contentHash) == "" || len(contentHash) < 2 {
@@ -103,6 +103,24 @@ func (p *Provider) hostPath(key string) (string, error) {
return joined, nil
}
// OpenContainerFile opens a file from a bot's /data/ directory on the host.
//
// containerPath is the path as seen from inside the container and must start
// with "/data/"; the remainder is resolved beneath <dataRoot>/bots/<botID>.
// The caller owns the returned reader and must close it.
//
// An error is returned when:
//   - containerPath does not start with "/data/" or names nothing below it;
//   - botID is empty, ".", "..", or contains a path separator;
//   - containerPath contains a ".." path segment;
//   - the resolved host path is not strictly inside the bot's own directory;
//   - the target does not exist or is not a regular file.
//
// NOTE(review): symlinks are followed by os.Stat and os.Open. If container
// content is untrusted, a symlink under /data can still point outside the
// bot directory; resolving with filepath.EvalSymlinks (or os.Root on newer Go
// versions) would be needed to close that gap.
func (p *Provider) OpenContainerFile(botID, containerPath string) (io.ReadCloser, error) {
	const dataPrefix = "/data/"
	if !strings.HasPrefix(containerPath, dataPrefix) {
		return nil, fmt.Errorf("path must start with /data/")
	}

	// botID becomes a single directory name; anything that could be
	// interpreted as a path would let one bot address another bot's files.
	if botID == "" || botID == "." || botID == ".." || strings.ContainsAny(botID, `/\`) {
		return nil, fmt.Errorf("invalid bot id")
	}

	subPath := containerPath[len(dataPrefix):]
	if subPath == "" {
		return nil, fmt.Errorf("invalid container path")
	}
	// Reject parent-directory segments, but allow ordinary file names that
	// merely contain consecutive dots (e.g. "report..final.pdf").
	for _, seg := range strings.Split(subPath, "/") {
		if seg == ".." {
			return nil, fmt.Errorf("invalid container path")
		}
	}

	// filepath.Join cleans its result, so comparing against the joined bot
	// root is reliable even when dataRoot carries a trailing separator.
	botRoot := filepath.Join(p.dataRoot, "bots", botID)
	hostPath := filepath.Join(botRoot, subPath)
	if !strings.HasPrefix(hostPath, botRoot+string(filepath.Separator)) {
		return nil, fmt.Errorf("path escapes data root")
	}

	// Check the file type before opening: opening a FIFO would block, and a
	// directory or device cannot be ingested as file content.
	info, err := os.Stat(hostPath)
	if err != nil {
		return nil, err
	}
	if !info.Mode().IsRegular() {
		return nil, fmt.Errorf("container path is not a regular file")
	}
	return os.Open(hostPath)
}
// splitRoutingKey splits a routing key "<bot_id>/<storage_key>" into its parts.
func splitRoutingKey(key string) (botID, storageKey string) {
idx := strings.IndexByte(key, filepath.Separator)
+6
View File
@@ -18,3 +18,9 @@ type Provider interface {
// The format depends on the backend (e.g. container path, signed URL).
AccessPath(key string) string
}
// ContainerFileOpener is an optional interface that providers can implement
// to open arbitrary files from a bot's container data directory.
// Consumers detect support with a type assertion on the Provider value.
type ContainerFileOpener interface {
// OpenContainerFile returns a reader for the file identified by
// containerPath within the data directory of the bot identified by botID.
// The caller is responsible for closing the returned reader.
OpenContainerFile(botID, containerPath string) (io.ReadCloser, error)
}