Files
Memoh/internal/channel/manager.go
T
Ran 6acdd191c7 Squashed commit of the following:
commit bcdb026ae43e4f95d0b2c4f9bd440a2df9d6b514
Author: Ran <16112591+chen-ran@users.noreply.github.com>
Date:   Thu Feb 12 17:10:32 2026 +0800

    chore: update DEVELOPMENT.md

commit 30281742ef
Merge: ca5c6a1 5b05f13
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Thu Feb 12 15:49:17 2026 +0800

    merge(github/main): integrate fx dependency injection framework

    Merge upstream fx refactor and adapt all services to use go.uber.org/fx
    for dependency injection. Resolve conflicts in main.go, server.go,
    and service constructors while preserving our domain model changes.

    - Fix telegram adapter panic on shutdown (double close channel)
    - Fix feishu adapter processing messages after stop
    - Increase directory lookup timeout from 2s to 5s

commit ca5c6a1866
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Thu Feb 12 15:33:09 2026 +0800

    refactor(core): restructure conversation, channel and message domains

    - Rename chat module to conversation with flow-based architecture
    - Move channelidentities into channel/identities subpackage
    - Add channel/route for routing logic
    - Add message service with event hub
    - Add MCP providers: container, directory, schedule
    - Refactor Feishu/Telegram adapters with directory and stream support
    - Add platform management page and channel badges in web UI
    - Update database schema for conversations, messages and channel routes
    - Add @memoh/shared package for cross-package type definitions

commit 75e2ef0467
Merge: d99ba38 01cb6c8
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Thu Feb 12 14:45:49 2026 +0800

    merge(github): merge github/main, resolve index.ts URL conflict

    Keep our defensive absolute-URL check in createAuthFetcher.

commit d99ba38b7d
Merge: 860e20f 35ce7d1
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Thu Feb 12 05:20:18 2026 +0800

    merge(github): merge github/main, keep our code and docs/spec

commit 860e20fe70
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Wed Feb 11 22:13:27 2026 +0800

    docs(docs): add concepts and style guides for VitePress site

    - Add concepts: identity-and-binding, index (en/zh)
    - Add style: terminology (en/zh)
    - Update index and zh/index
    - Update .vitepress/config.ts

commit a75fdb8040
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Wed Feb 11 17:37:16 2026 +0800

    refactor(mcp): standardize unified tool gateway on go-sdk

    Split business executors from federation sources and migrate unified tool/federation transports to the official go-sdk for stricter MCP compliance and safer session lifecycle handling. Add targeted regression tests for accept compatibility, initialization retries, pending cleanup, and include updated swagger artifacts.

commit 02b33c8e85
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Wed Feb 11 15:42:21 2026 +0800

    refactor(core): finalize user-centric identity and policy cleanup

    Unify auth and chat identity semantics around user_id, enforce personal-bot owner-only authorization, and remove legacy compatibility branches in integration tests.

commit 06e8619a37
Author: BBQ <bbq@BBQdeMacBook-Air.local>
Date:   Wed Feb 11 14:47:03 2026 +0800

    refactor(core): migrate channel identity and binding across app

    Align channel identity and bind flow across backend and app-facing layers, including generated swagger artifacts and package lock updates while excluding docs content changes.
2026-02-12 17:13:03 +08:00

235 lines
7.2 KiB
Go

