Files
Memoh/internal/media/service.go
T
BBQ bc374fe8cd refactor: content-addressed assets, cross-channel multimodal, infra simplification (#63)
* refactor(attachment): multimodal attachment refactor with snapshot schema and storage layer

- Add snapshot schema migration (0008) and update init/versions/snapshots
- Add internal/attachment and internal/channel normalize for unified attachment handling
- Move containerfs provider from internal/media to internal/storage
- Update agent types, channel adapters (Telegram/Feishu), inbound and handlers
- Add containerd snapshot lineage and local_channel tests
- Regenerate sqlc, swagger and SDK

* refactor(media): content-addressed asset system with unified naming

- Replace asset_id foreign key with content_hash as sole identifier
  for bot_history_message_assets (pure soft-link model)
- Remove mime, size_bytes, storage_key from DB; derive at read time
  via media.Resolve from actual storage
- Merge migrations 0008/0009 into single 0008; keep 0001 as canonical schema
- Add Docker initdb script for deterministic migration execution order
- Fix cross-channel real-time image display (Telegram → WebUI SSE)
- Fix message disappearing on refresh (null assets fallback)
- Fix file icon instead of image preview (mime derivation from storage)
- Unify AssetID → ContentHash naming across Go, Agent, and Frontend
- Change storage key prefix from 4-char to 2-char for directory sharding
- Add server-entrypoint.sh for Docker deployment migration handling

* refactor(infra): embedded migrations, Docker simplification, and config consolidation

- Embed SQL migrations into Go binary, removing shell-based migration scripts
- Consolidate config files into conf/ directory (app.example.toml, app.docker.toml, app.dev.toml)
- Simplify Docker setup: remove initdb.d scripts, streamline nginx config and entrypoint
- Remove legacy CLI, feishu-echo commands, and obsolete incremental migration files
- Update install script and docs to require sudo for one-click install
- Add mise tasks for dev environment orchestration

* chore: recover migrations

---------

Co-authored-by: Acbox <acbox0328@gmail.com>
2026-02-19 00:20:27 +08:00

277 lines
7.2 KiB
Go

package media
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log/slog"
"os"
"path"
"strings"
"github.com/memohai/memoh/internal/storage"
)
// Service provides content-addressed media asset persistence.
// All metadata is derived from the filesystem — no database, no sidecar files.
//
// The provider may be nil; the methods in this file check for that before
// touching storage.
type Service struct {
// provider is the storage backend every read and write goes through.
provider storage.Provider
// logger is the structured logger; NewService tags it with service=media.
logger *slog.Logger
}
// NewService builds a media Service on top of the given storage provider.
//
// When log is nil, slog.Default() is used instead. The logger kept on the
// Service carries the attribute service=media.
func NewService(log *slog.Logger, provider storage.Provider) *Service {
	base := log
	if base == nil {
		base = slog.Default()
	}
	svc := &Service{provider: provider}
	svc.logger = base.With(slog.String("service", "media"))
	return svc
}
// Ingest persists a new media asset for a bot and returns the derived Asset.
//
// The payload from input.Reader is spooled to a temporary file while its
// SHA-256 is computed; the temporary file is always removed before Ingest
// returns. The asset is stored under "<hash[:2]>/<hash><ext>" inside the
// bot's namespace, where <ext> is chosen from the MIME type. If a file
// already exists at that key the write is skipped (content dedup).
//
// Defaults: a non-positive input.MaxBytes falls back to MaxAssetBytes, and a
// blank input.Mime falls back to "application/octet-stream".
//
// Errors: ErrProviderUnavailable when the service has no provider; a plain
// error when BotID is blank or Reader is nil; spooling and storage failures
// are wrapped with "read input" / "open temp file" / "store media".
func (s *Service) Ingest(ctx context.Context, input IngestInput) (Asset, error) {
	if s.provider == nil {
		return Asset{}, ErrProviderUnavailable
	}
	if strings.TrimSpace(input.BotID) == "" {
		return Asset{}, fmt.Errorf("bot id is required")
	}
	if input.Reader == nil {
		return Asset{}, fmt.Errorf("reader is required")
	}
	maxBytes := input.MaxBytes
	if maxBytes <= 0 {
		maxBytes = MaxAssetBytes
	}

	contentHash, sizeBytes, tempPath, err := spoolAndHashWithLimit(input.Reader, maxBytes)
	if err != nil {
		return Asset{}, fmt.Errorf("read input: %w", err)
	}
	defer func() {
		// Best-effort cleanup of the spool file; nothing useful can be done
		// if removal fails.
		_ = os.Remove(tempPath)
	}()

	mime := coalesce(input.Mime, "application/octet-stream")
	ext := extensionFromMime(mime)
	storageKey := path.Join(contentHash[:2], contentHash+ext)
	routingKey := path.Join(input.BotID, storageKey)
	asset := Asset{
		ContentHash: contentHash,
		BotID:       input.BotID,
		Mime:        mime,
		SizeBytes:   sizeBytes,
		StorageKey:  storageKey,
	}

	// Filesystem dedup: if the file already exists, skip the write.
	if existing, openErr := s.provider.Open(ctx, routingKey); openErr == nil {
		// Only existence matters here. Close the probe handle immediately so
		// deduplicated ingests do not leak it.
		_ = existing.Close()
		return asset, nil
	}

	tempFile, err := os.Open(tempPath)
	if err != nil {
		return Asset{}, fmt.Errorf("open temp file: %w", err)
	}
	defer func() {
		_ = tempFile.Close()
	}()
	if err := s.provider.Put(ctx, routingKey, tempFile); err != nil {
		return Asset{}, fmt.Errorf("store media: %w", err)
	}
	return asset, nil
}
// Resolve looks up the asset stored for botID under contentHash and returns
// its derived metadata without handing a reader back to the caller. Used to
// fill mime/storage_key when the DB has none.
//
// It returns ErrProviderUnavailable when the service has no provider, and
// otherwise whatever resolveByContentHash reports.
func (s *Service) Resolve(ctx context.Context, botID, contentHash string) (Asset, error) {
	if s.provider != nil {
		return s.resolveByContentHash(ctx, botID, contentHash)
	}
	return Asset{}, ErrProviderUnavailable
}
// Open locates the asset identified by contentHash inside botID's namespace
// and returns a reader over its bytes together with the derived Asset.
//
// The file is found by probing the known extensions under the hash prefix,
// and the MIME type is derived from the extension that matched. The caller
// owns the returned reader and must close it.
//
// It returns ErrProviderUnavailable when the service has no provider, the
// lookup error when the asset cannot be resolved, and an "open storage"
// wrapped error when the final open fails.
func (s *Service) Open(ctx context.Context, botID, contentHash string) (io.ReadCloser, Asset, error) {
	if s.provider == nil {
		return nil, Asset{}, ErrProviderUnavailable
	}

	found, resolveErr := s.resolveByContentHash(ctx, botID, contentHash)
	if resolveErr != nil {
		return nil, Asset{}, resolveErr
	}

	stream, openErr := s.provider.Open(ctx, path.Join(botID, found.StorageKey))
	if openErr != nil {
		return nil, Asset{}, fmt.Errorf("open storage: %w", openErr)
	}
	return stream, found, nil
}
// GetByStorageKey returns the Asset derived from an already-known storage
// key, after confirming that the key can be opened in botID's namespace.
//
// It returns ErrProviderUnavailable when the service has no provider and
// ErrAssetNotFound when the provider fails to open the key for any reason.
func (s *Service) GetByStorageKey(ctx context.Context, botID, storageKey string) (Asset, error) {
	if s.provider == nil {
		return Asset{}, ErrProviderUnavailable
	}

	handle, err := s.provider.Open(ctx, path.Join(botID, storageKey))
	if err != nil {
		return Asset{}, ErrAssetNotFound
	}
	// The open is only an existence probe; release the handle before deriving.
	_ = handle.Close()

	return deriveAssetFromKey(botID, storageKey), nil
}
// AccessPath asks the storage provider for a consumer-accessible reference to
// a persisted asset, addressed as "<BotID>/<StorageKey>". It returns the
// empty string when the service has no provider.
func (s *Service) AccessPath(asset Asset) string {
	if provider := s.provider; provider != nil {
		return provider.AccessPath(path.Join(asset.BotID, asset.StorageKey))
	}
	return ""
}
// resolveByContentHash finds the stored file for contentHash by probing
// "<hash[:2]>/<hash><ext>" for each entry of knownExtensions, in order, and
// returns the Asset derived from the first key that opens successfully.
//
// A blank hash, a hash shorter than two characters, or no matching file all
// yield ErrAssetNotFound. Open errors on individual candidates are treated as
// "not this extension" and are not reported.
func (s *Service) resolveByContentHash(ctx context.Context, botID, contentHash string) (Asset, error) {
	if len(contentHash) < 2 || strings.TrimSpace(contentHash) == "" {
		return Asset{}, ErrAssetNotFound
	}

	shardDir := contentHash[:2]
	for i := range knownExtensions {
		candidate := path.Join(shardDir, contentHash+knownExtensions[i])
		probe, err := s.provider.Open(ctx, path.Join(botID, candidate))
		if err == nil {
			// Existence confirmed; the handle itself is not needed.
			_ = probe.Close()
			return deriveAssetFromKey(botID, candidate), nil
		}
	}
	return Asset{}, ErrAssetNotFound
}
// deriveAssetFromKey reconstructs an Asset purely from its storage key, which
// has the form "<2-char hash prefix>/<hash><ext>". The content hash is the
// file name without its extension and the MIME type is looked up from the
// extension. SizeBytes is not populated.
func deriveAssetFromKey(botID, storageKey string) Asset {
	fileName := path.Base(storageKey)
	suffix := path.Ext(fileName)

	return Asset{
		ContentHash: strings.TrimSuffix(fileName, suffix),
		BotID:       botID,
		Mime:        mimeFromExtension(suffix),
		StorageKey:  storageKey,
	}
}
// knownExtensions lists the file extensions that resolveByContentHash probes,
// in this order, when locating an asset by content hash. The entries match
// the set of extensions extensionFromMime can return, with ".bin" as the
// catch-all.
var knownExtensions = []string{".jpg", ".png", ".gif", ".webp", ".mp3", ".wav", ".ogg", ".mp4", ".webm", ".pdf", ".bin"}
// mimeFromExtension maps a file extension (including the leading dot,
// compared case-insensitively) to its MIME type. Extensions that are not
// recognized map to "application/octet-stream".
func mimeFromExtension(ext string) string {
	mimeType := "application/octet-stream"

	switch strings.ToLower(ext) {
	// Images.
	case ".jpeg", ".jpg":
		mimeType = "image/jpeg"
	case ".png":
		mimeType = "image/png"
	case ".gif":
		mimeType = "image/gif"
	case ".webp":
		mimeType = "image/webp"
	// Audio.
	case ".mp3":
		mimeType = "audio/mpeg"
	case ".wav":
		mimeType = "audio/wav"
	case ".ogg":
		mimeType = "audio/ogg"
	// Video.
	case ".mp4":
		mimeType = "video/mp4"
	case ".webm":
		mimeType = "video/webm"
	// Documents.
	case ".pdf":
		mimeType = "application/pdf"
	}

	return mimeType
}
// extensionFromMime maps a MIME type to the file extension (with leading dot)
// used when storing an asset. Matching is case-insensitive and ignores
// surrounding whitespace and any media-type parameters, so values such as
// "audio/ogg; codecs=opus" resolve the same way as "audio/ogg". Unrecognized
// or empty types map to ".bin".
func extensionFromMime(mime string) string {
	// Keep only the bare media type: everything from the first ';' onward is
	// parameters (charset, codecs, ...) that must not affect the extension.
	mediaType, _, _ := strings.Cut(mime, ";")

	switch strings.ToLower(strings.TrimSpace(mediaType)) {
	case "image/png":
		return ".png"
	case "image/jpeg", "image/jpg":
		return ".jpg"
	case "image/gif":
		return ".gif"
	case "image/webp":
		return ".webp"
	case "audio/mpeg", "audio/mp3":
		return ".mp3"
	case "audio/wav":
		return ".wav"
	case "audio/ogg":
		return ".ogg"
	case "video/mp4":
		return ".mp4"
	case "video/webm":
		return ".webm"
	case "application/pdf":
		return ".pdf"
	default:
		return ".bin"
	}
}
// coalesce returns the first argument that is not blank (empty or made up
// only of whitespace). The chosen value is returned as given, without
// trimming. If every argument is blank, or none are supplied, it returns "".
func coalesce(values ...string) string {
	for _, candidate := range values {
		if strings.TrimSpace(candidate) == "" {
			continue
		}
		return candidate
	}
	return ""
}
// spoolAndHashWithLimit copies reader into a fresh temporary file while
// computing the SHA-256 of the bytes, enforcing an upper bound of maxBytes.
//
// On success it returns the hex-encoded hash, the number of bytes written,
// and the path of the temporary file; the caller owns that file and is
// responsible for removing it. On any failure the temporary file is removed
// and the returned path is empty.
//
// Errors: reader is nil; maxBytes is not positive; the temp file cannot be
// created, written, or closed; the payload exceeds maxBytes (wraps
// ErrAssetTooLarge); or the payload is empty.
func spoolAndHashWithLimit(reader io.Reader, maxBytes int64) (string, int64, string, error) {
	if reader == nil {
		return "", 0, "", fmt.Errorf("reader is required")
	}
	if maxBytes <= 0 {
		return "", 0, "", fmt.Errorf("max bytes must be greater than 0")
	}
	tempFile, err := os.CreateTemp("", "memoh-media-*")
	if err != nil {
		return "", 0, "", fmt.Errorf("create temp file: %w", err)
	}
	tempPath := tempFile.Name()
	keepFile := false
	defer func() {
		// Covers every early return. On the success path the file has already
		// been closed explicitly, so this second Close only reports
		// "already closed", which is safe to ignore.
		_ = tempFile.Close()
		if !keepFile {
			_ = os.Remove(tempPath)
		}
	}()

	// Read one byte beyond the limit so an oversized payload can be told
	// apart from one that is exactly maxBytes long. If maxBytes+1 wraps
	// around, fall back to maxBytes: such a limit cannot be exceeded anyway.
	readLimit := maxBytes + 1
	if readLimit < maxBytes {
		readLimit = maxBytes
	}

	hasher := sha256.New()
	limited := &io.LimitedReader{R: reader, N: readLimit}
	written, err := io.Copy(io.MultiWriter(tempFile, hasher), limited)
	if err != nil {
		return "", 0, "", fmt.Errorf("copy to temp file: %w", err)
	}
	if written > maxBytes {
		return "", 0, "", fmt.Errorf("%w: max %d bytes", ErrAssetTooLarge, maxBytes)
	}
	if written == 0 {
		return "", 0, "", fmt.Errorf("asset payload is empty")
	}

	// A failed Close can mean the data never reached disk, so surface it
	// instead of handing back a path to a possibly incomplete file.
	if err := tempFile.Close(); err != nil {
		return "", 0, "", fmt.Errorf("close temp file: %w", err)
	}

	keepFile = true
	return hex.EncodeToString(hasher.Sum(nil)), written, tempPath, nil
}