From 1bb90c70f4ef1c7971d083cc990a0bed8b93e2f9 Mon Sep 17 00:00:00 2001 From: BBQ Date: Sun, 8 Mar 2026 16:29:38 +0800 Subject: [PATCH 1/2] fix(text): avoid breaking UTF-8 during truncation Use rune-aware truncation for user-facing text and log previews so multibyte content is not corrupted in memory context, Telegram messages, or diagnostics. --- internal/channel/adapters/common/logging.go | 11 ++-- .../channel/adapters/telegram/telegram.go | 12 +--- .../adapters/telegram/telegram_test.go | 13 +++-- internal/conversation/flow/resolver.go | 6 +- internal/mcp/oauth.go | 6 +- internal/memory/provider/builtin.go | 6 +- internal/textutil/truncate.go | 52 ++++++++++++++++++ internal/textutil/truncate_test.go | 55 +++++++++++++++++++ 8 files changed, 131 insertions(+), 30 deletions(-) create mode 100644 internal/textutil/truncate.go create mode 100644 internal/textutil/truncate_test.go diff --git a/internal/channel/adapters/common/logging.go b/internal/channel/adapters/common/logging.go index bc680421..379cf724 100644 --- a/internal/channel/adapters/common/logging.go +++ b/internal/channel/adapters/common/logging.go @@ -1,7 +1,11 @@ // Package common provides shared utilities for channel adapters. package common -import "strings" +import ( + "strings" + + "github.com/memohai/memoh/internal/textutil" +) // SummarizeText returns a truncated preview of the text, limited to 120 characters. func SummarizeText(text string) string { @@ -10,8 +14,5 @@ func SummarizeText(text string) string { return "" } const limit = 120 - if len(value) <= limit { - return value - } - return value[:limit] + "..." + return textutil.TruncateRunesWithSuffix(value, limit, "...") } diff --git a/internal/channel/adapters/telegram/telegram.go b/internal/channel/adapters/telegram/telegram.go index b62be257..8dc2105a 100644 --- a/internal/channel/adapters/telegram/telegram.go +++ b/internal/channel/adapters/telegram/telegram.go @@ -20,6 +20,7 @@ import ( "github.com/memohai/memoh/internal/channel" "github.com/memohai/memoh/internal/channel/adapters/common" "github.com/memohai/memoh/internal/media" + "github.com/memohai/memoh/internal/textutil" ) const ( @@ -1515,16 +1516,7 @@ func sanitizeTelegramText(text string) string { // truncateTelegramText truncates text to telegramMaxMessageLength on a valid // UTF-8 rune boundary, appending "..." when truncation occurs. func truncateTelegramText(text string) string { - if len(text) <= telegramMaxMessageLength { - return text - } - const suffix = "..." - limit := telegramMaxMessageLength - len(suffix) - // Walk backwards to a rune boundary. - for limit > 0 && !utf8.RuneStart(text[limit]) { - limit-- - } - return text[:limit] + suffix + return textutil.TruncateRunesWithSuffix(text, telegramMaxMessageLength, "...") } // ProcessingStarted sends a "typing" chat action to indicate processing. diff --git a/internal/channel/adapters/telegram/telegram_test.go b/internal/channel/adapters/telegram/telegram_test.go index 42e8b172..df4d202b 100644 --- a/internal/channel/adapters/telegram/telegram_test.go +++ b/internal/channel/adapters/telegram/telegram_test.go @@ -593,22 +593,25 @@ func TestTruncateTelegramText(t *testing.T) { // Over limit with ASCII. over := strings.Repeat("a", telegramMaxMessageLength+100) got := truncateTelegramText(over) - if len(got) > telegramMaxMessageLength { - t.Fatalf("truncated text should be <= %d bytes: got %d", telegramMaxMessageLength, len(got)) + if utf8.RuneCountInString(got) > telegramMaxMessageLength { + t.Fatalf("truncated text should be <= %d chars: got %d", telegramMaxMessageLength, utf8.RuneCountInString(got)) } if !strings.HasSuffix(got, "...") { t.Fatalf("truncated text should end with '...': %q", got[len(got)-10:]) } // Over limit with multi-byte characters (Chinese: 3 bytes each). - multi := strings.Repeat("\u4f60", telegramMaxMessageLength) + multi := strings.Repeat("\u4f60", telegramMaxMessageLength+1) got = truncateTelegramText(multi) - if len(got) > telegramMaxMessageLength { - t.Fatalf("truncated multi-byte text should be <= %d bytes: got %d", telegramMaxMessageLength, len(got)) + if utf8.RuneCountInString(got) > telegramMaxMessageLength { + t.Fatalf("truncated multi-byte text should be <= %d chars: got %d", telegramMaxMessageLength, utf8.RuneCountInString(got)) } if !strings.HasSuffix(got, "...") { t.Fatal("truncated multi-byte text should end with '...'") } + if utf8.RuneCountInString(got) != telegramMaxMessageLength { + t.Fatalf("truncated multi-byte text should keep exact char budget: got %d", utf8.RuneCountInString(got)) + } // Verify no broken runes. trimmed := strings.TrimSuffix(got, "...") for i := 0; i < len(trimmed); { diff --git a/internal/conversation/flow/resolver.go b/internal/conversation/flow/resolver.go index 6ff720b9..78f2f84c 100644 --- a/internal/conversation/flow/resolver.go +++ b/internal/conversation/flow/resolver.go @@ -28,6 +28,7 @@ import ( "github.com/memohai/memoh/internal/models" "github.com/memohai/memoh/internal/schedule" "github.com/memohai/memoh/internal/settings" + "github.com/memohai/memoh/internal/textutil" ) const ( @@ -1918,10 +1919,7 @@ func nonNilModelMessages(m []conversation.ModelMessage) []conversation.ModelMess } func truncate(s string, n int) string { - if len(s) <= n { - return s - } - return s[:n] + "..." + return textutil.TruncateRunesWithSuffix(s, n, "...") } func parseResolverUUID(id string) (pgtype.UUID, error) { diff --git a/internal/mcp/oauth.go b/internal/mcp/oauth.go index 614cad56..8d0fffdc 100644 --- a/internal/mcp/oauth.go +++ b/internal/mcp/oauth.go @@ -19,6 +19,7 @@ import ( "github.com/memohai/memoh/internal/db" "github.com/memohai/memoh/internal/db/sqlc" + "github.com/memohai/memoh/internal/textutil" ) // OAuthService manages OAuth flows for MCP connections. @@ -667,10 +668,7 @@ func parseTokenResponse(body []byte) (*tokenResponse, error) { } func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - return s[:maxLen] + "..." + return textutil.TruncateRunesWithSuffix(s, maxLen, "...") } func (s *OAuthService) refreshToken(ctx context.Context, tokenEndpoint, refreshToken, clientID, resourceURI string) (*tokenResponse, error) { diff --git a/internal/memory/provider/builtin.go b/internal/memory/provider/builtin.go index 3ceb9e1c..497b2149 100644 --- a/internal/memory/provider/builtin.go +++ b/internal/memory/provider/builtin.go @@ -9,6 +9,7 @@ import ( "github.com/memohai/memoh/internal/conversation" "github.com/memohai/memoh/internal/mcp" + "github.com/memohai/memoh/internal/textutil" ) const ( @@ -379,10 +380,11 @@ func (p *BuiltinProvider) Usage(ctx context.Context, filters map[string]any) (Us func truncateSnippet(s string, n int) string { trimmed := strings.TrimSpace(s) - if len(trimmed) <= n { + truncated := textutil.TruncateRunes(trimmed, n) + if truncated == trimmed { return trimmed } - return strings.TrimSpace(trimmed[:n]) + "..." + return strings.TrimSpace(truncated) + "..." } func deduplicateItems(items []MemoryItem) []MemoryItem { diff --git a/internal/textutil/truncate.go b/internal/textutil/truncate.go new file mode 100644 index 00000000..606b0828 --- /dev/null +++ b/internal/textutil/truncate.go @@ -0,0 +1,52 @@ +package textutil + +import "unicode/utf8" + +// TruncateRunes returns s truncated to at most maxRunes Unicode code points. +func TruncateRunes(s string, maxRunes int) string { + if maxRunes <= 0 || s == "" { + return "" + } + cut, truncated := byteIndexAfterRunes(s, maxRunes) + if !truncated { + return s + } + return s[:cut] +} + +// TruncateRunesWithSuffix returns s truncated to at most maxRunes Unicode code +// points, appending suffix when truncation occurs. +func TruncateRunesWithSuffix(s string, maxRunes int, suffix string) string { + if maxRunes <= 0 || s == "" { + return "" + } + if _, truncated := byteIndexAfterRunes(s, maxRunes); !truncated { + return s + } + if suffix == "" { + return TruncateRunes(s, maxRunes) + } + suffixRunes := utf8.RuneCountInString(suffix) + if suffixRunes >= maxRunes { + return TruncateRunes(s, maxRunes) + } + cut, truncated := byteIndexAfterRunes(s, maxRunes-suffixRunes) + if !truncated { + return s + } + return s[:cut] + suffix +} + +func byteIndexAfterRunes(s string, maxRunes int) (int, bool) { + if maxRunes <= 0 || s == "" { + return 0, len(s) > 0 + } + count := 0 + for i := range s { + if count == maxRunes { + return i, true + } + count++ + } + return len(s), false +} diff --git a/internal/textutil/truncate_test.go b/internal/textutil/truncate_test.go new file mode 100644 index 00000000..410b6b29 --- /dev/null +++ b/internal/textutil/truncate_test.go @@ -0,0 +1,55 @@ +package textutil + +import ( + "strings" + "testing" + "unicode/utf8" +) + +func TestTruncateRunes(t *testing.T) { + t.Parallel() + + text := "你好世界" + got := TruncateRunes(text, 3) + if got != "你好世" { + t.Fatalf("TruncateRunes() = %q, want %q", got, "你好世") + } + if !utf8.ValidString(got) { + t.Fatalf("TruncateRunes() returned invalid UTF-8: %q", got) + } +} + +func TestTruncateRunesWithSuffix(t *testing.T) { + t.Parallel() + + text := strings.Repeat("你", 10) + "abc" + got := TruncateRunesWithSuffix(text, 8, "...") + if utf8.RuneCountInString(got) != 8 { + t.Fatalf("TruncateRunesWithSuffix() rune count = %d, want 8", utf8.RuneCountInString(got)) + } + if got != strings.Repeat("你", 5)+"..." { + t.Fatalf("TruncateRunesWithSuffix() = %q", got) + } + if !utf8.ValidString(got) { + t.Fatalf("TruncateRunesWithSuffix() returned invalid UTF-8: %q", got) + } +} + +func TestTruncateRunesWithSuffixNoTruncation(t *testing.T) { + t.Parallel() + + text := "你好世界" + if got := TruncateRunesWithSuffix(text, 8, "..."); got != text { + t.Fatalf("TruncateRunesWithSuffix() = %q, want %q", got, text) + } +} + +func TestTruncateRunesWithSuffixKeepsInvalidUTF8Bytes(t *testing.T) { + t.Parallel() + + text := "ab\xffcd" + got := TruncateRunesWithSuffix(text, 4, "...") + if got != "a..." { + t.Fatalf("TruncateRunesWithSuffix() = %q, want %q", got, "a...") + } +} From a1e58792c9e83853a07cc48b50560a2c4c36565d Mon Sep 17 00:00:00 2001 From: BBQ <35603386+HoneyBBQ@users.noreply.github.com> Date: Wed, 11 Mar 2026 19:06:47 +0800 Subject: [PATCH 2/2] fix(mcp): recover data from orphaned snapshots on container rebuild (#228) When a container is deleted but its snapshot survives (dev image rebuild, containerd metadata loss, manual ctr deletion), the reconciliation path previously created a fresh container and unconditionally destroyed the old snapshot via prepareSnapshot, causing complete data loss. Manager.Start now detects orphaned snapshots before EnsureBot runs, exports /data to a backup archive, and restores it into the new container's snapshot before the task starts. --- internal/mcp/dataio.go | 98 +++++++++++++++++++++++++++++++++++++++++ internal/mcp/manager.go | 33 +++++++++++--- 2 files changed, 125 insertions(+), 6 deletions(-) diff --git a/internal/mcp/dataio.go b/internal/mcp/dataio.go index 9c2ecfbd..86e93c53 100644 --- a/internal/mcp/dataio.go +++ b/internal/mcp/dataio.go @@ -239,6 +239,104 @@ func (m *Manager) importLegacyDir(ctx context.Context, botID, srcDir string) err return nil } +// recoverOrphanedSnapshot detects a snapshot whose container was deleted +// (e.g. dev image rebuild, containerd metadata loss) and exports /data to a +// backup archive. The caller should invoke restorePreservedIntoSnapshot after +// creating the replacement container. Returns true when data was preserved. +func (m *Manager) recoverOrphanedSnapshot(ctx context.Context, botID string) bool { + snapshotter := m.cfg.Snapshotter + if snapshotter == "" { + return false + } + + snapshotKey := m.containerID(botID) + raw, err := m.service.SnapshotMounts(ctx, snapshotter, snapshotKey) + if err != nil { + return false + } + + mounts := make([]mount.Mount, len(raw)) + for i, r := range raw { + mounts[i] = mount.Mount{Type: r.Type, Source: r.Source, Options: r.Options} + } + + backupPath := m.backupPath(botID) + if err := os.MkdirAll(filepath.Dir(backupPath), 0o750); err != nil { + m.logger.Warn("recover orphaned snapshot: mkdir failed", + slog.String("bot_id", botID), slog.Any("error", err)) + return false + } + + f, err := os.Create(backupPath) //nolint:gosec // G304: operator-controlled path + if err != nil { + m.logger.Warn("recover orphaned snapshot: create backup failed", + slog.String("bot_id", botID), slog.Any("error", err)) + return false + } + + writeErr := mount.WithReadonlyTempMount(ctx, mounts, func(root string) error { + dataDir := mountedDataDir(root) + if _, statErr := os.Stat(dataDir); statErr != nil { + return nil + } + return tarGzDir(f, dataDir) + }) + + closeErr := f.Close() + if writeErr != nil { + _ = os.Remove(backupPath) + m.logger.Warn("recover orphaned snapshot: export failed", + slog.String("bot_id", botID), slog.Any("error", writeErr)) + return false + } + if closeErr != nil { + _ = os.Remove(backupPath) + return false + } + + m.logger.Info("recovered data from orphaned snapshot", + slog.String("bot_id", botID), slog.String("backup", backupPath)) + return true +} + +// restorePreservedIntoSnapshot restores a preserved backup directly into +// the container's snapshot before the task is started. This avoids the +// stop/start cycle that RestorePreservedData (via ImportData) requires. +func (m *Manager) restorePreservedIntoSnapshot(ctx context.Context, botID string) error { + bp := m.backupPath(botID) + f, err := os.Open(bp) //nolint:gosec // G304: operator-controlled path + if err != nil { + return err + } + defer func() { _ = f.Close() }() + + containerID := m.containerID(botID) + info, err := m.service.GetContainer(ctx, containerID) + if err != nil { + return fmt.Errorf("get container: %w", err) + } + + mounts, err := m.snapshotMounts(ctx, info) + if err != nil { + return err + } + + if err := mount.WithTempMount(ctx, mounts, func(root string) error { + dataDir := mountedDataDir(root) + if err := os.MkdirAll(dataDir, 0o750); err != nil { + return err + } + return untarGzDir(f, dataDir) + }); err != nil { + return err + } + + _ = os.Remove(bp) + m.logger.Info("restored preserved data into new container", + slog.String("bot_id", botID)) + return nil +} + // errMountNotSupported indicates the backend doesn't support snapshot mounts // (e.g. Apple Virtualization). Callers fall back to gRPC-based data operations. var errMountNotSupported = errors.New("snapshot mount not supported on this backend") diff --git a/internal/mcp/manager.go b/internal/mcp/manager.go index 51d553f2..26b737ec 100644 --- a/internal/mcp/manager.go +++ b/internal/mcp/manager.go @@ -282,27 +282,48 @@ func (m *Manager) ListBots(ctx context.Context) ([]string, error) { } func (m *Manager) Start(ctx context.Context, botID string) error { + containerID := m.containerID(botID) + + // Before creating a new container, check for an orphaned snapshot + // (container deleted but snapshot with /data survived). Export /data + // to a backup so it can be restored after EnsureBot creates a fresh + // container. This covers dev image rebuilds, containerd metadata loss, + // and manual container deletion. + if _, err := m.service.GetContainer(ctx, containerID); errdefs.IsNotFound(err) { + m.recoverOrphanedSnapshot(ctx, botID) + } + if err := m.EnsureBot(ctx, botID); err != nil { return err } - if err := m.service.StartContainer(ctx, m.containerID(botID), nil); err != nil { + // Restore preserved data (from orphaned snapshot recovery or a previous + // CleanupBotContainer with preserveData) into the fresh snapshot before + // starting the task, avoiding a redundant stop/start cycle. + if m.HasPreservedData(botID) { + if err := m.restorePreservedIntoSnapshot(ctx, botID); err != nil { + m.logger.Warn("restore preserved data into new container failed", + slog.String("bot_id", botID), slog.Any("error", err)) + } + } + + if err := m.service.StartContainer(ctx, containerID, nil); err != nil { return err } netResult, err := m.service.SetupNetwork(ctx, ctr.NetworkSetupRequest{ - ContainerID: m.containerID(botID), + ContainerID: containerID, CNIBinDir: m.cfg.CNIBinaryDir, CNIConfDir: m.cfg.CNIConfigDir, }) if err != nil { - if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil { - m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr)) + if stopErr := m.service.StopContainer(ctx, containerID, &ctr.StopTaskOptions{Force: true}); stopErr != nil { + m.logger.Warn("cleanup: stop task failed", slog.String("container_id", containerID), slog.Any("error", stopErr)) } return err } if netResult.IP == "" { - if stopErr := m.service.StopContainer(ctx, m.containerID(botID), &ctr.StopTaskOptions{Force: true}); stopErr != nil { - m.logger.Warn("cleanup: stop task failed", slog.String("container_id", m.containerID(botID)), slog.Any("error", stopErr)) + if stopErr := m.service.StopContainer(ctx, containerID, &ctr.StopTaskOptions{Force: true}); stopErr != nil { + m.logger.Warn("cleanup: stop task failed", slog.String("container_id", containerID), slog.Any("error", stopErr)) } return fmt.Errorf("network setup returned no IP for bot %s", botID) }