mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-25 07:00:48 +09:00
refactor(core): restructure conversation, channel and message domains
- Rename chat module to conversation with flow-based architecture - Move channelidentities into channel/identities subpackage - Add channel/route for routing logic - Add message service with event hub - Add MCP providers: container, directory, schedule - Refactor Feishu/Telegram adapters with directory and stream support - Add platform management page and channel badges in web UI - Update database schema for conversations, messages and channel routes - Add @memoh/shared package for cross-package type definitions
This commit is contained in:
@@ -0,0 +1,124 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultBufferSize is the default per-subscriber channel buffer.
|
||||
DefaultBufferSize = 64
|
||||
)
|
||||
|
||||
// EventType identifies the event category published by the message event hub.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
// EventTypeMessageCreated is emitted after a message is persisted successfully.
|
||||
EventTypeMessageCreated EventType = "message_created"
|
||||
)
|
||||
|
||||
// Event is the normalized payload emitted by the in-process message event hub.
|
||||
type Event struct {
|
||||
Type EventType `json:"type"`
|
||||
BotID string `json:"bot_id"`
|
||||
Data json.RawMessage `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
// Publisher publishes events to subscribers.
|
||||
type Publisher interface {
|
||||
Publish(event Event)
|
||||
}
|
||||
|
||||
// Subscriber subscribes to bot-scoped events.
|
||||
type Subscriber interface {
|
||||
Subscribe(botID string, buffer int) (string, <-chan Event, func())
|
||||
}
|
||||
|
||||
// Hub is an in-process pub/sub dispatcher for bot-scoped message events.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
streams map[string]map[string]chan Event
|
||||
}
|
||||
|
||||
// NewHub creates an empty message event hub.
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
streams: map[string]map[string]chan Event{},
|
||||
}
|
||||
}
|
||||
|
||||
// Publish broadcasts one event to all subscribers under the same bot ID.
|
||||
// Slow subscribers are dropped in a non-blocking way.
|
||||
func (h *Hub) Publish(event Event) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
botID := strings.TrimSpace(event.BotID)
|
||||
if botID == "" {
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for _, ch := range h.streams[botID] {
|
||||
select {
|
||||
case ch <- event:
|
||||
default:
|
||||
// Drop if receiver is slow to avoid blocking persistence path.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers one subscriber under a bot ID.
|
||||
// It returns a stream ID, read-only event channel, and a cancel function.
|
||||
func (h *Hub) Subscribe(botID string, buffer int) (string, <-chan Event, func()) {
|
||||
if h == nil {
|
||||
ch := make(chan Event)
|
||||
close(ch)
|
||||
return "", ch, func() {}
|
||||
}
|
||||
botID = strings.TrimSpace(botID)
|
||||
if botID == "" {
|
||||
ch := make(chan Event)
|
||||
close(ch)
|
||||
return "", ch, func() {}
|
||||
}
|
||||
if buffer <= 0 {
|
||||
buffer = DefaultBufferSize
|
||||
}
|
||||
|
||||
streamID := uuid.NewString()
|
||||
ch := make(chan Event, buffer)
|
||||
|
||||
h.mu.Lock()
|
||||
streams, ok := h.streams[botID]
|
||||
if !ok {
|
||||
streams = map[string]chan Event{}
|
||||
h.streams[botID] = streams
|
||||
}
|
||||
streams[streamID] = ch
|
||||
h.mu.Unlock()
|
||||
|
||||
var once sync.Once
|
||||
cancel := func() {
|
||||
once.Do(func() {
|
||||
h.mu.Lock()
|
||||
streams := h.streams[botID]
|
||||
if streams != nil {
|
||||
if current, ok := streams[streamID]; ok {
|
||||
delete(streams, streamID)
|
||||
close(current)
|
||||
}
|
||||
if len(streams) == 0 {
|
||||
delete(h.streams, botID)
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
return streamID, ch, cancel
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHubPublishScopedByBotID(t *testing.T) {
|
||||
hub := NewHub()
|
||||
_, botAStream, cancelA := hub.Subscribe("bot-a", 8)
|
||||
defer cancelA()
|
||||
_, botBStream, cancelB := hub.Subscribe("bot-b", 8)
|
||||
defer cancelB()
|
||||
|
||||
hub.Publish(Event{Type: EventTypeMessageCreated, BotID: "bot-a"})
|
||||
|
||||
select {
|
||||
case <-botAStream:
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("expected event for bot-a subscriber")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-botBStream:
|
||||
t.Fatalf("did not expect bot-b subscriber to receive bot-a event")
|
||||
case <-time.After(120 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubCancelUnsubscribe(t *testing.T) {
|
||||
hub := NewHub()
|
||||
_, stream, cancel := hub.Subscribe("bot-a", 8)
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case _, ok := <-stream:
|
||||
if ok {
|
||||
t.Fatalf("expected stream to be closed after cancel")
|
||||
}
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for stream close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubSlowSubscriberDoesNotBlockPublish(t *testing.T) {
|
||||
hub := NewHub()
|
||||
_, stream, cancel := hub.Subscribe("bot-a", 1)
|
||||
defer cancel()
|
||||
|
||||
hub.Publish(Event{Type: EventTypeMessageCreated, BotID: "bot-a"})
|
||||
hub.Publish(Event{Type: EventTypeMessageCreated, BotID: "bot-a"})
|
||||
hub.Publish(Event{Type: EventTypeMessageCreated, BotID: "bot-a"})
|
||||
|
||||
select {
|
||||
case <-stream:
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("expected at least one event in buffer")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,358 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
|
||||
dbpkg "github.com/memohai/memoh/internal/db"
|
||||
"github.com/memohai/memoh/internal/db/sqlc"
|
||||
"github.com/memohai/memoh/internal/message/event"
|
||||
)
|
||||
|
||||
// DBService persists and reads bot history messages.
|
||||
type DBService struct {
|
||||
queries *sqlc.Queries
|
||||
logger *slog.Logger
|
||||
publisher event.Publisher
|
||||
}
|
||||
|
||||
// NewService creates a message service.
|
||||
func NewService(log *slog.Logger, queries *sqlc.Queries, publishers ...event.Publisher) *DBService {
|
||||
if log == nil {
|
||||
log = slog.Default()
|
||||
}
|
||||
var publisher event.Publisher
|
||||
if len(publishers) > 0 {
|
||||
publisher = publishers[0]
|
||||
}
|
||||
return &DBService{
|
||||
queries: queries,
|
||||
logger: log.With(slog.String("service", "message")),
|
||||
publisher: publisher,
|
||||
}
|
||||
}
|
||||
|
||||
// Persist writes a single message to bot_history_messages.
|
||||
func (s *DBService) Persist(ctx context.Context, input PersistInput) (Message, error) {
|
||||
pgBotID, err := dbpkg.ParseUUID(input.BotID)
|
||||
if err != nil {
|
||||
return Message{}, fmt.Errorf("invalid bot id: %w", err)
|
||||
}
|
||||
|
||||
pgRouteID, err := parseOptionalUUID(input.RouteID)
|
||||
if err != nil {
|
||||
return Message{}, fmt.Errorf("invalid route id: %w", err)
|
||||
}
|
||||
pgSenderChannelIdentityID, err := parseOptionalUUID(input.SenderChannelIdentityID)
|
||||
if err != nil {
|
||||
return Message{}, fmt.Errorf("invalid sender channel identity id: %w", err)
|
||||
}
|
||||
pgSenderUserID, err := parseOptionalUUID(input.SenderUserID)
|
||||
if err != nil {
|
||||
return Message{}, fmt.Errorf("invalid sender user id: %w", err)
|
||||
}
|
||||
|
||||
metaBytes, err := json.Marshal(nonNilMap(input.Metadata))
|
||||
if err != nil {
|
||||
return Message{}, fmt.Errorf("marshal message metadata: %w", err)
|
||||
}
|
||||
|
||||
content := input.Content
|
||||
if len(content) == 0 {
|
||||
content = []byte("{}")
|
||||
}
|
||||
|
||||
row, err := s.queries.CreateMessage(ctx, sqlc.CreateMessageParams{
|
||||
BotID: pgBotID,
|
||||
RouteID: pgRouteID,
|
||||
SenderChannelIdentityID: pgSenderChannelIdentityID,
|
||||
SenderUserID: pgSenderUserID,
|
||||
Platform: toPgText(input.Platform),
|
||||
ExternalMessageID: toPgText(input.ExternalMessageID),
|
||||
SourceReplyToMessageID: toPgText(input.SourceReplyToMessageID),
|
||||
Role: input.Role,
|
||||
Content: content,
|
||||
Metadata: metaBytes,
|
||||
})
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
|
||||
result := toMessageFromCreate(row)
|
||||
s.publishMessageCreated(result)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// List returns all messages for a bot.
|
||||
func (s *DBService) List(ctx context.Context, botID string) ([]Message, error) {
|
||||
pgBotID, err := dbpkg.ParseUUID(botID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := s.queries.ListMessages(ctx, pgBotID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return toMessagesFromList(rows), nil
|
||||
}
|
||||
|
||||
// ListSince returns bot messages since a given time.
|
||||
func (s *DBService) ListSince(ctx context.Context, botID string, since time.Time) ([]Message, error) {
|
||||
pgBotID, err := dbpkg.ParseUUID(botID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := s.queries.ListMessagesSince(ctx, sqlc.ListMessagesSinceParams{
|
||||
BotID: pgBotID,
|
||||
CreatedAt: pgtype.Timestamptz{Time: since, Valid: true},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return toMessagesFromSince(rows), nil
|
||||
}
|
||||
|
||||
// ListLatest returns the latest N bot messages (newest first in DB; caller may reverse for ASC).
|
||||
func (s *DBService) ListLatest(ctx context.Context, botID string, limit int32) ([]Message, error) {
|
||||
pgBotID, err := dbpkg.ParseUUID(botID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := s.queries.ListMessagesLatest(ctx, sqlc.ListMessagesLatestParams{
|
||||
BotID: pgBotID,
|
||||
MaxCount: limit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return toMessagesFromLatest(rows), nil
|
||||
}
|
||||
|
||||
// ListBefore returns up to limit messages older than before (created_at < before), ordered oldest-first.
|
||||
func (s *DBService) ListBefore(ctx context.Context, botID string, before time.Time, limit int32) ([]Message, error) {
|
||||
pgBotID, err := dbpkg.ParseUUID(botID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := s.queries.ListMessagesBefore(ctx, sqlc.ListMessagesBeforeParams{
|
||||
BotID: pgBotID,
|
||||
CreatedAt: pgtype.Timestamptz{Time: before, Valid: true},
|
||||
MaxCount: limit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return toMessagesFromBefore(rows), nil
|
||||
}
|
||||
|
||||
// DeleteByBot deletes all messages for a bot.
|
||||
func (s *DBService) DeleteByBot(ctx context.Context, botID string) error {
|
||||
pgBotID, err := dbpkg.ParseUUID(botID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.queries.DeleteMessagesByBot(ctx, pgBotID)
|
||||
}
|
||||
|
||||
func toMessageFromCreate(row sqlc.CreateMessageRow) Message {
|
||||
return toMessageFields(
|
||||
row.ID,
|
||||
row.BotID,
|
||||
row.RouteID,
|
||||
row.SenderChannelIdentityID,
|
||||
row.SenderUserID,
|
||||
row.Platform,
|
||||
row.ExternalMessageID,
|
||||
row.SourceReplyToMessageID,
|
||||
row.Role,
|
||||
row.Content,
|
||||
row.Metadata,
|
||||
row.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
func toMessageFromListRow(row sqlc.ListMessagesRow) Message {
|
||||
return toMessageFields(
|
||||
row.ID,
|
||||
row.BotID,
|
||||
row.RouteID,
|
||||
row.SenderChannelIdentityID,
|
||||
row.SenderUserID,
|
||||
row.Platform,
|
||||
row.ExternalMessageID,
|
||||
row.SourceReplyToMessageID,
|
||||
row.Role,
|
||||
row.Content,
|
||||
row.Metadata,
|
||||
row.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
func toMessageFromSinceRow(row sqlc.ListMessagesSinceRow) Message {
|
||||
return toMessageFields(
|
||||
row.ID,
|
||||
row.BotID,
|
||||
row.RouteID,
|
||||
row.SenderChannelIdentityID,
|
||||
row.SenderUserID,
|
||||
row.Platform,
|
||||
row.ExternalMessageID,
|
||||
row.SourceReplyToMessageID,
|
||||
row.Role,
|
||||
row.Content,
|
||||
row.Metadata,
|
||||
row.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
func toMessageFromLatestRow(row sqlc.ListMessagesLatestRow) Message {
|
||||
return toMessageFields(
|
||||
row.ID,
|
||||
row.BotID,
|
||||
row.RouteID,
|
||||
row.SenderChannelIdentityID,
|
||||
row.SenderUserID,
|
||||
row.Platform,
|
||||
row.ExternalMessageID,
|
||||
row.SourceReplyToMessageID,
|
||||
row.Role,
|
||||
row.Content,
|
||||
row.Metadata,
|
||||
row.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
func toMessageFields(
|
||||
id pgtype.UUID,
|
||||
botID pgtype.UUID,
|
||||
routeID pgtype.UUID,
|
||||
senderChannelIdentityID pgtype.UUID,
|
||||
senderUserID pgtype.UUID,
|
||||
platform pgtype.Text,
|
||||
externalMessageID pgtype.Text,
|
||||
sourceReplyToMessageID pgtype.Text,
|
||||
role string,
|
||||
content []byte,
|
||||
metadata []byte,
|
||||
createdAt pgtype.Timestamptz,
|
||||
) Message {
|
||||
return Message{
|
||||
ID: id.String(),
|
||||
BotID: botID.String(),
|
||||
RouteID: routeID.String(),
|
||||
SenderChannelIdentityID: senderChannelIdentityID.String(),
|
||||
SenderUserID: senderUserID.String(),
|
||||
Platform: dbpkg.TextToString(platform),
|
||||
ExternalMessageID: dbpkg.TextToString(externalMessageID),
|
||||
SourceReplyToMessageID: dbpkg.TextToString(sourceReplyToMessageID),
|
||||
Role: role,
|
||||
Content: json.RawMessage(content),
|
||||
Metadata: parseJSONMap(metadata),
|
||||
CreatedAt: createdAt.Time,
|
||||
}
|
||||
}
|
||||
|
||||
func toMessagesFromList(rows []sqlc.ListMessagesRow) []Message {
|
||||
messages := make([]Message, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
messages = append(messages, toMessageFromListRow(row))
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
func toMessagesFromSince(rows []sqlc.ListMessagesSinceRow) []Message {
|
||||
messages := make([]Message, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
messages = append(messages, toMessageFromSinceRow(row))
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
func toMessagesFromLatest(rows []sqlc.ListMessagesLatestRow) []Message {
|
||||
messages := make([]Message, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
messages = append(messages, toMessageFromLatestRow(row))
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
func toMessageFromBeforeRow(row sqlc.ListMessagesBeforeRow) Message {
|
||||
return toMessageFields(
|
||||
row.ID,
|
||||
row.BotID,
|
||||
row.RouteID,
|
||||
row.SenderChannelIdentityID,
|
||||
row.SenderUserID,
|
||||
row.Platform,
|
||||
row.ExternalMessageID,
|
||||
row.SourceReplyToMessageID,
|
||||
row.Role,
|
||||
row.Content,
|
||||
row.Metadata,
|
||||
row.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
// toMessagesFromBefore returns messages in oldest-first order (ListMessagesBefore returns DESC; we reverse).
|
||||
func toMessagesFromBefore(rows []sqlc.ListMessagesBeforeRow) []Message {
|
||||
messages := make([]Message, 0, len(rows))
|
||||
for i := len(rows) - 1; i >= 0; i-- {
|
||||
messages = append(messages, toMessageFromBeforeRow(rows[i]))
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
func parseOptionalUUID(id string) (pgtype.UUID, error) {
|
||||
if strings.TrimSpace(id) == "" {
|
||||
return pgtype.UUID{}, nil
|
||||
}
|
||||
return dbpkg.ParseUUID(id)
|
||||
}
|
||||
|
||||
func toPgText(value string) pgtype.Text {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
return pgtype.Text{}
|
||||
}
|
||||
return pgtype.Text{String: value, Valid: true}
|
||||
}
|
||||
|
||||
func nonNilMap(m map[string]any) map[string]any {
|
||||
if m == nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func parseJSONMap(data []byte) map[string]any {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
var m map[string]any
|
||||
_ = json.Unmarshal(data, &m)
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *DBService) publishMessageCreated(message Message) {
|
||||
if s.publisher == nil {
|
||||
return
|
||||
}
|
||||
payload, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
if s.logger != nil {
|
||||
s.logger.Warn("marshal message event failed", slog.Any("error", err))
|
||||
}
|
||||
return
|
||||
}
|
||||
s.publisher.Publish(event.Event{
|
||||
Type: event.EventTypeMessageCreated,
|
||||
BotID: strings.TrimSpace(message.BotID),
|
||||
Data: payload,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Message represents a single persisted bot message.
|
||||
type Message struct {
|
||||
ID string `json:"id"`
|
||||
BotID string `json:"bot_id"`
|
||||
RouteID string `json:"route_id,omitempty"`
|
||||
SenderChannelIdentityID string `json:"sender_channel_identity_id,omitempty"`
|
||||
SenderUserID string `json:"sender_user_id,omitempty"`
|
||||
Platform string `json:"platform,omitempty"`
|
||||
ExternalMessageID string `json:"external_message_id,omitempty"`
|
||||
SourceReplyToMessageID string `json:"source_reply_to_message_id,omitempty"`
|
||||
Role string `json:"role"`
|
||||
Content json.RawMessage `json:"content"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// PersistInput is the input for persisting a message.
|
||||
type PersistInput struct {
|
||||
BotID string
|
||||
RouteID string
|
||||
SenderChannelIdentityID string
|
||||
SenderUserID string
|
||||
Platform string
|
||||
ExternalMessageID string
|
||||
SourceReplyToMessageID string
|
||||
Role string
|
||||
Content json.RawMessage
|
||||
Metadata map[string]any
|
||||
}
|
||||
|
||||
// Writer defines write behavior needed by the inbound router.
|
||||
type Writer interface {
|
||||
Persist(ctx context.Context, input PersistInput) (Message, error)
|
||||
}
|
||||
|
||||
// Service defines message read/write behavior.
|
||||
type Service interface {
|
||||
Writer
|
||||
List(ctx context.Context, botID string) ([]Message, error)
|
||||
ListSince(ctx context.Context, botID string, since time.Time) ([]Message, error)
|
||||
ListLatest(ctx context.Context, botID string, limit int32) ([]Message, error)
|
||||
ListBefore(ctx context.Context, botID string, before time.Time, limit int32) ([]Message, error)
|
||||
DeleteByBot(ctx context.Context, botID string) error
|
||||
}
|
||||
Reference in New Issue
Block a user