mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
29e6ddd1f9
- Replace global channelRegistry singleton with explicit *Registry passed via dependency injection - Split monolithic manager.go into connection.go (lifecycle), inbound.go (dispatch), outbound.go (pipeline) - Introduce optional adapter interfaces: ConfigNormalizer, TargetResolver, BindingMatcher - Move Descriptor() to Adapter interface, remove init()-based registration - Relocate SessionHub to adapters/local package - Extract shared UUID/time helpers to internal/db/uuid.go - Decompose ConfigStore into fine-grained interfaces: ConfigLister, ConfigResolver, BindingStore, SessionStore
81 lines
1.9 KiB
Go
81 lines
1.9 KiB
Go
package channel
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
)
|
|
|
|
type inboundTask struct {
|
|
ctx context.Context
|
|
cfg ChannelConfig
|
|
msg InboundMessage
|
|
}
|
|
|
|
// HandleInbound enqueues an inbound message for asynchronous processing by the worker pool.
|
|
func (m *Manager) HandleInbound(ctx context.Context, cfg ChannelConfig, msg InboundMessage) error {
|
|
if m.processor == nil {
|
|
return fmt.Errorf("inbound processor not configured")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
m.startInboundWorkers(ctx)
|
|
if m.inboundCtx != nil && m.inboundCtx.Err() != nil {
|
|
return fmt.Errorf("inbound dispatcher stopped")
|
|
}
|
|
task := inboundTask{
|
|
ctx: context.WithoutCancel(ctx),
|
|
cfg: cfg,
|
|
msg: msg,
|
|
}
|
|
select {
|
|
case m.inboundQueue <- task:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("inbound queue full")
|
|
}
|
|
}
|
|
|
|
func (m *Manager) handleInbound(ctx context.Context, cfg ChannelConfig, msg InboundMessage) error {
|
|
if m.processor == nil {
|
|
return fmt.Errorf("inbound processor not configured")
|
|
}
|
|
sender := m.newReplySender(cfg, msg.Channel)
|
|
if err := m.processor.HandleInbound(ctx, cfg, msg, sender); err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Error("inbound processing failed", slog.String("channel", msg.Channel.String()), slog.Any("error", err))
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) startInboundWorkers(ctx context.Context) {
|
|
m.inboundOnce.Do(func() {
|
|
workerCtx := ctx
|
|
if workerCtx == nil {
|
|
workerCtx = context.Background()
|
|
}
|
|
m.inboundCtx, m.inboundCancel = context.WithCancel(workerCtx)
|
|
for i := 0; i < m.inboundWorkers; i++ {
|
|
go m.runInboundWorker(m.inboundCtx)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (m *Manager) runInboundWorker(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task := <-m.inboundQueue:
|
|
if err := m.handleInbound(task.ctx, task.cfg, task.msg); err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Error("inbound processing failed", slog.String("channel", task.msg.Channel.String()), slog.Any("error", err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|