package channel
import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"time"
)
// ConfigLister lists channel configs for periodic refresh. Used by connection lifecycle.
type ConfigLister interface {
ListConfigsByType(ctx context.Context, channelType ChannelType) ([]ChannelConfig, error)
}
// ConfigResolver resolves effective configs and user bindings. Used for outbound sending.
type ConfigResolver interface {
ResolveEffectiveConfig(ctx context.Context, botID string, channelType ChannelType) (ChannelConfig, error)
GetChannelIdentityConfig(ctx context.Context, channelIdentityID string, channelType ChannelType) (ChannelIdentityBinding, error)
}
// BindingStore resolves channel-identity bindings. Used by identity resolution.
type BindingStore interface {
ResolveChannelIdentityBinding(ctx context.Context, channelType ChannelType, criteria BindingCriteria) (string, error)
}
// ConfigStore is the full persistence interface. Components should depend on smaller
// interfaces above; ConfigStore exists as a convenience for wiring.
type ConfigStore interface {
ConfigLister
ConfigResolver
BindingStore
UpsertChannelIdentityConfig(ctx context.Context, channelIdentityID string, channelType ChannelType, req UpsertChannelIdentityConfigRequest) (ChannelIdentityBinding, error)
}
// Middleware wraps an InboundHandler to add cross-cutting behavior.
type Middleware func(next InboundHandler) InboundHandler
// ManagerStore is the minimal persistence interface required by Manager.
type ManagerStore interface {
ConfigLister
ConfigResolver
}
// Manager coordinates channel adapters, connection lifecycle, and message dispatch.
// Connection lifecycle lives in connection.go, inbound dispatch in inbound.go,
// and outbound pipeline in outbound.go.
type Manager struct {
registry *Registry
service ManagerStore
processor InboundProcessor
refreshInterval time.Duration
logger *slog.Logger
middlewares []Middleware
inboundQueue chan inboundTask
inboundWorkers int
inboundOnce sync.Once
inboundCtx context.Context
inboundCancel context.CancelFunc
mu sync.Mutex
refreshMu sync.Mutex
connections map[string]*connectionEntry
}
// NewManager creates a Manager with the given logger, registry, config store, and inbound processor.
func NewManager(log *slog.Logger, registry *Registry, service ManagerStore, processor InboundProcessor) *Manager {
if log == nil {
log = slog.Default()
}
if registry == nil {
registry = NewRegistry()
}
return &Manager{
registry: registry,
service: service,
processor: processor,
refreshInterval: 30 * time.Second,
connections: map[string]*connectionEntry{},
logger: log.With(slog.String("component", "channel")),
middlewares: []Middleware{},
inboundQueue: make(chan inboundTask, 256),
inboundWorkers: 4,
}
}
// Registry returns the adapter registry used by this manager.
func (m *Manager) Registry() *Registry {
return m.registry
}
// Use appends middleware to the inbound processing chain.
func (m *Manager) Use(mw ...Middleware) {
m.middlewares = append(m.middlewares, mw...)
}
// RegisterAdapter adds an adapter to the registry and logs the registration.
func (m *Manager) RegisterAdapter(adapter Adapter) {
if adapter == nil {
return
}
if err := m.registry.Register(adapter); err != nil {
if m.logger != nil {
m.logger.Warn("adapter registration failed", slog.String("channel", adapter.Type().String()), slog.Any("error", err))
}
return
}
if m.logger != nil {
m.logger.Info("adapter registered", slog.String("channel", adapter.Type().String()))
}
}
// AddAdapter registers an adapter and triggers an immediate refresh for hot-plug support.
func (m *Manager) AddAdapter(ctx context.Context, adapter Adapter) {
m.RegisterAdapter(adapter)
if ctx != nil {
m.refresh(ctx)
}
}
// RemoveAdapter unregisters an adapter and stops all its active connections.
func (m *Manager) RemoveAdapter(ctx context.Context, channelType ChannelType) {
if ctx == nil {
ctx = context.Background()
}
m.mu.Lock()
for id, entry := range m.connections {
if entry != nil && entry.config.ChannelType == channelType {
if entry.connection != nil {
if err := entry.connection.Stop(ctx); err != nil && !errors.Is(err, ErrStopNotSupported) && m.logger != nil {
m.logger.Warn("adapter stop failed", slog.String("config_id", id), slog.Any("error", err))
}
}
delete(m.connections, id)
}
}
m.mu.Unlock()
m.registry.Unregister(channelType)
}
// Start begins the periodic config refresh loop and inbound worker pool.
func (m *Manager) Start(ctx context.Context) {
if m.logger != nil {
m.logger.Info("manager start")
}
m.startInboundWorkers(ctx)
go func() {
m.refresh(ctx)
ticker := time.NewTicker(m.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
if m.logger != nil {
m.logger.Info("manager stop")
}
m.stopAll(ctx)
return
case <-ticker.C:
m.refresh(ctx)
}
}
}()
}
// Send delivers an outbound message to the specified channel, resolving target and config automatically.
func (m *Manager) Send(ctx context.Context, botID string, channelType ChannelType, req SendRequest) error {
if m.service == nil {
return fmt.Errorf("channel manager not configured")
}
sender, ok := m.registry.GetSender(channelType)
if !ok {
return fmt.Errorf("unsupported channel type: %s", channelType)
}
config, err := m.service.ResolveEffectiveConfig(ctx, botID, channelType)
if err != nil {
return err
}
target := strings.TrimSpace(req.Target)
if target == "" {
targetChannelIdentityID := strings.TrimSpace(req.ChannelIdentityID)
if targetChannelIdentityID == "" {
return fmt.Errorf("target or user_id is required")
}
userCfg, err := m.service.GetChannelIdentityConfig(ctx, targetChannelIdentityID, channelType)
if err != nil {
if m.logger != nil {
m.logger.Warn("channel binding missing", slog.String("channel", channelType.String()), slog.String("channel_identity_id", targetChannelIdentityID))
}
return fmt.Errorf("channel binding required")
}
target, err = m.registry.ResolveTargetFromUserConfig(channelType, userCfg.Config)
if err != nil {
return err
}
}
if normalized, ok := m.registry.NormalizeTarget(channelType, target); ok {
target = normalized
}
if req.Message.IsEmpty() {
return fmt.Errorf("message is required")
}
if m.logger != nil {
m.logger.Info("send outbound", slog.String("channel", channelType.String()), slog.String("bot_id", botID))
}
policy := m.resolveOutboundPolicy(channelType)
outbound, err := buildOutboundMessages(OutboundMessage{
Target: target,
Message: req.Message,
}, policy)
if err != nil {
return err
}
for _, item := range outbound {
if err := m.sendWithConfig(ctx, sender, config, item, policy); err != nil {
if m.logger != nil {
m.logger.Error("send outbound failed", slog.String("channel", channelType.String()), slog.String("bot_id", botID), slog.Any("error", err))
}
return err
}
}
return nil
}
// Shutdown cancels the inbound worker pool and stops all active connections.
func (m *Manager) Shutdown(ctx context.Context) error {
if m.inboundCancel != nil {
m.inboundCancel()
}
m.stopAll(ctx)
return nil
}