Files
Memoh/internal/channel/manager.go

319 lines
9.9 KiB
Go

package channel
import (
"context"
"errors"
"fmt"
"log/slog"
"sort"
"strings"
"sync"
"time"
)
// ConfigLister lists channel configs of a given channel type.
// It is the read-only view of the store that the connection lifecycle
// relies on for periodic refresh.
type ConfigLister interface {
// ListConfigsByType returns the ChannelConfig values stored for channelType,
// or an error if the listing fails.
ListConfigsByType(ctx context.Context, channelType ChannelType) ([]ChannelConfig, error)
}
// ConfigResolver resolves effective configs and channel-identity bindings.
// It is the part of the store used on the outbound path (see Manager.Send
// and Manager.React).
type ConfigResolver interface {
// ResolveEffectiveConfig returns the ChannelConfig to use for the given
// bot and channel type.
ResolveEffectiveConfig(ctx context.Context, botID string, channelType ChannelType) (ChannelConfig, error)
// GetChannelIdentityConfig returns the binding stored for a channel
// identity on the given channel type. Manager.Send treats any error from
// this call as "no binding available".
GetChannelIdentityConfig(ctx context.Context, channelIdentityID string, channelType ChannelType) (ChannelIdentityBinding, error)
}
// BindingStore resolves channel-identity bindings. Used by identity resolution.
type BindingStore interface {
// ResolveChannelIdentityBinding looks up a binding on channelType that
// matches criteria and returns a string identifier for it.
// NOTE(review): the returned string is presumably the channel identity ID;
// confirm against the implementation.
ResolveChannelIdentityBinding(ctx context.Context, channelType ChannelType, criteria BindingCriteria) (string, error)
}
// ConfigStore is the full persistence interface: it combines ConfigLister,
// ConfigResolver and BindingStore and adds the write operation for
// channel-identity configs. Components should depend on the smaller
// interfaces above; ConfigStore exists as a convenience for wiring.
type ConfigStore interface {
ConfigLister
ConfigResolver
BindingStore
// UpsertChannelIdentityConfig creates or updates the config of a channel
// identity for the given channel type and returns the resulting binding.
UpsertChannelIdentityConfig(ctx context.Context, channelIdentityID string, channelType ChannelType, req UpsertChannelIdentityConfigRequest) (ChannelIdentityBinding, error)
}
// Middleware wraps an InboundHandler to add cross-cutting behavior.
// It receives the next handler in the chain and returns the handler that
// should run in its place. Middleware is registered through Manager.Use.
type Middleware func(next InboundHandler) InboundHandler
// ManagerStore is the minimal persistence interface required by Manager:
// listing configs (ConfigLister) and resolving effective configs and
// channel-identity bindings (ConfigResolver). It deliberately omits the
// write and binding-resolution operations found on ConfigStore.
type ManagerStore interface {
ConfigLister
ConfigResolver
}
// ConnectionStatus describes runtime status for one configured channel connection.
// It is the value returned by Manager.ConnectionStatusesByBot and is
// JSON-serializable.
type ConnectionStatus struct {
// ConfigID identifies the channel config this status belongs to.
ConfigID string `json:"config_id"`
// BotID identifies the owning bot; ConnectionStatusesByBot filters on it.
BotID string `json:"bot_id"`
// ChannelType is the channel type of the connection.
ChannelType ChannelType `json:"channel_type"`
// Running reports whether the connection is considered running.
Running bool `json:"running"`
// LastError holds an error description; it is omitted from JSON when empty.
LastError string `json:"last_error,omitempty"`
// UpdatedAt is a timestamp for this status.
// NOTE(review): presumably the time the status was last written; confirm
// where connectionMeta is updated.
UpdatedAt time.Time `json:"updated_at"`
}
// Manager coordinates channel adapters, connection lifecycle, and message dispatch.
// Connection lifecycle lives in connection.go, inbound dispatch in inbound.go,
// and outbound pipeline in outbound.go.
//
// Manager contains mutexes and a sync.Once, so it must be used through a
// pointer (as returned by NewManager) and never copied.
type Manager struct {
// registry holds the registered adapters; exposed through Registry().
registry *Registry
// service is the store used to list and resolve configs. Send and React
// return an error when it is nil.
service ManagerStore
// processor is the inbound processor supplied to NewManager.
processor InboundProcessor
// refreshInterval is the period of the refresh ticker started by Start.
refreshInterval time.Duration
// logger is the structured logger; methods nil-check it before logging.
logger *slog.Logger
// middlewares is the inbound middleware chain appended to by Use.
middlewares []Middleware
// inboundQueue is the buffered queue of inbound tasks.
inboundQueue chan inboundTask
// inboundWorkers is the configured number of inbound workers.
inboundWorkers int
// inboundOnce, inboundCtx and inboundCancel belong to the inbound worker
// pool; Shutdown calls inboundCancel when it is set.
// NOTE(review): presumably initialized once by startInboundWorkers; confirm
// in inbound.go.
inboundOnce sync.Once
inboundCtx context.Context
inboundCancel context.CancelFunc
// mu guards connections and connectionMeta in the methods of this file.
mu sync.Mutex
// refreshMu is not used in this file.
// NOTE(review): presumably serializes refresh runs; confirm in connection.go.
refreshMu sync.Mutex
// connections maps an ID to its active connection entry; RemoveAdapter
// logs the key as "config_id".
connections map[string]*connectionEntry
// connectionMeta stores the ConnectionStatus values reported by
// ConnectionStatusesByBot.
connectionMeta map[string]ConnectionStatus
}
// NewManager builds a Manager wired to the given logger, adapter registry,
// config store, and inbound processor.
//
// A nil log falls back to slog.Default(), and a nil registry is replaced by a
// fresh NewRegistry(). The logger is tagged with component=channel. Defaults
// applied here: a 5 minute refresh interval, an inbound queue of 256 tasks,
// and 4 inbound workers.
func NewManager(log *slog.Logger, registry *Registry, service ManagerStore, processor InboundProcessor) *Manager {
	if log == nil {
		log = slog.Default()
	}
	if registry == nil {
		registry = NewRegistry()
	}

	mgr := new(Manager)
	mgr.registry = registry
	mgr.service = service
	mgr.processor = processor
	mgr.refreshInterval = 5 * time.Minute
	mgr.connections = make(map[string]*connectionEntry)
	mgr.connectionMeta = make(map[string]ConnectionStatus)
	mgr.logger = log.With(slog.String("component", "channel"))
	mgr.middlewares = []Middleware{}
	mgr.inboundQueue = make(chan inboundTask, 256)
	mgr.inboundWorkers = 4
	return mgr
}
// Registry returns the adapter registry used by this manager.
// The returned pointer is the manager's own registry, not a copy, so changes
// made through it are visible to the manager.
func (m *Manager) Registry() *Registry {
return m.registry
}
// Use adds the given middleware to the end of the inbound processing chain,
// keeping the order in which they were passed. It takes no lock.
func (m *Manager) Use(mw ...Middleware) {
	chain := m.middlewares
	chain = append(chain, mw...)
	m.middlewares = chain
}
// RegisterAdapter registers adapter with the manager's registry and logs the
// outcome. A nil adapter is ignored. A registration failure is logged as a
// warning and not returned to the caller.
func (m *Manager) RegisterAdapter(adapter Adapter) {
	if adapter == nil {
		return
	}

	err := m.registry.Register(adapter)
	if m.logger == nil {
		// Nothing to report to; the outcome is intentionally silent.
		return
	}

	channelAttr := slog.String("channel", adapter.Type().String())
	if err != nil {
		m.logger.Warn("adapter registration failed", channelAttr, slog.Any("error", err))
		return
	}
	m.logger.Info("adapter registered", channelAttr)
}
// AddAdapter registers adapter and then runs an immediate refresh, which
// gives hot-plug support. The refresh is skipped when ctx is nil;
// registration still happens.
func (m *Manager) AddAdapter(ctx context.Context, adapter Adapter) {
	m.RegisterAdapter(adapter)
	if ctx == nil {
		return
	}
	m.refresh(ctx)
}
// RemoveAdapter stops and forgets every active connection whose config has
// the given channel type, then unregisters that type from the registry.
//
// Stop errors other than ErrStopNotSupported are logged as warnings; the
// connection entry is removed regardless. m.mu is held while the connections
// are stopped and deleted.
func (m *Manager) RemoveAdapter(ctx context.Context, channelType ChannelType) {
	m.mu.Lock()
	for configID, conn := range m.connections {
		if conn == nil || conn.config.ChannelType != channelType {
			continue
		}
		if conn.connection != nil {
			err := conn.connection.Stop(ctx)
			if err != nil && !errors.Is(err, ErrStopNotSupported) && m.logger != nil {
				m.logger.Warn("adapter stop failed", slog.String("config_id", configID), slog.Any("error", err))
			}
		}
		// Deleting the current key during range is safe in Go.
		delete(m.connections, configID)
	}
	m.mu.Unlock()

	m.registry.Unregister(channelType)
}
// Refresh performs a full reconcile of all adapter connections against the DB.
// It is meant for startup and as a periodic safety net; after API operations,
// prefer the targeted EnsureConnection / RemoveConnection.
// A nil ctx makes Refresh a no-op.
func (m *Manager) Refresh(ctx context.Context) {
	if ctx == nil {
		return
	}
	m.refresh(ctx)
}
// Start begins the inbound worker pool and launches a background goroutine
// that refreshes connections immediately and then once per refreshInterval.
//
// The goroutine runs until ctx is cancelled, at which point it stops all
// connections and exits. Start itself returns immediately. A nil ctx is
// treated as context.Background(), in which case the loop only ends with the
// process.
func (m *Manager) Start(ctx context.Context) {
	// Bounds how long connection teardown may take once ctx is cancelled.
	const stopTimeout = 10 * time.Second

	if ctx == nil {
		// Matches the nil tolerance of AddAdapter/Refresh and avoids a panic
		// on ctx.Done() inside the goroutine.
		ctx = context.Background()
	}
	if m.logger != nil {
		m.logger.Info("manager start")
	}
	m.startInboundWorkers(ctx)
	go func() {
		m.refresh(ctx)
		ticker := time.NewTicker(m.refreshInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				if m.logger != nil {
					m.logger.Info("manager stop")
				}
				// ctx is already cancelled here. Passing it on would make any
				// context-aware Stop fail immediately, so tear down with a
				// detached context (values kept) under a bounded timeout.
				stopCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), stopTimeout)
				m.stopAll(stopCtx)
				cancel()
				return
			case <-ticker.C:
				m.refresh(ctx)
			}
		}
	}()
}
// Send delivers an outbound message for botID on channelType.
//
// The destination is req.Target when set. Otherwise it is derived from the
// channel-identity binding of req.ChannelIdentityID. The target is then
// normalized by the registry when a normalizer applies. The message is passed
// through buildOutboundMessages with the channel's outbound policy, and each
// resulting item is sent in order; the first failure aborts the send.
//
// Send returns an error when:
//   - the manager has no store configured,
//   - no sender is registered for channelType,
//   - the effective config cannot be resolved,
//   - neither a target nor a channel identity ID is given,
//   - the channel-identity binding lookup fails ("channel binding required"),
//   - the target cannot be derived from the binding,
//   - the message is empty,
//   - building or sending an outbound item fails.
func (m *Manager) Send(ctx context.Context, botID string, channelType ChannelType, req SendRequest) error {
	if m.service == nil {
		return errors.New("channel manager not configured")
	}
	sender, ok := m.registry.GetSender(channelType)
	if !ok {
		return fmt.Errorf("unsupported channel type: %s", channelType)
	}
	config, err := m.service.ResolveEffectiveConfig(ctx, botID, channelType)
	if err != nil {
		return err
	}

	target := strings.TrimSpace(req.Target)
	if target == "" {
		// No explicit target: fall back to the identity's stored binding.
		targetChannelIdentityID := strings.TrimSpace(req.ChannelIdentityID)
		if targetChannelIdentityID == "" {
			return errors.New("target or channel_identity_id is required")
		}
		userCfg, err := m.service.GetChannelIdentityConfig(ctx, targetChannelIdentityID, channelType)
		if err != nil {
			if m.logger != nil {
				// Callers get a stable message, so the underlying cause is
				// recorded here; otherwise a store failure is
				// indistinguishable from a missing binding.
				m.logger.Warn("channel binding missing",
					slog.String("channel", channelType.String()),
					slog.String("channel_identity_id", targetChannelIdentityID),
					slog.Any("error", err),
				)
			}
			return errors.New("channel binding required")
		}
		target, err = m.registry.ResolveTargetFromUserConfig(channelType, userCfg.Config)
		if err != nil {
			return err
		}
	}
	if normalized, ok := m.registry.NormalizeTarget(channelType, target); ok {
		target = normalized
	}

	if req.Message.IsEmpty() {
		return errors.New("message is required")
	}
	if m.logger != nil {
		m.logger.Info("send outbound", slog.String("channel", channelType.String()), slog.String("bot_id", botID))
	}

	policy := m.resolveOutboundPolicy(channelType)
	outbound, err := buildOutboundMessages(OutboundMessage{
		Target:  target,
		Message: req.Message,
	}, policy)
	if err != nil {
		return err
	}
	for _, item := range outbound {
		if err := m.sendWithConfig(ctx, sender, config, item, policy); err != nil {
			if m.logger != nil {
				m.logger.Error("send outbound failed", slog.String("channel", channelType.String()), slog.String("bot_id", botID), slog.Any("error", err))
			}
			return err
		}
	}
	return nil
}
// React adds an emoji reaction to a channel message, or removes one when
// req.Remove is set.
//
// req.Target and req.MessageID are required (after trimming whitespace).
// req.Emoji is required only when adding; on removal it is passed through
// as-is, possibly empty. The target is normalized by the registry when a
// normalizer applies. Errors are returned when the manager has no store, the
// channel type has no reactor, the effective config cannot be resolved, or a
// required field is missing.
func (m *Manager) React(ctx context.Context, botID string, channelType ChannelType, req ReactRequest) error {
	if m.service == nil {
		return errors.New("channel manager not configured")
	}

	reactor, supported := m.registry.GetReactor(channelType)
	if !supported {
		return fmt.Errorf("channel %s does not support reactions", channelType)
	}

	cfg, err := m.service.ResolveEffectiveConfig(ctx, botID, channelType)
	if err != nil {
		return err
	}

	dest := strings.TrimSpace(req.Target)
	if dest == "" {
		return errors.New("target is required for reactions")
	}
	if canonical, changed := m.registry.NormalizeTarget(channelType, dest); changed {
		dest = canonical
	}

	msgID := strings.TrimSpace(req.MessageID)
	if msgID == "" {
		return errors.New("message_id is required for reactions")
	}

	reaction := strings.TrimSpace(req.Emoji)
	if reaction == "" && !req.Remove {
		return errors.New("emoji is required when adding a reaction")
	}

	if m.logger != nil {
		m.logger.Info("react outbound",
			slog.String("channel", channelType.String()),
			slog.String("bot_id", botID),
			slog.String("message_id", msgID),
			slog.Bool("remove", req.Remove),
		)
	}

	switch {
	case req.Remove:
		return reactor.Unreact(ctx, cfg, dest, msgID, reaction)
	default:
		return reactor.React(ctx, cfg, dest, msgID, reaction)
	}
}
// Shutdown cancels the inbound worker pool, if its cancel function has been
// set, and then stops all active connections using ctx. It always returns nil.
func (m *Manager) Shutdown(ctx context.Context) error {
	if cancel := m.inboundCancel; cancel != nil {
		cancel()
	}
	m.stopAll(ctx)
	return nil
}
// ConnectionStatusesByBot returns the recorded connection statuses whose
// BotID equals botID (after trimming whitespace), ordered by ChannelType and
// then by ConfigID. The result is always non-nil; a blank botID yields an
// empty slice without taking the lock.
func (m *Manager) ConnectionStatusesByBot(botID string) []ConnectionStatus {
	wanted := strings.TrimSpace(botID)
	if wanted == "" {
		return []ConnectionStatus{}
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	matched := make([]ConnectionStatus, 0, len(m.connectionMeta))
	for _, meta := range m.connectionMeta {
		if meta.BotID != wanted {
			continue
		}
		matched = append(matched, meta)
	}

	// Map iteration order is random; sort for a stable result.
	sort.Slice(matched, func(a, b int) bool {
		left, right := matched[a], matched[b]
		if left.ChannelType != right.ChannelType {
			return left.ChannelType < right.ChannelType
		}
		return left.ConfigID < right.ConfigID
	})
	return matched
}