mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-27 07:16:19 +09:00
cc5f00355f
* feat: add email service with multi-adapter support Implement a full-stack email service with global provider management, per-bot bindings with granular read/write permissions, outbox audit storage, and MCP tool integration for direct mailbox access. Backend: - Email providers: CRUD with dynamic config schema (generic SMTP/IMAP, Mailgun) - Generic adapter: go-mail (SMTP) + go-imap/v2 (IMAP IDLE real-time push via UnilateralDataHandler + UID-based tracking + periodic check fallback) - Mailgun adapter: mailgun-go/v5 with dual inbound mode (webhook + poll) - Bot email bindings: per-bot provider binding with independent r/w permissions - Outbox: outbound email audit log with status tracking - Trigger: inbound emails push notification to bot_inbox (from/subject only, LLM reads full content on demand via MCP tools) - MailboxReader interface: on-demand IMAP queries for listing/reading emails - MCP tools: email_accounts, email_send, email_list (paginated mailbox), email_read (by UID) — all with multi-binding and provider_id selection - Webhook: /email/mailgun/webhook/:config_id (JWT-skipped, signature-verified) - DB migration: 0019_add_email (email_providers, bot_email_bindings, email_outbox) Frontend: - Email Providers page: /email-providers with MasterDetailSidebarLayout - Dynamic config form rendered from ordered provider meta schema with i18n keys - Bot detail: Email tab with bindings management + outbox audit table - Sidebar navigation entry - Full i18n support (en + zh) - Auto-generated SDK from Swagger Closes #17 * feat(email): trigger bot conversation immediately on inbound email Instead of only storing an inbox item and waiting for the next chat, the email trigger now proactively invokes the conversation resolver so the bot processes new emails right away — aligned with the schedule/heartbeat trigger pattern. * fix: lint --------- Co-authored-by: Acbox <acbox0328@gmail.com>
180 lines
4.5 KiB
Go
180 lines
4.5 KiB
Go
package email
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// Manager manages the lifecycle of all email receiving connections.
|
|
type Manager struct {
|
|
logger *slog.Logger
|
|
service *Service
|
|
trigger *Trigger
|
|
outbox *OutboxService
|
|
|
|
mu sync.Mutex
|
|
conns map[string]Stopper // provider_id -> stopper
|
|
stopped bool
|
|
}
|
|
|
|
func NewManager(log *slog.Logger, service *Service, trigger *Trigger, outbox *OutboxService) *Manager {
|
|
return &Manager{
|
|
logger: log.With(slog.String("component", "email_manager")),
|
|
service: service,
|
|
trigger: trigger,
|
|
outbox: outbox,
|
|
conns: make(map[string]Stopper),
|
|
}
|
|
}
|
|
|
|
// Start initializes receiving for all providers that have readable bindings.
|
|
func (m *Manager) Start(ctx context.Context) error {
|
|
providers, err := m.service.ListProviders(ctx, "")
|
|
if err != nil {
|
|
return fmt.Errorf("list email providers: %w", err)
|
|
}
|
|
|
|
for _, p := range providers {
|
|
bindings, err := m.service.ListReadableBindingsByProvider(ctx, p.ID)
|
|
if err != nil {
|
|
m.logger.Error("failed to list bindings", slog.String("provider", p.ID), slog.Any("error", err))
|
|
continue
|
|
}
|
|
if len(bindings) == 0 {
|
|
continue
|
|
}
|
|
if err := m.startProvider(ctx, p); err != nil {
|
|
m.logger.Error("failed to start provider", slog.String("provider", p.ID), slog.Any("error", err))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) startProvider(ctx context.Context, p ProviderResponse) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if m.stopped {
|
|
return fmt.Errorf("manager is stopped")
|
|
}
|
|
if _, exists := m.conns[p.ID]; exists {
|
|
return nil
|
|
}
|
|
|
|
receiver, err := m.service.registry.GetReceiver(ProviderName(p.Provider))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
config := p.Config
|
|
if config == nil {
|
|
config = make(map[string]any)
|
|
}
|
|
config["_provider_id"] = p.ID
|
|
|
|
stopper, err := receiver.StartReceiving(ctx, config, m.trigger.HandleInbound)
|
|
if err != nil {
|
|
return fmt.Errorf("start receiving for %s: %w", p.ID, err)
|
|
}
|
|
|
|
m.conns[p.ID] = stopper
|
|
m.logger.Info("started email receiving", slog.String("provider_id", p.ID), slog.String("type", p.Provider))
|
|
return nil
|
|
}
|
|
|
|
// RefreshProvider restarts receiving for a specific provider.
|
|
func (m *Manager) RefreshProvider(ctx context.Context, providerID string) error {
|
|
m.stopProvider(providerID)
|
|
|
|
p, err := m.service.GetProvider(ctx, providerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
bindings, err := m.service.ListReadableBindingsByProvider(ctx, providerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(bindings) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return m.startProvider(ctx, p)
|
|
}
|
|
|
|
func (m *Manager) stopProvider(providerID string) {
|
|
m.mu.Lock()
|
|
stopper, exists := m.conns[providerID]
|
|
if exists {
|
|
delete(m.conns, providerID)
|
|
}
|
|
m.mu.Unlock()
|
|
|
|
if exists && stopper != nil {
|
|
if err := stopper.Stop(context.Background()); err != nil {
|
|
m.logger.Error("failed to stop provider", slog.String("provider_id", providerID), slog.Any("error", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop gracefully shuts down all receiving connections.
|
|
func (m *Manager) Stop() {
|
|
m.mu.Lock()
|
|
m.stopped = true
|
|
conns := make(map[string]Stopper, len(m.conns))
|
|
for k, v := range m.conns {
|
|
conns[k] = v
|
|
}
|
|
m.conns = make(map[string]Stopper)
|
|
m.mu.Unlock()
|
|
|
|
for id, stopper := range conns {
|
|
if err := stopper.Stop(context.Background()); err != nil {
|
|
m.logger.Error("failed to stop provider", slog.String("provider_id", id), slog.Any("error", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendEmail sends an email through the specified provider, recording to outbox.
|
|
// If providerID is empty, it falls back to the first writable binding for the bot.
|
|
func (m *Manager) SendEmail(ctx context.Context, botID string, providerID string, msg OutboundEmail) (string, error) {
|
|
if providerID == "" {
|
|
binding, err := m.service.GetBotBinding(ctx, botID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !binding.CanWrite {
|
|
return "", fmt.Errorf("email write permission denied for bot %s", botID)
|
|
}
|
|
providerID = binding.EmailProviderID
|
|
}
|
|
|
|
providerName, config, err := m.service.ProviderConfig(ctx, providerID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
sender, err := m.service.registry.GetSender(providerName)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
fromAddr, _ := config["username"].(string)
|
|
|
|
outboxID, err := m.outbox.Create(ctx, providerID, botID, msg, fromAddr)
|
|
if err != nil {
|
|
return "", fmt.Errorf("record outbox: %w", err)
|
|
}
|
|
|
|
messageID, err := sender.Send(ctx, config, msg)
|
|
if err != nil {
|
|
_ = m.outbox.MarkFailed(ctx, outboxID, err.Error())
|
|
return "", fmt.Errorf("send email: %w", err)
|
|
}
|
|
|
|
_ = m.outbox.MarkSent(ctx, outboxID, messageID)
|
|
return messageID, nil
|
|
}
|