refactor: content-addressed assets, cross-channel multimodal, infra simplification (#63)

* refactor(attachment): multimodal attachment refactor with snapshot schema and storage layer

- Add snapshot schema migration (0008) and update init/versions/snapshots
- Add internal/attachment and internal/channel normalize for unified attachment handling
- Move containerfs provider from internal/media to internal/storage
- Update agent types, channel adapters (Telegram/Feishu), inbound and handlers
- Add containerd snapshot lineage and local_channel tests
- Regenerate sqlc, swagger and SDK

* refactor(media): content-addressed asset system with unified naming

- Replace asset_id foreign key with content_hash as sole identifier
  for bot_history_message_assets (pure soft-link model)
- Remove mime, size_bytes, storage_key from DB; derive at read time
  via media.Resolve from actual storage
- Merge migrations 0008/0009 into single 0008; keep 0001 as canonical schema
- Add Docker initdb script for deterministic migration execution order
- Fix cross-channel real-time image display (Telegram → WebUI SSE)
- Fix message disappearing on refresh (null assets fallback)
- Fix file icon instead of image preview (mime derivation from storage)
- Unify AssetID → ContentHash naming across Go, Agent, and Frontend
- Change storage key prefix from 4-char to 2-char for directory sharding
- Add server-entrypoint.sh for Docker deployment migration handling

* refactor(infra): embedded migrations, Docker simplification, and config consolidation

- Embed SQL migrations into Go binary, removing shell-based migration scripts
- Consolidate config files into conf/ directory (app.example.toml, app.docker.toml, app.dev.toml)
- Simplify Docker setup: remove initdb.d scripts, streamline nginx config and entrypoint
- Remove legacy CLI, feishu-echo commands, and obsolete incremental migration files
- Update install script and docs to require sudo for one-click install
- Add mise tasks for dev environment orchestration

* chore: recover migrations

---------

Co-authored-by: Acbox <acbox0328@gmail.com>
This commit is contained in:
BBQ
2026-02-19 00:20:27 +08:00
committed by GitHub
parent 740f620fe4
commit bc374fe8cd
104 changed files with 6133 additions and 2987 deletions
+88 -10
View File
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"log/slog"
"net/http"
"os"
@@ -17,6 +19,7 @@ import (
"go.uber.org/fx/fxevent"
"golang.org/x/crypto/bcrypt"
dbembed "github.com/memohai/memoh/db"
"github.com/memohai/memoh/internal/accounts"
"github.com/memohai/memoh/internal/bind"
"github.com/memohai/memoh/internal/boot"
@@ -49,7 +52,6 @@ import (
mcpweb "github.com/memohai/memoh/internal/mcp/providers/web"
mcpfederation "github.com/memohai/memoh/internal/mcp/sources/federation"
"github.com/memohai/memoh/internal/media"
"github.com/memohai/memoh/internal/media/providers/containerfs"
"github.com/memohai/memoh/internal/memory"
"github.com/memohai/memoh/internal/message"
"github.com/memohai/memoh/internal/message/event"
@@ -61,11 +63,66 @@ import (
"github.com/memohai/memoh/internal/searchproviders"
"github.com/memohai/memoh/internal/server"
"github.com/memohai/memoh/internal/settings"
"github.com/memohai/memoh/internal/storage/providers/containerfs"
"github.com/memohai/memoh/internal/subagent"
"github.com/memohai/memoh/internal/version"
)
// migrationsFS returns the embedded SQL migrations rooted at the
// "migrations" subdirectory of the embed filesystem. A failure here means
// the binary was built without the expected embed directory, which is a
// build defect, so it panics rather than returning an error.
func migrationsFS() fs.FS {
	migrations, err := fs.Sub(dbembed.MigrationsFS, "migrations")
	if err == nil {
		return migrations
	}
	panic(fmt.Sprintf("embedded migrations: %v", err))
}
// main dispatches on the first CLI argument. With no argument the server
// runs ("serve" is the default); "migrate" runs database migrations and
// "version" prints build information. Unknown commands print usage and
// exit non-zero.
func main() {
	command := "serve"
	if len(os.Args) > 1 {
		command = os.Args[1]
	}
	switch command {
	case "serve":
		runServe()
	case "migrate":
		runMigrate(os.Args[2:])
	case "version":
		fmt.Printf("memoh-server %s\n", version.GetInfo())
	default:
		fmt.Fprintf(os.Stderr, "Usage: memoh-server <command>\n\nCommands:\n serve Start the server (default)\n migrate Run database migrations (up|down|version|force)\n version Print version information\n")
		os.Exit(1)
	}
}
// runMigrate executes one migration subcommand (up|down|version|force N)
// against the configured Postgres database using the embedded migration
// files. It exits the process non-zero on usage, config, or migration errors.
func runMigrate(args []string) {
	if len(args) == 0 {
		fmt.Fprintf(os.Stderr, "Usage: memoh-server migrate <up|down|version|force N>\n")
		os.Exit(1)
	}

	cfg, err := provideConfig()
	if err != nil {
		fmt.Fprintf(os.Stderr, "config: %v\n", err)
		os.Exit(1)
	}
	logger.Init(cfg.Log.Level, cfg.Log.Format)
	log := logger.L

	// Everything after the subcommand is forwarded as-is (e.g. the N in
	// "force N"); left nil when absent, matching RunMigrate's expectations.
	subcommand := args[0]
	var extra []string
	if len(args) > 1 {
		extra = args[1:]
	}

	if err := db.RunMigrate(log, cfg.Postgres, migrationsFS(), subcommand, extra); err != nil {
		log.Error("migration failed", slog.Any("error", err))
		os.Exit(1)
	}
}
func runServe() {
fx.New(
fx.Provide(
provideConfig,
@@ -316,9 +373,10 @@ func provideScheduleTriggerer(resolver *flow.Resolver) schedule.Triggerer {
// conversation flow
// ---------------------------------------------------------------------------
func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, containerdHandler *handlers.ContainerdHandler) *flow.Resolver {
func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *models.Service, queries *dbsqlc.Queries, memoryService *memory.Service, chatService *conversation.Service, msgService *message.DBService, settingsService *settings.Service, mediaService *media.Service, containerdHandler *handlers.ContainerdHandler) *flow.Resolver {
resolver := flow.NewResolver(log, modelsService, queries, memoryService, chatService, msgService, settingsService, cfg.AgentGateway.BaseURL(), 120*time.Second)
resolver.SetSkillLoader(&skillLoaderAdapter{handler: containerdHandler})
resolver.SetGatewayAssetLoader(&gatewayAssetLoaderAdapter{media: mediaService})
return resolver
}
@@ -326,9 +384,11 @@ func provideChatResolver(log *slog.Logger, cfg config.Config, modelsService *mod
// channel providers
// ---------------------------------------------------------------------------
func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub) *channel.Registry {
func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService *media.Service) *channel.Registry {
registry := channel.NewRegistry()
registry.MustRegister(telegram.NewTelegramAdapter(log))
tgAdapter := telegram.NewTelegramAdapter(log)
tgAdapter.SetAssetOpener(mediaService)
registry.MustRegister(tgAdapter)
registry.MustRegister(feishu.NewFeishuAdapter(log))
registry.MustRegister(local.NewCLIAdapter(hub))
registry.MustRegister(local.NewWebAdapter(hub))
@@ -338,6 +398,7 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub) *channel.Regi
func provideChannelRouter(
log *slog.Logger,
registry *channel.Registry,
hub *local.RouteHub,
routeService *route.DBService,
msgService *message.DBService,
resolver *flow.Resolver,
@@ -351,6 +412,7 @@ func provideChannelRouter(
) *inbound.ChannelInboundProcessor {
processor := inbound.NewChannelInboundProcessor(log, registry, routeService, msgService, resolver, identityService, botService, policyService, preauthService, bindService, rc.JwtSecret, 5*time.Minute)
processor.SetMediaService(mediaService)
processor.SetStreamObserver(local.NewRouteHubBroadcaster(hub))
return processor
}
@@ -370,8 +432,8 @@ func provideChannelLifecycleService(channelStore *channel.Store, channelManager
// containerd handler & tool gateway
// ---------------------------------------------------------------------------
func provideContainerdHandler(log *slog.Logger, service ctr.Service, cfg config.Config, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler {
return handlers.NewContainerdHandler(log, service, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries)
func provideContainerdHandler(log *slog.Logger, service ctr.Service, manager *mcp.Manager, cfg config.Config, botService *bots.Service, accountService *accounts.Service, policyService *policy.Service, queries *dbsqlc.Queries) *handlers.ContainerdHandler {
return handlers.NewContainerdHandler(log, service, manager, cfg.MCP, cfg.Containerd.Namespace, botService, accountService, policyService, queries)
}
func provideToolGatewayService(log *slog.Logger, cfg config.Config, channelManager *channel.Manager, registry *channel.Registry, channelStore *channel.Store, scheduleService *schedule.Service, memoryService *memory.Service, chatService *conversation.Service, accountService *accounts.Service, settingsService *settings.Service, searchProviderService *searchproviders.Service, manager *mcp.Manager, containerdHandler *handlers.ContainerdHandler, mcpConnService *mcp.ConnectionService) *mcp.ToolGatewayService {
@@ -418,13 +480,13 @@ func provideAuthHandler(log *slog.Logger, accountService *accounts.Service, rc *
return handlers.NewAuthHandler(log, accountService, rc.JwtSecret, rc.JwtExpiresIn)
}
func provideMessageHandler(log *slog.Logger, resolver *flow.Resolver, chatService *conversation.Service, msgService *message.DBService, mediaService *media.Service, botService *bots.Service, accountService *accounts.Service, identityService *identities.Service, hub *event.Hub) *handlers.MessageHandler {
h := handlers.NewMessageHandler(log, resolver, chatService, msgService, botService, accountService, identityService, hub)
func provideMessageHandler(log *slog.Logger, chatService *conversation.Service, msgService *message.DBService, mediaService *media.Service, botService *bots.Service, accountService *accounts.Service, hub *event.Hub) *handlers.MessageHandler {
h := handlers.NewMessageHandler(log, chatService, msgService, botService, accountService, hub)
h.SetMediaService(mediaService)
return h
}
func provideMediaService(log *slog.Logger, queries *dbsqlc.Queries, cfg config.Config) (*media.Service, error) {
func provideMediaService(log *slog.Logger, cfg config.Config) (*media.Service, error) {
dataRoot := strings.TrimSpace(cfg.MCP.DataRoot)
if dataRoot == "" {
dataRoot = config.DefaultDataRoot
@@ -433,7 +495,7 @@ func provideMediaService(log *slog.Logger, queries *dbsqlc.Queries, cfg config.C
if err != nil {
return nil, fmt.Errorf("init media provider: %w", err)
}
return media.NewService(log, queries, provider), nil
return media.NewService(log, provider), nil
}
func provideUsersHandler(log *slog.Logger, accountService *accounts.Service, identityService *identities.Service, botService *bots.Service, routeService *route.DBService, channelStore *channel.Store, channelLifecycle *channel.Lifecycle, channelManager *channel.Manager, registry *channel.Registry) *handlers.UsersHandler {
@@ -711,3 +773,19 @@ func (a *skillLoaderAdapter) LoadSkills(ctx context.Context, botID string) ([]fl
}
return entries, nil
}
// gatewayAssetLoaderAdapter bridges the media service to the flow gateway's
// asset loader interface.
type gatewayAssetLoaderAdapter struct {
	media *media.Service
}

// OpenForGateway opens the content-addressed asset identified by contentHash
// for the given bot and returns its stream plus the asset's MIME type
// (whitespace-trimmed). It fails fast when no media service is wired in.
func (a *gatewayAssetLoaderAdapter) OpenForGateway(ctx context.Context, botID, contentHash string) (io.ReadCloser, string, error) {
	if a == nil || a.media == nil {
		return nil, "", fmt.Errorf("media service not configured")
	}
	reader, asset, err := a.media.Open(ctx, botID, contentHash)
	if err != nil {
		return nil, "", err
	}
	mime := strings.TrimSpace(asset.Mime)
	return reader, mime, nil
}
-257
View File
@@ -1,257 +0,0 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
gocni "github.com/containerd/go-cni"
)
// main is the proxy entry point. Invoked with a cni-* subcommand it performs
// the matching CNI operation and exits with that operation's code. Otherwise
// it requires --container-id and bridges this process's stdio to the /mcp
// binary inside that container, propagating the child's exit code.
func main() {
	flag.CommandLine.SetOutput(io.Discard)
	containerID := flag.String("container-id", "", "")
	flag.Parse()

	if rest := flag.Args(); len(rest) > 0 {
		switch rest[0] {
		case "cni-setup":
			os.Exit(runCNISetup(rest[1:]))
		case "cni-remove":
			os.Exit(runCNIRemove(rest[1:]))
		case "cni-check":
			os.Exit(runCNICheck(rest[1:]))
		case "cni-status":
			os.Exit(runCNIStatus(rest[1:]))
		}
	}

	if *containerID == "" {
		os.Exit(2)
	}

	if err := runWithStdio(buildMCPCommand(*containerID)); err != nil {
		code := 1
		if exitErr, ok := err.(*exec.ExitError); ok {
			code = exitErr.ExitCode()
		}
		os.Exit(code)
	}
}
// buildMCPCommand constructs the command that attaches to the /mcp binary
// inside containerID via `ctr tasks exec`, using a unique nanosecond-based
// exec ID. On macOS the invocation is routed through the Lima VM
// ("limactl shell ... sudo -n ctr ..."), since containerd runs inside the VM
// there; elsewhere ctr is invoked directly.
func buildMCPCommand(containerID string) *exec.Cmd {
	execID := "mcp-" + strconv.FormatInt(time.Now().UnixNano(), 10)
	ctrArgs := []string{
		"ctr", "-n", "default",
		"tasks", "exec", "--exec-id", execID,
		containerID, "/mcp",
	}
	if runtime.GOOS == "darwin" {
		full := append([]string{"limactl", "shell", "--tty=false", "default", "--", "sudo", "-n"}, ctrArgs...)
		return exec.Command(full[0], full[1:]...)
	}
	return exec.Command(ctrArgs[0], ctrArgs[1:]...)
}
// runWithStdio starts cmd with its stdin/stdout/stderr bridged to this
// process's standard streams via copy goroutines, waits for the command,
// then waits for the copiers, and returns the command's error (if any).
// The child's stdin pipe is closed once our stdin reaches EOF so the child
// can observe end-of-input.
//
// NOTE(review): cmd.Wait is called while the stdout/stderr pipes may still
// be draining, which os/exec documents as racy — preserved as-is from the
// original; confirm against the exec.Cmd.StdoutPipe docs before changing.
func runWithStdio(cmd *exec.Cmd) error {
	inPipe, err := cmd.StdinPipe()
	if err != nil {
		return err
	}
	outPipe, err := cmd.StdoutPipe()
	if err != nil {
		_ = inPipe.Close()
		return err
	}
	errPipe, err := cmd.StderrPipe()
	if err != nil {
		_ = inPipe.Close()
		_ = outPipe.Close()
		return err
	}
	if err := cmd.Start(); err != nil {
		_ = inPipe.Close()
		_ = outPipe.Close()
		_ = errPipe.Close()
		return err
	}

	var pump sync.WaitGroup
	pump.Add(3)
	go func() {
		defer pump.Done()
		_, _ = io.Copy(inPipe, os.Stdin)
		_ = inPipe.Close()
	}()
	go func() {
		defer pump.Done()
		_, _ = io.Copy(os.Stdout, outPipe)
	}()
	go func() {
		defer pump.Done()
		_, _ = io.Copy(os.Stderr, errPipe)
	}()

	waitErr := cmd.Wait()
	pump.Wait()
	return waitErr
}
// runCNISetup attaches the container's network namespace (from --netns or
// --pid) to the configured CNI networks, printing the setup result as JSON
// on stdout when one is produced. Returns a process exit code (0 on success).
func runCNISetup(args []string) int {
	id, netns, parseErr := parseCNIArgs(args)
	if parseErr != nil {
		return exitWithError(parseErr)
	}
	network, newErr := newCNIFromArgs(args)
	if newErr != nil {
		return exitWithError(newErr)
	}
	if loadErr := network.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); loadErr != nil {
		return exitWithError(loadErr)
	}
	result, setupErr := network.Setup(context.Background(), id, netns)
	if setupErr != nil {
		return exitWithError(setupErr)
	}
	if result != nil {
		_ = json.NewEncoder(os.Stdout).Encode(result)
	}
	return 0
}
// runCNIRemove detaches the container's network namespace from the
// configured CNI networks. Returns a process exit code (0 on success).
func runCNIRemove(args []string) int {
	id, netns, parseErr := parseCNIArgs(args)
	if parseErr != nil {
		return exitWithError(parseErr)
	}
	network, newErr := newCNIFromArgs(args)
	if newErr != nil {
		return exitWithError(newErr)
	}
	if loadErr := network.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); loadErr != nil {
		return exitWithError(loadErr)
	}
	if removeErr := network.Remove(context.Background(), id, netns); removeErr != nil {
		return exitWithError(removeErr)
	}
	return 0
}
// runCNICheck verifies the container's existing CNI attachment is still
// intact. Returns a process exit code (0 when the check passes).
func runCNICheck(args []string) int {
	id, netns, parseErr := parseCNIArgs(args)
	if parseErr != nil {
		return exitWithError(parseErr)
	}
	network, newErr := newCNIFromArgs(args)
	if newErr != nil {
		return exitWithError(newErr)
	}
	if loadErr := network.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); loadErr != nil {
		return exitWithError(loadErr)
	}
	if checkErr := network.Check(context.Background(), id, netns); checkErr != nil {
		return exitWithError(checkErr)
	}
	return 0
}
// runCNIStatus reports whether the CNI configuration on this host is loaded
// and usable. Returns a process exit code (0 when healthy). Unlike the other
// cni-* subcommands it needs no container identity flags.
func runCNIStatus(args []string) int {
	network, newErr := newCNIFromArgs(args)
	if newErr != nil {
		return exitWithError(newErr)
	}
	if loadErr := network.Load(gocni.WithLoNetwork, gocni.WithDefaultConf); loadErr != nil {
		return exitWithError(loadErr)
	}
	if statusErr := network.Status(); statusErr != nil {
		return exitWithError(statusErr)
	}
	return 0
}
// parseCNIArgs extracts the container identity from args: --id is required,
// and one of --netns or --pid must be given. When only --pid is present the
// namespace path is derived as /proc/<pid>/ns/net. The configuration flags
// (--conf-dir, --bin-dir, --if-prefix) are declared solely so their presence
// does not abort parsing; newCNIFromArgs consumes them.
func parseCNIArgs(args []string) (string, string, error) {
	set := flag.NewFlagSet("cni", flag.ContinueOnError)
	set.SetOutput(io.Discard)
	id := set.String("id", "", "")
	netns := set.String("netns", "", "")
	pid := set.Int("pid", 0, "")
	_ = set.String("conf-dir", "", "")
	_ = set.String("bin-dir", "", "")
	_ = set.String("if-prefix", "", "")
	if err := set.Parse(args); err != nil {
		return "", "", err
	}

	switch {
	case *id == "":
		return "", "", fmt.Errorf("missing --id")
	case *netns == "" && *pid == 0:
		return "", "", fmt.Errorf("missing --netns or --pid")
	}

	target := *netns
	if target == "" {
		target = filepath.Join("/proc", strconv.Itoa(*pid), "ns", "net")
	}
	return *id, target, nil
}
// newCNIFromArgs builds a gocni.CNI handle from the optional --conf-dir,
// --bin-dir and --if-prefix flags; any left blank (after trimming) keeps
// the library default. The identity flags (--id/--netns/--pid) are declared
// only so their presence does not abort parsing; parseCNIArgs consumes them.
func newCNIFromArgs(args []string) (gocni.CNI, error) {
	set := flag.NewFlagSet("cni", flag.ContinueOnError)
	set.SetOutput(io.Discard)
	confDir := set.String("conf-dir", "", "")
	binDir := set.String("bin-dir", "", "")
	ifPrefix := set.String("if-prefix", "", "")
	_ = set.String("id", "", "")
	_ = set.String("netns", "", "")
	_ = set.Int("pid", 0, "")
	if err := set.Parse(args); err != nil {
		return nil, err
	}

	// Note: values are tested trimmed but passed through untrimmed,
	// matching the original behavior.
	opts := []gocni.Opt{}
	if strings.TrimSpace(*binDir) != "" {
		opts = append(opts, gocni.WithPluginDir([]string{*binDir}))
	}
	if strings.TrimSpace(*confDir) != "" {
		opts = append(opts, gocni.WithPluginConfDir(*confDir))
	}
	if strings.TrimSpace(*ifPrefix) != "" {
		opts = append(opts, gocni.WithInterfacePrefix(*ifPrefix))
	}
	return gocni.New(opts...)
}
// exitWithError prints err to stderr and returns 1, the generic failure
// exit code, for the caller to hand to os.Exit. A nil err is tolerated
// (reported with a placeholder message) so a defensive caller cannot
// trigger a nil-pointer panic here; the original called err.Error()
// unconditionally, which panics on nil.
func exitWithError(err error) int {
	msg := "unknown error"
	if err != nil {
		msg = err.Error()
	}
	_, _ = fmt.Fprintln(os.Stderr, msg)
	return 1
}
-120
View File
@@ -1,120 +0,0 @@
// feishu-echo is a minimal Feishu bot that connects via WebSocket and counts received events.
// Used to verify whether message loss is due to our app logic or network/Feishu delivery.
//
// Usage:
//
// FEISHU_APP_ID=xxx FEISHU_APP_SECRET=xxx FEISHU_ENCRYPT=xxx FEISHU_VERIFY=xxx go run ./cmd/feishu-echo
package main
import (
"context"
"log"
"os"
"os/signal"
"strings"
"sync/atomic"
"time"
larkim "github.com/larksuite/oapi-sdk-go/v3/service/im/v1"
larkws "github.com/larksuite/oapi-sdk-go/v3/ws"
"github.com/larksuite/oapi-sdk-go/v3/event/dispatcher"
)
// eventCounts tracks how many of each Feishu event type have been received.
// Counters are atomic so concurrent handler goroutines can bump them safely.
type eventCounts struct {
	messageReceive  atomic.Int64 // im.message.receive_v1 events
	messageRead     atomic.Int64 // im.message.message_read_v1 events
	reactionCreated atomic.Int64 // reaction created events
	reactionDeleted atomic.Int64 // reaction deleted events
}

// log prints a one-line snapshot of every counter's current value.
func (c *eventCounts) log() {
	received := c.messageReceive.Load()
	read := c.messageRead.Load()
	created := c.reactionCreated.Load()
	deleted := c.reactionDeleted.Load()
	log.Printf("[feishu-echo] counts: receive=%d read=%d reaction_created=%d reaction_deleted=%d",
		received, read, created, deleted)
}
// main runs a minimal Feishu echo bot over WebSocket, counting received
// events to help distinguish app-logic message loss from network/Feishu
// delivery loss. Credentials come from FEISHU_APP_ID / FEISHU_APP_SECRET
// (required); FEISHU_ENCRYPT / FEISHU_VERIFY are optional event-security
// settings. It reconnects with a fixed delay until interrupted.
func main() {
// Load and trim credentials from the environment; fail fast when the
// mandatory app credentials are missing.
appID := strings.TrimSpace(os.Getenv("FEISHU_APP_ID"))
appSecret := strings.TrimSpace(os.Getenv("FEISHU_APP_SECRET"))
encryptKey := strings.TrimSpace(os.Getenv("FEISHU_ENCRYPT"))
verifyToken := strings.TrimSpace(os.Getenv("FEISHU_VERIFY"))
if appID == "" || appSecret == "" {
log.Fatal("FEISHU_APP_ID and FEISHU_APP_SECRET are required")
}
log.Printf("[feishu-echo] starting with app_id=%s (encrypt=%v, verify=%v)", appID, encryptKey != "", verifyToken != "")
counts := new(eventCounts)
// Register one counting handler per event type; each bumps its counter
// and logs the running totals so delivery can be observed live.
eventDispatcher := dispatcher.NewEventDispatcher(verifyToken, encryptKey)
eventDispatcher.OnP2MessageReceiveV1(func(_ context.Context, _ *larkim.P2MessageReceiveV1) error {
counts.messageReceive.Add(1)
counts.log()
return nil
})
eventDispatcher.OnP2MessageReadV1(func(_ context.Context, _ *larkim.P2MessageReadV1) error {
counts.messageRead.Add(1)
counts.log()
return nil
})
eventDispatcher.OnP2MessageReactionCreatedV1(func(_ context.Context, _ *larkim.P2MessageReactionCreatedV1) error {
counts.reactionCreated.Add(1)
counts.log()
return nil
})
eventDispatcher.OnP2MessageReactionDeletedV1(func(_ context.Context, _ *larkim.P2MessageReactionDeletedV1) error {
counts.reactionDeleted.Add(1)
counts.log()
return nil
})
client := larkws.NewClient(
appID,
appSecret,
larkws.WithEventHandler(eventDispatcher),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// On interrupt: cancel the context (stops the reconnect loop), print the
// final counts, and exit immediately from this goroutine.
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig
log.Println("[feishu-echo] interrupt, shutting down")
cancel()
counts.log()
os.Exit(0)
}()
const reconnectDelay = 3 * time.Second
// Reconnect loop: client.Start blocks while connected; on return (error or
// clean close) wait reconnectDelay and retry, unless the context was
// cancelled — the labeled break exits the loop from inside the select too.
run:
for {
if ctx.Err() != nil {
break run
}
log.Println("[feishu-echo] connecting to Feishu WebSocket...")
err := client.Start(ctx)
if ctx.Err() != nil {
break run
}
if err != nil {
log.Printf("[feishu-echo] client error: %v; reconnecting in %v", err, reconnectDelay)
} else {
log.Printf("[feishu-echo] connection closed; reconnecting in %v", reconnectDelay)
}
timer := time.NewTimer(reconnectDelay)
select {
case <-ctx.Done():
timer.Stop()
break run
case <-timer.C:
}
}
counts.log()
log.Println("[feishu-echo] stopped")
}
-20
View File
@@ -1,20 +0,0 @@
package main
import (
"testing"
)
// TestEventCounts verifies counters start at zero, accumulate adds, and that
// log() can be invoked at any point without disturbing the counts.
func TestEventCounts(t *testing.T) {
	counts := new(eventCounts)
	counts.log()

	if counts.messageReceive.Load() != 0 || counts.messageRead.Load() != 0 {
		t.Fatalf("initial counts should be 0")
	}

	counts.messageReceive.Add(2)
	counts.messageRead.Add(1)
	counts.reactionCreated.Add(1)

	switch {
	case counts.messageReceive.Load() != 2,
		counts.messageRead.Load() != 1,
		counts.reactionCreated.Load() != 1:
		t.Fatalf("counts after add: receive=2 read=1 reaction_created=1")
	}
	counts.log()
}