Files
Memoh/internal/channel/inbound.go
T
BBQ 29e6ddd1f9 refactor: replace global channel registry with instance-based Registry and interface-driven adapters
- Replace global channelRegistry singleton with explicit *Registry passed via dependency injection
- Split monolithic manager.go into connection.go (lifecycle), inbound.go (dispatch), outbound.go (pipeline)
- Introduce optional adapter interfaces: ConfigNormalizer, TargetResolver, BindingMatcher
- Move Descriptor() to Adapter interface, remove init()-based registration
- Relocate SessionHub to adapters/local package
- Extract shared UUID/time helpers to internal/db/uuid.go
- Decompose ConfigStore into fine-grained interfaces: ConfigLister, ConfigResolver, BindingStore, SessionStore
2026-02-06 23:47:12 +08:00

81 lines
1.9 KiB
Go

package channel
import (
"context"
"fmt"
"log/slog"
)
type inboundTask struct {
ctx context.Context
cfg ChannelConfig
msg InboundMessage
}
// HandleInbound enqueues an inbound message for asynchronous processing by the worker pool.
func (m *Manager) HandleInbound(ctx context.Context, cfg ChannelConfig, msg InboundMessage) error {
if m.processor == nil {
return fmt.Errorf("inbound processor not configured")
}
if ctx == nil {
ctx = context.Background()
}
m.startInboundWorkers(ctx)
if m.inboundCtx != nil && m.inboundCtx.Err() != nil {
return fmt.Errorf("inbound dispatcher stopped")
}
task := inboundTask{
ctx: context.WithoutCancel(ctx),
cfg: cfg,
msg: msg,
}
select {
case m.inboundQueue <- task:
return nil
default:
return fmt.Errorf("inbound queue full")
}
}
func (m *Manager) handleInbound(ctx context.Context, cfg ChannelConfig, msg InboundMessage) error {
if m.processor == nil {
return fmt.Errorf("inbound processor not configured")
}
sender := m.newReplySender(cfg, msg.Channel)
if err := m.processor.HandleInbound(ctx, cfg, msg, sender); err != nil {
if m.logger != nil {
m.logger.Error("inbound processing failed", slog.String("channel", msg.Channel.String()), slog.Any("error", err))
}
return err
}
return nil
}
func (m *Manager) startInboundWorkers(ctx context.Context) {
m.inboundOnce.Do(func() {
workerCtx := ctx
if workerCtx == nil {
workerCtx = context.Background()
}
m.inboundCtx, m.inboundCancel = context.WithCancel(workerCtx)
for i := 0; i < m.inboundWorkers; i++ {
go m.runInboundWorker(m.inboundCtx)
}
})
}
func (m *Manager) runInboundWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task := <-m.inboundQueue:
if err := m.handleInbound(task.ctx, task.cfg, task.msg); err != nil {
if m.logger != nil {
m.logger.Error("inbound processing failed", slog.String("channel", task.msg.Channel.String()), slog.Any("error", err))
}
}
}
}
}