mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
213 lines
6.6 KiB
Go
213 lines
6.6 KiB
Go
package channel
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// LifecycleStore persists channel configs for lifecycle orchestration.
|
|
type LifecycleStore interface {
|
|
ResolveEffectiveConfig(ctx context.Context, botID string, channelType ChannelType) (ChannelConfig, error)
|
|
UpsertConfig(ctx context.Context, botID string, channelType ChannelType, req UpsertConfigRequest) (ChannelConfig, error)
|
|
UpdateConfigDisabled(ctx context.Context, botID string, channelType ChannelType, disabled bool) (ChannelConfig, error)
|
|
DeleteConfig(ctx context.Context, botID string, channelType ChannelType) error
|
|
}
|
|
|
|
// ConnectionController controls runtime channel connections.
|
|
type ConnectionController interface {
|
|
EnsureConnection(ctx context.Context, cfg ChannelConfig) error
|
|
RemoveConnection(ctx context.Context, botID string, channelType ChannelType)
|
|
}
|
|
|
|
// ErrEnableChannelFailed indicates that enabling the channel (e.g. EnsureConnection) failed.
|
|
var ErrEnableChannelFailed = errors.New("enable channel failed")
|
|
|
|
// Lifecycle coordinates persisted config updates and runtime connection state.
|
|
type Lifecycle struct {
|
|
store LifecycleStore
|
|
controller ConnectionController
|
|
}
|
|
|
|
// NewLifecycle creates a lifecycle coordinator from storage and connection controller.
|
|
func NewLifecycle(store LifecycleStore, controller ConnectionController) *Lifecycle {
|
|
return &Lifecycle{
|
|
store: store,
|
|
controller: controller,
|
|
}
|
|
}
|
|
|
|
// UpsertBotChannelConfig updates config and applies connection lifecycle.
|
|
// For disabled=true, it stores config and stops any active connection.
|
|
// For disabled=false, it stores config then starts connection; on start failure it rolls back.
|
|
func (s *Lifecycle) UpsertBotChannelConfig(ctx context.Context, botID string, channelType ChannelType, req UpsertConfigRequest) (ChannelConfig, error) {
|
|
if s.store == nil {
|
|
return ChannelConfig{}, fmt.Errorf("channel lifecycle store not configured")
|
|
}
|
|
disabled := false
|
|
if req.Disabled != nil {
|
|
disabled = *req.Disabled
|
|
}
|
|
if !disabled && s.controller == nil {
|
|
return ChannelConfig{}, fmt.Errorf("channel connection controller not configured")
|
|
}
|
|
|
|
previous, hadPrevious, err := s.getPreviousConfig(ctx, botID, channelType)
|
|
if err != nil {
|
|
return ChannelConfig{}, err
|
|
}
|
|
|
|
updated, err := s.store.UpsertConfig(ctx, botID, channelType, req)
|
|
if err != nil {
|
|
return ChannelConfig{}, err
|
|
}
|
|
|
|
if disabled {
|
|
if s.controller != nil {
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
}
|
|
return updated, nil
|
|
}
|
|
|
|
if err := s.controller.EnsureConnection(ctx, updated); err != nil {
|
|
if rollbackErr := s.rollbackUpsert(ctx, botID, channelType, hadPrevious, previous); rollbackErr != nil {
|
|
return ChannelConfig{}, fmt.Errorf("%w (rollback failed: %v): %w", ErrEnableChannelFailed, rollbackErr, err)
|
|
}
|
|
return ChannelConfig{}, fmt.Errorf("%w: %w", ErrEnableChannelFailed, err)
|
|
}
|
|
return updated, nil
|
|
}
|
|
|
|
// DeleteBotChannelConfig removes persisted config and stops active runtime connection.
|
|
func (s *Lifecycle) DeleteBotChannelConfig(ctx context.Context, botID string, channelType ChannelType) error {
|
|
if s.store == nil {
|
|
return fmt.Errorf("channel lifecycle store not configured")
|
|
}
|
|
if err := s.store.DeleteConfig(ctx, botID, channelType); err != nil {
|
|
return err
|
|
}
|
|
if s.controller != nil {
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetBotChannelStatus updates only the disabled status and applies runtime lifecycle.
|
|
func (s *Lifecycle) SetBotChannelStatus(ctx context.Context, botID string, channelType ChannelType, disabled bool) (ChannelConfig, error) {
|
|
if s.store == nil {
|
|
return ChannelConfig{}, fmt.Errorf("channel lifecycle store not configured")
|
|
}
|
|
if s.controller == nil {
|
|
return ChannelConfig{}, fmt.Errorf("channel connection controller not configured")
|
|
}
|
|
|
|
updated, err := s.store.UpdateConfigDisabled(ctx, botID, channelType, disabled)
|
|
if err != nil {
|
|
return ChannelConfig{}, err
|
|
}
|
|
if disabled {
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
return updated, nil
|
|
}
|
|
|
|
if err := s.controller.EnsureConnection(ctx, updated); err != nil {
|
|
if _, rollbackErr := s.store.UpdateConfigDisabled(ctx, botID, channelType, true); rollbackErr != nil {
|
|
return ChannelConfig{}, fmt.Errorf("%w (status rollback failed: %v): %w", ErrEnableChannelFailed, rollbackErr, err)
|
|
}
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
return ChannelConfig{}, fmt.Errorf("%w: %w", ErrEnableChannelFailed, err)
|
|
}
|
|
return updated, nil
|
|
}
|
|
|
|
func (s *Lifecycle) getPreviousConfig(ctx context.Context, botID string, channelType ChannelType) (ChannelConfig, bool, error) {
|
|
cfg, err := s.store.ResolveEffectiveConfig(ctx, botID, channelType)
|
|
if err == nil {
|
|
return cfg, true, nil
|
|
}
|
|
if isChannelConfigNotFound(err) {
|
|
return ChannelConfig{}, false, nil
|
|
}
|
|
return ChannelConfig{}, false, err
|
|
}
|
|
|
|
func (s *Lifecycle) rollbackUpsert(ctx context.Context, botID string, channelType ChannelType, hadPrevious bool, previous ChannelConfig) error {
|
|
if !hadPrevious {
|
|
if err := s.store.DeleteConfig(ctx, botID, channelType); err != nil {
|
|
return err
|
|
}
|
|
if s.controller != nil {
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
restoreReq := upsertRequestFromConfig(previous)
|
|
restored, err := s.store.UpsertConfig(ctx, botID, channelType, restoreReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if s.controller == nil {
|
|
return nil
|
|
}
|
|
if restored.Disabled {
|
|
s.controller.RemoveConnection(ctx, botID, channelType)
|
|
return nil
|
|
}
|
|
return s.controller.EnsureConnection(ctx, restored)
|
|
}
|
|
|
|
func isChannelConfigNotFound(err error) bool {
|
|
return errors.Is(err, ErrChannelConfigNotFound)
|
|
}
|
|
|
|
func upsertRequestFromConfig(cfg ChannelConfig) UpsertConfigRequest {
|
|
disabled := cfg.Disabled
|
|
restored := UpsertConfigRequest{
|
|
Credentials: cloneAnyMap(cfg.Credentials),
|
|
ExternalIdentity: strings.TrimSpace(cfg.ExternalIdentity),
|
|
SelfIdentity: cloneAnyMap(cfg.SelfIdentity),
|
|
Routing: cloneAnyMap(cfg.Routing),
|
|
Disabled: &disabled,
|
|
}
|
|
if !cfg.VerifiedAt.IsZero() {
|
|
verifiedAt := cfg.VerifiedAt.UTC()
|
|
restored.VerifiedAt = &verifiedAt
|
|
}
|
|
return restored
|
|
}
|
|
|
|
func cloneAnyMap(input map[string]any) map[string]any {
|
|
if len(input) == 0 {
|
|
return map[string]any{}
|
|
}
|
|
out := make(map[string]any, len(input))
|
|
for key, value := range input {
|
|
out[key] = cloneAnyValue(value)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func cloneAnyValue(value any) any {
|
|
switch v := value.(type) {
|
|
case map[string]any:
|
|
return cloneAnyMap(v)
|
|
case []any:
|
|
items := make([]any, 0, len(v))
|
|
for _, item := range v {
|
|
items = append(items, cloneAnyValue(item))
|
|
}
|
|
return items
|
|
case []string:
|
|
items := make([]string, len(v))
|
|
copy(items, v)
|
|
return items
|
|
case time.Time:
|
|
return v
|
|
default:
|
|
return v
|
|
}
|
|
}
|