mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
feat: memory search/compact/rebuild api
This commit is contained in:
@@ -130,6 +130,36 @@ func (c *LLMClient) Decide(ctx context.Context, req DecideRequest) (DecideRespon
|
||||
return DecideResponse{Actions: actions}, nil
|
||||
}
|
||||
|
||||
func (c *LLMClient) Compact(ctx context.Context, req CompactRequest) (CompactResponse, error) {
|
||||
if len(req.Memories) == 0 {
|
||||
return CompactResponse{}, fmt.Errorf("memories is required")
|
||||
}
|
||||
memories := make([]map[string]string, 0, len(req.Memories))
|
||||
for _, m := range req.Memories {
|
||||
entry := map[string]string{
|
||||
"id": m.ID,
|
||||
"text": m.Memory,
|
||||
}
|
||||
if m.CreatedAt != "" {
|
||||
entry["created_at"] = m.CreatedAt
|
||||
}
|
||||
memories = append(memories, entry)
|
||||
}
|
||||
systemPrompt, userPrompt := getCompactMemoryMessages(memories, req.TargetCount, req.DecayDays)
|
||||
content, err := c.callChat(ctx, []chatMessage{
|
||||
{Role: "system", Content: systemPrompt},
|
||||
{Role: "user", Content: userPrompt},
|
||||
})
|
||||
if err != nil {
|
||||
return CompactResponse{}, err
|
||||
}
|
||||
var parsed CompactResponse
|
||||
if err := json.Unmarshal([]byte(removeCodeBlocks(content)), &parsed); err != nil {
|
||||
return CompactResponse{}, fmt.Errorf("failed to parse compact response: %w", err)
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
func (c *LLMClient) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
if strings.TrimSpace(text) == "" {
|
||||
return "", fmt.Errorf("text is required")
|
||||
|
||||
@@ -0,0 +1,327 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mcpgw "github.com/memohai/memoh/internal/mcp"
|
||||
"github.com/memohai/memoh/internal/mcp/providers/container"
|
||||
)
|
||||
|
||||
// Layout constants for the on-disk memory index inside the container.
const (
	manifestPath  = "index/manifest.json" // manifest location, relative to workDir
	memoryDirPath = "memory"              // directory holding one .md file per memory
	manifestVer   = 1                     // current manifest schema version
)
|
||||
|
||||
// MemoryFS persists memory entries as files inside the bot container via ExecRunner.
type MemoryFS struct {
	execRunner container.ExecRunner // runs read/write/list/delete commands inside the container
	workDir    string               // e.g. "/data"
	logger     *slog.Logger
	mu         sync.Mutex // serialize manifest updates
}
|
||||
|
||||
// Manifest is the index file that records everything needed to rebuild memories.
type Manifest struct {
	Version   int                      `json:"version"`    // schema version (manifestVer)
	UpdatedAt string                   `json:"updated_at"` // RFC3339 UTC timestamp of the last write
	Entries   map[string]ManifestEntry `json:"entries"`    // keyed by memory ID
}
|
||||
|
||||
// ManifestEntry records metadata for a single memory entry.
type ManifestEntry struct {
	Hash      string         `json:"hash"`              // content hash carried from the MemoryItem
	CreatedAt string         `json:"created_at"`        // creation timestamp carried from the MemoryItem
	Lang      string         `json:"lang,omitempty"`    // language code, if known
	Filters   map[string]any `json:"filters,omitempty"` // scoping filters active when the entry was written
}
|
||||
|
||||
// NewMemoryFS creates a MemoryFS that writes through the given ExecRunner.
|
||||
func NewMemoryFS(log *slog.Logger, runner container.ExecRunner, workDir string) *MemoryFS {
|
||||
if log == nil {
|
||||
log = slog.Default()
|
||||
}
|
||||
if strings.TrimSpace(workDir) == "" {
|
||||
workDir = "/data"
|
||||
}
|
||||
return &MemoryFS{
|
||||
execRunner: runner,
|
||||
workDir: workDir,
|
||||
logger: log.With(slog.String("component", "memoryfs")),
|
||||
}
|
||||
}
|
||||
|
||||
// ----- write operations -----
|
||||
|
||||
// PersistMemories writes .md files for new items and incrementally updates the manifest.
|
||||
// Used after Add — does NOT delete existing files.
|
||||
func (fs *MemoryFS) PersistMemories(ctx context.Context, botID string, items []MemoryItem, filters map[string]any) error {
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
// Read existing manifest (or create new one).
|
||||
manifest, _ := fs.readManifestLocked(ctx, botID)
|
||||
if manifest == nil {
|
||||
manifest = &Manifest{
|
||||
Version: manifestVer,
|
||||
Entries: map[string]ManifestEntry{},
|
||||
}
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
if strings.TrimSpace(item.ID) == "" || strings.TrimSpace(item.Memory) == "" {
|
||||
continue
|
||||
}
|
||||
// Write individual .md file.
|
||||
if err := fs.writeMemoryFile(ctx, botID, item); err != nil {
|
||||
fs.logger.Warn("write memory file failed", slog.String("id", item.ID), slog.Any("error", err))
|
||||
continue
|
||||
}
|
||||
// Update manifest entry.
|
||||
manifest.Entries[item.ID] = ManifestEntry{
|
||||
Hash: item.Hash,
|
||||
CreatedAt: item.CreatedAt,
|
||||
Filters: filters,
|
||||
}
|
||||
}
|
||||
|
||||
manifest.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
return fs.writeManifest(ctx, botID, manifest)
|
||||
}
|
||||
|
||||
// RebuildFiles does a full replace: deletes all old memory/*.md files, writes new ones,
|
||||
// and rewrites manifest from scratch. Used after Compact.
|
||||
func (fs *MemoryFS) RebuildFiles(ctx context.Context, botID string, items []MemoryItem, filters map[string]any) error {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
// Delete old memory dir contents.
|
||||
fs.execDeleteDir(ctx, botID, memoryDirPath)
|
||||
|
||||
manifest := &Manifest{
|
||||
Version: manifestVer,
|
||||
UpdatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
Entries: make(map[string]ManifestEntry, len(items)),
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
if strings.TrimSpace(item.ID) == "" || strings.TrimSpace(item.Memory) == "" {
|
||||
continue
|
||||
}
|
||||
if err := fs.writeMemoryFile(ctx, botID, item); err != nil {
|
||||
fs.logger.Warn("rebuild write memory file failed", slog.String("id", item.ID), slog.Any("error", err))
|
||||
continue
|
||||
}
|
||||
manifest.Entries[item.ID] = ManifestEntry{
|
||||
Hash: item.Hash,
|
||||
CreatedAt: item.CreatedAt,
|
||||
Filters: filters,
|
||||
}
|
||||
}
|
||||
|
||||
return fs.writeManifest(ctx, botID, manifest)
|
||||
}
|
||||
|
||||
// RemoveMemories removes specific memory files from the FS and updates the manifest.
|
||||
func (fs *MemoryFS) RemoveMemories(ctx context.Context, botID string, ids []string) error {
|
||||
if len(ids) == 0 {
|
||||
return nil
|
||||
}
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
manifest, _ := fs.readManifestLocked(ctx, botID)
|
||||
if manifest == nil {
|
||||
manifest = &Manifest{Version: manifestVer, Entries: map[string]ManifestEntry{}}
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
fs.execDeleteFile(ctx, botID, fmt.Sprintf("%s/%s.md", memoryDirPath, id))
|
||||
delete(manifest.Entries, id)
|
||||
}
|
||||
|
||||
manifest.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||||
return fs.writeManifest(ctx, botID, manifest)
|
||||
}
|
||||
|
||||
// RemoveAllMemories deletes all memory files and the manifest.
|
||||
func (fs *MemoryFS) RemoveAllMemories(ctx context.Context, botID string) error {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
|
||||
fs.execDeleteDir(ctx, botID, memoryDirPath)
|
||||
emptyManifest := &Manifest{
|
||||
Version: manifestVer,
|
||||
UpdatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
Entries: map[string]ManifestEntry{},
|
||||
}
|
||||
return fs.writeManifest(ctx, botID, emptyManifest)
|
||||
}
|
||||
|
||||
// ----- read operations -----
|
||||
|
||||
// ReadManifest reads and parses the manifest.json file.
|
||||
func (fs *MemoryFS) ReadManifest(ctx context.Context, botID string) (*Manifest, error) {
|
||||
fs.mu.Lock()
|
||||
defer fs.mu.Unlock()
|
||||
return fs.readManifestLocked(ctx, botID)
|
||||
}
|
||||
|
||||
func (fs *MemoryFS) readManifestLocked(ctx context.Context, botID string) (*Manifest, error) {
|
||||
content, err := container.ExecRead(ctx, fs.execRunner, botID, fs.workDir, manifestPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var manifest Manifest
|
||||
if err := json.Unmarshal([]byte(content), &manifest); err != nil {
|
||||
return nil, fmt.Errorf("parse manifest: %w", err)
|
||||
}
|
||||
return &manifest, nil
|
||||
}
|
||||
|
||||
// ReadAllMemoryFiles lists and reads all .md files under memory/ and parses their frontmatter.
|
||||
func (fs *MemoryFS) ReadAllMemoryFiles(ctx context.Context, botID string) ([]MemoryItem, error) {
|
||||
entries, err := container.ExecList(ctx, fs.execRunner, botID, fs.workDir, memoryDirPath, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list memory dir: %w", err)
|
||||
}
|
||||
|
||||
var items []MemoryItem
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir || !strings.HasSuffix(entry.Path, ".md") {
|
||||
continue
|
||||
}
|
||||
filePath := memoryDirPath + "/" + entry.Path
|
||||
content, err := container.ExecRead(ctx, fs.execRunner, botID, fs.workDir, filePath)
|
||||
if err != nil {
|
||||
fs.logger.Warn("read memory file failed", slog.String("path", filePath), slog.Any("error", err))
|
||||
continue
|
||||
}
|
||||
item, err := parseMemoryMD(content)
|
||||
if err != nil {
|
||||
fs.logger.Warn("parse memory file failed", slog.String("path", filePath), slog.Any("error", err))
|
||||
continue
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ----- internal helpers -----
|
||||
|
||||
func (fs *MemoryFS) writeMemoryFile(ctx context.Context, botID string, item MemoryItem) error {
|
||||
content := formatMemoryMD(item)
|
||||
filePath := fmt.Sprintf("%s/%s.md", memoryDirPath, item.ID)
|
||||
return container.ExecWrite(ctx, fs.execRunner, botID, fs.workDir, filePath, content)
|
||||
}
|
||||
|
||||
func (fs *MemoryFS) writeManifest(ctx context.Context, botID string, manifest *Manifest) error {
|
||||
data, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal manifest: %w", err)
|
||||
}
|
||||
return container.ExecWrite(ctx, fs.execRunner, botID, fs.workDir, manifestPath, string(data))
|
||||
}
|
||||
|
||||
// execDeleteDir removes all files inside a directory (but keeps the directory itself).
|
||||
func (fs *MemoryFS) execDeleteDir(ctx context.Context, botID, dirPath string) {
|
||||
// Use find + rm to avoid shell quoting issues with glob wildcards.
|
||||
script := fmt.Sprintf("find %s -type f -delete 2>/dev/null; true", container.ShellQuote(dirPath))
|
||||
_, err := fs.execRunner.ExecWithCapture(ctx, mcpgw.ExecRequest{
|
||||
BotID: botID,
|
||||
Command: []string{"/bin/sh", "-c", script},
|
||||
WorkDir: fs.workDir,
|
||||
})
|
||||
if err != nil {
|
||||
fs.logger.Warn("exec delete dir failed", slog.String("path", dirPath), slog.Any("error", err))
|
||||
}
|
||||
}
|
||||
|
||||
// execDeleteFile removes a single file.
|
||||
func (fs *MemoryFS) execDeleteFile(ctx context.Context, botID, filePath string) {
|
||||
script := fmt.Sprintf("rm -f %s", container.ShellQuote(filePath))
|
||||
_, err := fs.execRunner.ExecWithCapture(ctx, mcpgw.ExecRequest{
|
||||
BotID: botID,
|
||||
Command: []string{"/bin/sh", "-c", script},
|
||||
WorkDir: fs.workDir,
|
||||
})
|
||||
if err != nil {
|
||||
fs.logger.Warn("exec delete file failed", slog.String("path", filePath), slog.Any("error", err))
|
||||
}
|
||||
}
|
||||
|
||||
// ----- .md formatting / parsing -----
|
||||
|
||||
func formatMemoryMD(item MemoryItem) string {
|
||||
var b strings.Builder
|
||||
b.WriteString("---\n")
|
||||
b.WriteString(fmt.Sprintf("id: %s\n", item.ID))
|
||||
if item.Hash != "" {
|
||||
b.WriteString(fmt.Sprintf("hash: %s\n", item.Hash))
|
||||
}
|
||||
if item.CreatedAt != "" {
|
||||
b.WriteString(fmt.Sprintf("created_at: %s\n", item.CreatedAt))
|
||||
}
|
||||
if item.UpdatedAt != "" {
|
||||
b.WriteString(fmt.Sprintf("updated_at: %s\n", item.UpdatedAt))
|
||||
}
|
||||
b.WriteString("---\n")
|
||||
b.WriteString(item.Memory)
|
||||
b.WriteString("\n")
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func parseMemoryMD(content string) (MemoryItem, error) {
|
||||
content = strings.TrimSpace(content)
|
||||
if !strings.HasPrefix(content, "---") {
|
||||
return MemoryItem{}, fmt.Errorf("missing frontmatter")
|
||||
}
|
||||
// Split on "---" delimiters.
|
||||
parts := strings.SplitN(content[3:], "---", 2)
|
||||
if len(parts) < 2 {
|
||||
return MemoryItem{}, fmt.Errorf("incomplete frontmatter")
|
||||
}
|
||||
frontmatter := strings.TrimSpace(parts[0])
|
||||
body := strings.TrimSpace(parts[1])
|
||||
|
||||
item := MemoryItem{Memory: body}
|
||||
for _, line := range strings.Split(frontmatter, "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
key, value, found := strings.Cut(line, ":")
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
key = strings.TrimSpace(key)
|
||||
value = strings.TrimSpace(value)
|
||||
switch key {
|
||||
case "id":
|
||||
item.ID = value
|
||||
case "hash":
|
||||
item.Hash = value
|
||||
case "created_at":
|
||||
item.CreatedAt = value
|
||||
case "updated_at":
|
||||
item.UpdatedAt = value
|
||||
}
|
||||
}
|
||||
if item.ID == "" {
|
||||
return MemoryItem{}, fmt.Errorf("missing id in frontmatter")
|
||||
}
|
||||
return item, nil
|
||||
}
|
||||
@@ -106,6 +106,42 @@ Follow the instruction mentioned below:
|
||||
Do not return anything except the JSON format.`, toJSON(retrievedOldMemory), toJSON(newRetrievedFacts), "```json", "```")
|
||||
}
|
||||
|
||||
// getCompactMemoryMessages builds the system/user prompt pair for memory
// compaction. memories is the pre-serialized candidate list ({id, text,
// created_at}); targetCount is the approximate desired number of output facts;
// decayDays > 0 appends the TIME DECAY guideline (#10).
func getCompactMemoryMessages(memories []map[string]string, targetCount int, decayDays int) (string, string) {
	// Optional guideline: bias merging/dropping toward memories older than decayDays.
	decayInstruction := ""
	if decayDays > 0 {
		decayInstruction = fmt.Sprintf(`
10. TIME DECAY: Today's date is %s. Memories older than %d days are LOW PRIORITY.
- When deciding which facts to merge or drop, prefer dropping/merging older low-priority memories over newer ones.
- If an older memory and a newer memory convey similar information, keep the newer one.
- Very old memories should only be kept if they contain unique, still-relevant information (e.g. name, identity, long-term preferences).
`, time.Now().UTC().Format("2006-01-02"), decayDays)
	}

	// The literal code-fence markers are passed as arguments so the raw string
	// doesn't have to embed backticks.
	systemPrompt := fmt.Sprintf(`You are a Memory Compactor. Your job is to consolidate a list of memory entries into a smaller, more concise set.

Guidelines:
1. Merge similar or redundant entries into single, concise facts.
2. If two entries contradict each other, keep only the more recent or more specific one.
3. Preserve all unique, non-redundant information — do not lose important facts.
4. Each output fact should be a single, self-contained statement.
5. Target approximately %d output facts (but use fewer if the information naturally consolidates to less, and never produce more than the input count).
6. Keep the same language as the original memories. Do not translate.
7. Return a JSON object with a single key "facts" containing an array of strings.
8. DO NOT RETURN ANYTHING ELSE OTHER THAN THE JSON FORMAT.
9. DO NOT ADD ANY ADDITIONAL TEXT OR CODEBLOCK IN THE JSON FIELDS WHICH MAKE IT INVALID SUCH AS "%s" OR "%s".%s

Example:
Input memories:
[{"id":"1","text":"User likes dark mode","created_at":"2026-01-01"},{"id":"2","text":"User prefers dark theme for all apps","created_at":"2026-02-10"},{"id":"3","text":"User is a software engineer","created_at":"2026-01-15"},{"id":"4","text":"User works as a developer","created_at":"2026-02-01"}]
Target: 2

Output: {"facts": ["User prefers dark theme for all apps", "User is a software engineer"]}
`, targetCount, "```json", "```", decayInstruction)

	userPrompt := fmt.Sprintf("Consolidate the following memories into approximately %d concise facts:\n\n%s", targetCount, toJSON(memories))
	return systemPrompt, userPrompt
}
|
||||
|
||||
func getLanguageDetectionMessages(text string) (string, string) {
|
||||
systemPrompt := `You are a language classifier for the given input text.
|
||||
Return a JSON object with a single key "language" whose value is one of the allowed codes.
|
||||
|
||||
@@ -329,6 +329,22 @@ func (s *QdrantStore) Delete(ctx context.Context, id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *QdrantStore) DeleteBatch(ctx context.Context, ids []string) error {
|
||||
if len(ids) == 0 {
|
||||
return nil
|
||||
}
|
||||
pointIDs := make([]*qdrant.PointId, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
pointIDs = append(pointIDs, qdrant.NewIDUUID(id))
|
||||
}
|
||||
_, err := s.client.Delete(ctx, &qdrant.DeletePoints{
|
||||
CollectionName: s.collection,
|
||||
Wait: qdrant.PtrOf(true),
|
||||
Points: qdrant.NewPointsSelectorIDs(pointIDs),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *QdrantStore) List(ctx context.Context, limit int, filters map[string]any) ([]qdrantPoint, error) {
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
@@ -379,6 +395,19 @@ func (s *QdrantStore) Scroll(ctx context.Context, limit int, filters map[string]
|
||||
return result, nextOffset, nil
|
||||
}
|
||||
|
||||
func (s *QdrantStore) Count(ctx context.Context, filters map[string]any) (uint64, error) {
|
||||
filter := buildQdrantFilter(filters)
|
||||
result, err := s.client.Count(ctx, &qdrant.CountPoints{
|
||||
CollectionName: s.collection,
|
||||
Filter: filter,
|
||||
Exact: qdrant.PtrOf(true),
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *QdrantStore) DeleteAll(ctx context.Context, filters map[string]any) error {
|
||||
filter := buildQdrantFilter(filters)
|
||||
if filter == nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -448,6 +449,26 @@ func (s *Service) Delete(ctx context.Context, memoryID string) (DeleteResponse,
|
||||
return DeleteResponse{Message: "Memory deleted successfully!"}, nil
|
||||
}
|
||||
|
||||
func (s *Service) DeleteBatch(ctx context.Context, memoryIDs []string) (DeleteResponse, error) {
|
||||
if len(memoryIDs) == 0 {
|
||||
return DeleteResponse{}, fmt.Errorf("memory_ids is required")
|
||||
}
|
||||
cleaned := make([]string, 0, len(memoryIDs))
|
||||
for _, id := range memoryIDs {
|
||||
id = strings.TrimSpace(id)
|
||||
if id != "" {
|
||||
cleaned = append(cleaned, id)
|
||||
}
|
||||
}
|
||||
if len(cleaned) == 0 {
|
||||
return DeleteResponse{}, fmt.Errorf("memory_ids is required")
|
||||
}
|
||||
if err := s.store.DeleteBatch(ctx, cleaned); err != nil {
|
||||
return DeleteResponse{}, err
|
||||
}
|
||||
return DeleteResponse{Message: fmt.Sprintf("%d memories deleted successfully!", len(cleaned))}, nil
|
||||
}
|
||||
|
||||
func (s *Service) DeleteAll(ctx context.Context, req DeleteAllRequest) (DeleteResponse, error) {
|
||||
filters := map[string]any{}
|
||||
for k, v := range req.Filters {
|
||||
@@ -471,6 +492,142 @@ func (s *Service) DeleteAll(ctx context.Context, req DeleteAllRequest) (DeleteRe
|
||||
return DeleteResponse{Message: "Memories deleted successfully!"}, nil
|
||||
}
|
||||
|
||||
// Compact consolidates the memories matching filters into a smaller set via
// the LLM, replacing the originals. ratio is the desired after/before size
// (values outside (0, 1] fall back to 0.5); decayDays > 0 deprioritizes older
// memories in the prompt. NOTE(review): this flow is destructive and not
// transactional — old memories are deleted before the new facts are re-added,
// so a failure in applyAdd leaves the set partially rebuilt.
func (s *Service) Compact(ctx context.Context, filters map[string]any, ratio float64, decayDays int) (CompactResult, error) {
	if s.llm == nil {
		return CompactResult{}, fmt.Errorf("llm not configured")
	}
	if s.store == nil {
		return CompactResult{}, fmt.Errorf("qdrant store not configured")
	}
	if ratio <= 0 || ratio > 1 {
		ratio = 0.5
	}

	// Fetch all existing memories.
	// NOTE(review): List is called with limit 0 — confirm the store treats 0 as
	// "unbounded"; if 0 falls back to a default page size, larger memory sets
	// would only be partially compacted.
	points, err := s.store.List(ctx, 0, filters)
	if err != nil {
		return CompactResult{}, err
	}
	beforeCount := len(points)
	if beforeCount <= 1 {
		// Nothing to compact: report the current state unchanged (ratio 1.0).
		items := make([]MemoryItem, 0, len(points))
		for _, p := range points {
			items = append(items, payloadToMemoryItem(p.ID, p.Payload))
		}
		return CompactResult{
			BeforeCount: beforeCount,
			AfterCount:  beforeCount,
			Ratio:       1.0,
			Results:     items,
		}, nil
	}

	// Build candidate list and compute target.
	// NOTE(review): fmt.Sprint on a missing payload key yields "<nil>" — verify
	// "data"/"created_at" are always present in the payload.
	candidates := make([]CandidateMemory, 0, beforeCount)
	for _, p := range points {
		candidates = append(candidates, CandidateMemory{
			ID:        p.ID,
			Memory:    fmt.Sprint(p.Payload["data"]),
			CreatedAt: fmt.Sprint(p.Payload["created_at"]),
		})
	}
	targetCount := int(math.Round(float64(beforeCount) * ratio))
	if targetCount < 1 {
		targetCount = 1
	}

	// Ask LLM to consolidate.
	compactResp, err := s.llm.Compact(ctx, CompactRequest{
		Memories:    candidates,
		TargetCount: targetCount,
		DecayDays:   decayDays,
	})
	if err != nil {
		return CompactResult{}, fmt.Errorf("compact llm call failed: %w", err)
	}
	if len(compactResp.Facts) == 0 {
		// Refuse to wipe memories when the model produced nothing usable.
		return CompactResult{}, fmt.Errorf("compact returned no facts")
	}

	// Delete old memories.
	if err := s.store.DeleteAll(ctx, filters); err != nil {
		return CompactResult{}, fmt.Errorf("compact delete old failed: %w", err)
	}

	// Reset BM25 stats for deleted documents (best effort; skip entries
	// missing text or language).
	if s.bm25 != nil {
		for _, p := range points {
			text := fmt.Sprint(p.Payload["data"])
			lang := fmt.Sprint(p.Payload["lang"])
			if strings.TrimSpace(text) == "" || strings.TrimSpace(lang) == "" {
				continue
			}
			freq, docLen, err := s.bm25.TermFrequencies(lang, text)
			if err != nil {
				continue
			}
			s.bm25.RemoveDocument(lang, freq, docLen)
		}
	}

	// Add compacted facts.
	results := make([]MemoryItem, 0, len(compactResp.Facts))
	for _, fact := range compactResp.Facts {
		if strings.TrimSpace(fact) == "" {
			continue
		}
		item, err := s.applyAdd(ctx, fact, filters, nil, false)
		if err != nil {
			return CompactResult{}, fmt.Errorf("compact add failed: %w", err)
		}
		results = append(results, item)
	}

	afterCount := len(results)
	actualRatio := float64(afterCount) / float64(beforeCount)
	return CompactResult{
		BeforeCount: beforeCount,
		AfterCount:  afterCount,
		Ratio:       math.Round(actualRatio*100) / 100, // round to 2 decimals
		Results:     results,
	}, nil
}
|
||||
|
||||
// Rough per-point storage estimates used by Usage.
const (
	// Estimated sparse vector overhead per point: ~200 dims * 8 bytes (4 index + 4 value).
	sparseVectorOverheadBytes = 1600
	// Estimated payload metadata overhead per point (hash, dates, filters, lang, metadata JSON).
	payloadMetadataOverheadBytes = 256
)
|
||||
|
||||
func (s *Service) Usage(ctx context.Context, filters map[string]any) (UsageResponse, error) {
|
||||
if s.store == nil {
|
||||
return UsageResponse{}, fmt.Errorf("qdrant store not configured")
|
||||
}
|
||||
points, err := s.store.List(ctx, 0, filters)
|
||||
if err != nil {
|
||||
return UsageResponse{}, err
|
||||
}
|
||||
count := len(points)
|
||||
var totalTextBytes int64
|
||||
for _, p := range points {
|
||||
text := fmt.Sprint(p.Payload["data"])
|
||||
totalTextBytes += int64(len(text))
|
||||
}
|
||||
var avgTextBytes int64
|
||||
if count > 0 {
|
||||
avgTextBytes = totalTextBytes / int64(count)
|
||||
}
|
||||
estimatedStorage := totalTextBytes + int64(count)*(sparseVectorOverheadBytes+payloadMetadataOverheadBytes)
|
||||
return UsageResponse{
|
||||
Count: count,
|
||||
TotalTextBytes: totalTextBytes,
|
||||
AvgTextBytes: avgTextBytes,
|
||||
EstimatedStorageBytes: estimatedStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) WarmupBM25(ctx context.Context, batchSize int) error {
|
||||
if s.bm25 == nil || s.store == nil {
|
||||
return nil
|
||||
@@ -602,6 +759,42 @@ func (s *Service) applyAdd(ctx context.Context, text string, filters map[string]
|
||||
return payloadToMemoryItem(id, payload), nil
|
||||
}
|
||||
|
||||
// RebuildAdd inserts a memory with a specific ID (from filesystem recovery).
|
||||
// Like applyAdd but preserves the given ID instead of generating a new UUID.
|
||||
func (s *Service) RebuildAdd(ctx context.Context, id, text string, filters map[string]any) (MemoryItem, error) {
|
||||
if s.store == nil {
|
||||
return MemoryItem{}, fmt.Errorf("qdrant store not configured")
|
||||
}
|
||||
if s.bm25 == nil {
|
||||
return MemoryItem{}, fmt.Errorf("bm25 indexer not configured")
|
||||
}
|
||||
if strings.TrimSpace(id) == "" {
|
||||
return MemoryItem{}, fmt.Errorf("id is required for rebuild")
|
||||
}
|
||||
lang, err := s.detectLanguage(ctx, text)
|
||||
if err != nil {
|
||||
return MemoryItem{}, err
|
||||
}
|
||||
termFreq, docLen, err := s.bm25.TermFrequencies(lang, text)
|
||||
if err != nil {
|
||||
return MemoryItem{}, err
|
||||
}
|
||||
sparseIndices, sparseValues := s.bm25.AddDocument(lang, termFreq, docLen)
|
||||
payload := buildPayload(text, filters, nil, "")
|
||||
payload["lang"] = lang
|
||||
point := qdrantPoint{
|
||||
ID: id,
|
||||
SparseIndices: sparseIndices,
|
||||
SparseValues: sparseValues,
|
||||
SparseVectorName: s.store.sparseVectorName,
|
||||
Payload: payload,
|
||||
}
|
||||
if err := s.store.Upsert(ctx, []qdrantPoint{point}); err != nil {
|
||||
return MemoryItem{}, err
|
||||
}
|
||||
return payloadToMemoryItem(id, payload), nil
|
||||
}
|
||||
|
||||
func (s *Service) applyUpdate(ctx context.Context, id, text string, filters map[string]any, metadata map[string]any, embeddingEnabled bool) (MemoryItem, error) {
|
||||
if strings.TrimSpace(id) == "" {
|
||||
return MemoryItem{}, fmt.Errorf("update action missing id")
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
// MockLLM is a test double for the LLM interface; each hook overrides the
// corresponding method.
type MockLLM struct {
	ExtractFunc        func(ctx context.Context, req ExtractRequest) (ExtractResponse, error)
	DecideFunc         func(ctx context.Context, req DecideRequest) (DecideResponse, error)
	CompactFunc        func(ctx context.Context, req CompactRequest) (CompactResponse, error) // optional; Compact errors when nil
	DetectLanguageFunc func(ctx context.Context, text string) (string, error)
}
|
||||
|
||||
@@ -20,6 +21,12 @@ func (m *MockLLM) Extract(ctx context.Context, req ExtractRequest) (ExtractRespo
|
||||
func (m *MockLLM) Decide(ctx context.Context, req DecideRequest) (DecideResponse, error) {
|
||||
return m.DecideFunc(ctx, req)
|
||||
}
|
||||
func (m *MockLLM) Compact(ctx context.Context, req CompactRequest) (CompactResponse, error) {
|
||||
if m.CompactFunc != nil {
|
||||
return m.CompactFunc(ctx, req)
|
||||
}
|
||||
return CompactResponse{}, fmt.Errorf("compact not mocked")
|
||||
}
|
||||
func (m *MockLLM) DetectLanguage(ctx context.Context, text string) (string, error) {
|
||||
return m.DetectLanguageFunc(ctx, text)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import "context"
|
||||
// LLM is the language-model surface the memory service depends on.
type LLM interface {
	// Extract produces an ExtractResponse from conversation input.
	Extract(ctx context.Context, req ExtractRequest) (ExtractResponse, error)
	// Decide maps extracted candidates onto memory actions.
	Decide(ctx context.Context, req DecideRequest) (DecideResponse, error)
	// Compact consolidates existing memories into fewer facts.
	Compact(ctx context.Context, req CompactRequest) (CompactResponse, error)
	// DetectLanguage returns a language code for the given text.
	DetectLanguage(ctx context.Context, text string) (string, error)
}
|
||||
|
||||
@@ -117,9 +118,10 @@ type ExtractResponse struct {
|
||||
}
|
||||
|
||||
// CandidateMemory is one existing memory passed to the LLM as decision or
// compaction input. CreatedAt enables the compactor's time-decay rule.
// Fix: the scraped diff fused the pre- and post-change field lists (duplicate
// ID/Memory/Metadata fields, which do not compile); this is the post-change struct.
type CandidateMemory struct {
	ID        string         `json:"id"`
	Memory    string         `json:"memory"`
	CreatedAt string         `json:"created_at,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}
|
||||
|
||||
type DecideRequest struct {
|
||||
@@ -139,3 +141,34 @@ type DecisionAction struct {
|
||||
type DecideResponse struct {
|
||||
Actions []DecisionAction `json:"actions"`
|
||||
}
|
||||
|
||||
// CompactRequest is the input to LLM.Compact.
type CompactRequest struct {
	Memories    []CandidateMemory `json:"memories"`             // existing memories to consolidate
	TargetCount int               `json:"target_count"`         // approximate desired number of output facts
	DecayDays   int               `json:"decay_days,omitempty"` // >0 enables time-decay prioritization
}
|
||||
|
||||
// CompactResponse is the LLM's consolidated output: the surviving facts.
type CompactResponse struct {
	Facts []string `json:"facts"`
}
|
||||
|
||||
// CompactResult summarizes a Service.Compact run.
type CompactResult struct {
	BeforeCount int          `json:"before_count"` // memory count before compaction
	AfterCount  int          `json:"after_count"`  // memory count after compaction
	Ratio       float64      `json:"ratio"`        // after/before, rounded to 2 decimals
	Results     []MemoryItem `json:"results"`      // the memories stored after compaction
}
|
||||
|
||||
// UsageResponse reports memory count and storage estimates (see Service.Usage).
type UsageResponse struct {
	Count                 int   `json:"count"`
	TotalTextBytes        int64 `json:"total_text_bytes"`
	AvgTextBytes          int64 `json:"avg_text_bytes"`
	EstimatedStorageBytes int64 `json:"estimated_storage_bytes"` // text bytes plus per-point overhead estimates
}
|
||||
|
||||
// RebuildResult summarizes a memory rebuild from the filesystem index.
// NOTE(review): field semantics inferred from names — confirm against the
// rebuild implementation.
type RebuildResult struct {
	FsCount       int `json:"fs_count"`       // entries found on the filesystem
	QdrantCount   int `json:"qdrant_count"`   // entries present in the vector store
	MissingCount  int `json:"missing_count"`  // presumably fs entries absent from the store
	RestoredCount int `json:"restored_count"` // presumably entries re-inserted during rebuild
}
|
||||
|
||||
Reference in New Issue
Block a user