diff --git a/apps/web/src/i18n/locales/en.json b/apps/web/src/i18n/locales/en.json index 131e92d8..e4fc0362 100644 --- a/apps/web/src/i18n/locales/en.json +++ b/apps/web/src/i18n/locales/en.json @@ -1074,6 +1074,7 @@ "wechatoa": "WeChat Official Account", "wecom": "WeCom", "dingtalk": "DingTalk", + "slack": "Slack", "web": "Web", "cli": "CLI", "local": "Local" @@ -1089,6 +1090,7 @@ "wechatoa": "OA", "wecom": "WC", "dingtalk": "DT", + "slack": "SK", "web": "Web", "cli": "CLI", "local": "LC" diff --git a/apps/web/src/i18n/locales/zh.json b/apps/web/src/i18n/locales/zh.json index 055bbd2e..318ae3f5 100644 --- a/apps/web/src/i18n/locales/zh.json +++ b/apps/web/src/i18n/locales/zh.json @@ -1070,6 +1070,7 @@ "wechatoa": "微信服务号", "wecom": "企业微信", "dingtalk": "钉钉", + "slack": "Slack", "web": "Web", "cli": "本地 CLI", "local": "本地" @@ -1085,6 +1086,7 @@ "wechatoa": "OA", "wecom": "企微", "dingtalk": "钉", + "slack": "SK", "web": "Web", "cli": "CLI", "local": "本地" diff --git a/apps/web/src/pages/profile/index.vue b/apps/web/src/pages/profile/index.vue index 81f41640..f93438ff 100644 --- a/apps/web/src/pages/profile/index.vue +++ b/apps/web/src/pages/profile/index.vue @@ -292,7 +292,7 @@ function platformLabel(platformKey: string): string { } const platformOptions = computed(() => { - const options = new Set(['telegram', 'feishu', 'discord', 'qq', 'matrix']) + const options = new Set(['telegram', 'feishu', 'discord', 'qq', 'matrix', 'slack']) for (const identity of identities.value) { const platform = identity.channel.trim() if (platform) { diff --git a/cmd/agent/app.go b/cmd/agent/app.go index 315263df..3c77a831 100644 --- a/cmd/agent/app.go +++ b/cmd/agent/app.go @@ -35,6 +35,7 @@ import ( "github.com/memohai/memoh/internal/channel/adapters/matrix" "github.com/memohai/memoh/internal/channel/adapters/misskey" "github.com/memohai/memoh/internal/channel/adapters/qq" + slackadapter "github.com/memohai/memoh/internal/channel/adapters/slack" "github.com/memohai/memoh/internal/channel/adapters/telegram" "github.com/memohai/memoh/internal/channel/adapters/wechatoa" "github.com/memohai/memoh/internal/channel/adapters/wecom" @@ -297,6 +298,11 @@ func provideChannelRegistry(log *slog.Logger, hub *local.RouteHub, mediaService feishuAdapter := feishu.NewFeishuAdapter(log) feishuAdapter.SetAssetOpener(mediaService) registry.MustRegister(feishuAdapter) + + slackAdapter := slackadapter.NewSlackAdapter(log) + slackAdapter.SetAssetOpener(mediaService) + registry.MustRegister(slackAdapter) + registry.MustRegister(wecom.NewWeComAdapter(log)) dingTalkAdapter := dingtalk.NewDingTalkAdapter(log) diff --git a/docs/docs/.vitepress/en.ts b/docs/docs/.vitepress/en.ts index 47349ca0..aee19f60 100644 --- a/docs/docs/.vitepress/en.ts +++ b/docs/docs/.vitepress/en.ts @@ -175,6 +175,10 @@ export const en = [ text: 'WeChat Official Account', link: '/channels/wechatoa.md' }, + { + text: 'Slack', + link: '/channels/slack.md' + } ] }, ] diff --git a/docs/docs/channels/index.md b/docs/docs/channels/index.md index 84721b82..1315df3d 100644 --- a/docs/docs/channels/index.md +++ b/docs/docs/channels/index.md @@ -4,6 +4,7 @@ Channels are the gateways that connect your Memoh Bots to the outside world. By Memoh currently supports the following channels: +- **[Slack](./slack)**: Workspace messaging with Socket Mode, threads, files, and reactions. - **[Telegram](./telegram)**: Feature-rich integration with streaming and attachment support. - **[Feishu (Lark)](./feishu)**: Enterprise-ready integration for business workflows. - **[Discord](./discord)**: Community-focused integration for servers and direct messages. diff --git a/docs/docs/channels/slack.md b/docs/docs/channels/slack.md new file mode 100644 index 00000000..2d71cd35 --- /dev/null +++ b/docs/docs/channels/slack.md @@ -0,0 +1,84 @@ +# Slack Channel Configuration + +Connecting your Memoh Bot to Slack allows it to receive direct messages, participate in channels and threads, read attachments, send files, and use streaming replies. + +## Step 1: Create a Slack App + +1. Go to the Slack API dashboard and create a new app. +2. Choose the workspace where you want to install the bot. +3. Open **Basic Information** and keep this app page open for the next steps. + +## Step 2: Enable Socket Mode + +Memoh's Slack adapter uses Socket Mode, so you need an app-level token in addition to the bot token. + +1. In **Basic Information**, enable **Socket Mode**. +2. Create an **App-Level Token** with the `connections:write` scope. +3. Copy the generated token. It starts with `xapp-`. + +## Step 3: Configure Bot Token Scopes + +In **OAuth & Permissions**, add the bot token scopes required by the current Slack adapter: + +- `app_mentions:read` - receive bot mentions in channels +- `channels:history` - read messages in public channels +- `groups:history` - read messages in private channels +- `im:history` - read direct messages +- `mpim:history` - read group direct messages +- `chat:write` - send replies and thread messages +- `files:read` - read uploaded files and images +- `files:write` - upload outbound files +- `reactions:write` - add and remove reactions + +You should also add these recommended scopes if you want Slack conversation names and metadata to show up more completely in Memoh: + +- `channels:read` +- `groups:read` +- `im:read` +- `mpim:read` + +## Step 4: Subscribe to Bot Events + +In **Event Subscriptions**, enable bot events and add: + +- `app_mention` +- `message.channels` +- `message.groups` +- `message.im` +- `message.mpim` + +These are the inbound event types currently handled by the Slack adapter. + +## Step 5: Install the App to Your Workspace + +1. In **OAuth & Permissions**, click **Install to Workspace**. +2. Review the permission screen. +3. Authorize the app. +4. Copy the **Bot User OAuth Token**. It starts with `xoxb-`. + +Make sure the `xoxb-...` bot token and the `xapp-...` app-level token come from the same Slack app and workspace. + +## Step 6: Configure Memoh + +1. Open your Bot detail page in the Memoh Web UI. +2. Go to the **Platforms** tab. +3. Click **Add Channel** and select **Slack**. +4. Fill in: + - **Bot Token**: your `xoxb-...` token + - **App-Level Token**: your `xapp-...` token +5. Click **Save and Enable**. + +## Step 7: Add the Bot to Conversations + +After the channel is enabled, the Slack app still needs to be present in the conversations where you want it to work. + +- For direct messages: open a DM with the app and send a message. +- For public channels: invite the bot to the channel. +- For private channels: invite the bot explicitly after installation. + +If the bot can send messages but cannot read uploaded images or files, check that `files:read` is enabled. If it connects but receives no incoming messages, check the bot events and the matching history scopes again. + +## Features Supported + +- **Direct Messages and Channels**: Support for DMs, public channels, private channels, and threads. +- **Attachments**: Read uploaded images and files from Slack, and send files back. diff --git a/docs/docs/getting-started/channels.md b/docs/docs/getting-started/channels.md index fd3acf91..d62d8577 100644 --- a/docs/docs/getting-started/channels.md +++ b/docs/docs/getting-started/channels.md @@ -26,6 +26,7 @@ Configure your bot's connections from the **Platforms** tab in the Bot Detail pa | WeCom (WeWork) | [WeCom Configuration](/channels/wecom) | Enterprise workspace integration | | WeChat | [WeChat Configuration](/channels/weixin) | Personal QR login flow | | WeChat Official Account | [WeChat Official Account Configuration](/channels/wechatoa) | Official account webhook flow | +| Slack | [Slack Configuration](/channels/slack) | Replies, no streaming | Two WeChat adapters exist on purpose: diff --git a/go.mod b/go.mod index 56fa84c8..dd7d898a 100644 --- a/go.mod +++ b/go.mod @@ -121,6 +121,7 @@ require ( github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kenshaw/emoji v0.4.1 // indirect github.com/klauspost/compress v1.18.4 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -157,6 +158,7 @@ require ( github.com/segmentio/asm v1.2.1 // indirect github.com/segmentio/encoding v0.5.4 // indirect github.com/sirupsen/logrus v1.9.4 // indirect + github.com/slack-go/slack v0.19.0 // indirect github.com/spf13/pflag v1.0.9 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect diff --git a/go.sum b/go.sum index c8e89d0b..23124016 100644 --- a/go.sum +++ b/go.sum @@ -257,6 +257,8 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kenshaw/emoji v0.4.1 h1:w0SQHeU4iLt+6UCulY88Zr3uIjBU23mjOlbDs4trif8= +github.com/kenshaw/emoji v0.4.1/go.mod h1:elpkKAS92j09SJvW/0sXfdMgltL7TQi7ToZD4tehPko= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= @@ -391,6 +393,8 @@ github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw= github.com/sergi/go-diff v1.4.0/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4= github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= +github.com/slack-go/slack v0.19.0 h1:J8lL/nGTsIUX53HU8YxZeI3PDkA+sxZsFrI2Dew7h44= +github.com/slack-go/slack v0.19.0/go.mod h1:K81UmCivcYd/5Jmz8vLBfuyoZ3B4rQC2GHVXHteXiAE= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= diff --git a/internal/channel/adapters/slack/config.go b/internal/channel/adapters/slack/config.go new file mode 100644 index 00000000..abe6e8f4 --- /dev/null +++ b/internal/channel/adapters/slack/config.go @@ -0,0 +1,132 @@ +package slack + +import ( + "errors" + "strings" + + "github.com/memohai/memoh/internal/channel" +) + +// Config holds the Slack bot credentials extracted from a channel configuration. +type Config struct { + BotToken string // xoxb-... + AppToken string // xapp-... (required for Socket Mode) +} + +// UserConfig holds the identifiers used to target a Slack user or channel. +type UserConfig struct { + UserID string + ChannelID string + Username string +} + +func normalizeConfig(raw map[string]any) (map[string]any, error) { + cfg, err := parseConfig(raw) + if err != nil { + return nil, err + } + return map[string]any{ + "botToken": cfg.BotToken, + "appToken": cfg.AppToken, + }, nil +} + +func normalizeUserConfig(raw map[string]any) (map[string]any, error) { + cfg, err := parseUserConfig(raw) + if err != nil { + return nil, err + } + result := map[string]any{} + if cfg.UserID != "" { + result["user_id"] = cfg.UserID + } + if cfg.ChannelID != "" { + result["channel_id"] = cfg.ChannelID + } + if cfg.Username != "" { + result["username"] = cfg.Username + } + return result, nil +} + +func resolveTarget(raw map[string]any) (string, error) { + cfg, err := parseUserConfig(raw) + if err != nil { + return "", err + } + if cfg.ChannelID != "" { + return cfg.ChannelID, nil + } + if cfg.UserID != "" { + return cfg.UserID, nil + } + return "", errors.New("slack binding is incomplete") +} + +func matchBinding(raw map[string]any, criteria channel.BindingCriteria) bool { + cfg, err := parseUserConfig(raw) + if err != nil { + return false + } + if value := strings.TrimSpace(criteria.Attribute("user_id")); value != "" && value == cfg.UserID { + return true + } + if value := strings.TrimSpace(criteria.Attribute("channel_id")); value != "" && value == cfg.ChannelID { + return true + } + if value := strings.TrimSpace(criteria.Attribute("username")); value != "" && strings.EqualFold(value, cfg.Username) { + return true + } + if criteria.SubjectID != "" { + if criteria.SubjectID == cfg.UserID || criteria.SubjectID == cfg.ChannelID { + return true + } + } + return false +} + +func buildUserConfig(identity channel.Identity) map[string]any { + result := map[string]any{} + if value := strings.TrimSpace(identity.Attribute("user_id")); value != "" { + result["user_id"] = value + } + if value := strings.TrimSpace(identity.Attribute("channel_id")); value != "" { + result["channel_id"] = value + } + if value := strings.TrimSpace(identity.Attribute("username")); value != "" { + result["username"] = value + } + return result +} + +func parseConfig(raw map[string]any) (Config, error) { + botToken := strings.TrimSpace(channel.ReadString(raw, "botToken", "bot_token")) + if botToken == "" { + return Config{}, errors.New("slack botToken is required") + } + appToken := strings.TrimSpace(channel.ReadString(raw, "appToken", "app_token")) + if appToken == "" { + return Config{}, errors.New("slack appToken is required for Socket Mode") + } + return Config{BotToken: botToken, AppToken: appToken}, nil +} + +func parseUserConfig(raw map[string]any) (UserConfig, error) { + userID := strings.TrimSpace(channel.ReadString(raw, "userId", "user_id")) + channelID := strings.TrimSpace(channel.ReadString(raw, "channelId", "channel_id")) + username := strings.TrimSpace(channel.ReadString(raw, "username")) + + if userID == "" && channelID == "" { + return UserConfig{}, errors.New("slack user config requires user_id or channel_id") + } + + return UserConfig{ + UserID: userID, + ChannelID: channelID, + Username: username, + }, nil +} + +func normalizeTarget(raw string) string { + return strings.TrimSpace(raw) +} diff --git a/internal/channel/adapters/slack/descriptor.go b/internal/channel/adapters/slack/descriptor.go new file mode 100644 index 00000000..fd69b0e6 --- /dev/null +++ b/internal/channel/adapters/slack/descriptor.go @@ -0,0 +1,5 @@ +package slack + +import "github.com/memohai/memoh/internal/channel" + +const Type channel.ChannelType = "slack" diff --git a/internal/channel/adapters/slack/emoji.go b/internal/channel/adapters/slack/emoji.go new file mode 100644 index 00000000..00a6f052 --- /dev/null +++ b/internal/channel/adapters/slack/emoji.go @@ -0,0 +1,91 @@ +package slack + +import ( + "strings" + "unicode/utf8" + + "github.com/kenshaw/emoji" +) + +// resolveSlackEmoji converts a Unicode emoji character to its Slack shortcode +// name using the Gemoji dataset. Slack's reactions.add API requires shortcode +// names (e.g. "thumbsup") rather than Unicode characters (e.g. "👍"). +// If the input is already a valid shortcode (ASCII text) or cannot be resolved, +// it is returned after stripping any surrounding colons. +func resolveSlackEmoji(raw string) string { + raw = strings.Trim(raw, ":") + if raw == "" { + return raw + } + if resolved, ok := resolveSlackEmojiAlias(raw); ok { + return resolved + } + return raw +} + +func resolveSlackEmojiAlias(raw string) (string, bool) { + e := emoji.FromCode(raw) + if e != nil && len(e.Aliases) > 0 { + return e.Aliases[0], true + } + + base, tone, changed := splitEmojiSkinTone(raw) + if !changed { + return "", false + } + + e = emoji.FromCode(base) + if e == nil || len(e.Aliases) == 0 { + return "", false + } + + alias := e.Aliases[0] + if tone == emoji.Neutral { + return alias, true + } + return alias + "::skin-tone-" + slackSkinToneSuffix(tone), true +} + +func splitEmojiSkinTone(raw string) (string, emoji.SkinTone, bool) { + var ( + tone emoji.SkinTone + changed bool + runes = make([]rune, 0, utf8.RuneCountInString(raw)) + ) + + for _, r := range raw { + switch emoji.SkinTone(r) { + case emoji.Light, emoji.MediumLight, emoji.Medium, emoji.MediumDark, emoji.Dark: + tone = emoji.SkinTone(r) + changed = true + continue + } + if r == '\uFE0F' { + changed = true + continue + } + runes = append(runes, r) + } + + if !changed { + return raw, emoji.Neutral, false + } + return string(runes), tone, true +} + +func slackSkinToneSuffix(tone emoji.SkinTone) string { + switch tone { + case emoji.Light: + return "2" + case emoji.MediumLight: + return "3" + case emoji.Medium: + return "4" + case emoji.MediumDark: + return "5" + case emoji.Dark: + return "6" + default: + return "" + } +} diff --git a/internal/channel/adapters/slack/slack.go b/internal/channel/adapters/slack/slack.go new file mode 100644 index 00000000..533e5ed9 --- /dev/null +++ b/internal/channel/adapters/slack/slack.go @@ -0,0 +1,1211 @@ +package slack + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "strings" + "sync" + "time" + "unicode/utf8" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" + "github.com/slack-go/slack/socketmode" + + "github.com/memohai/memoh/internal/channel" + "github.com/memohai/memoh/internal/channel/common" + "github.com/memohai/memoh/internal/media" +) + +const ( + inboundDedupTTL = time.Minute + slackMaxLength = 40000 + channelNameTTL = 5 * time.Minute +) + +// assetOpener reads stored asset bytes by content hash. +type assetOpener interface { + Open(ctx context.Context, botID, contentHash string) (io.ReadCloser, media.Asset, error) +} + +type slackConnection struct { + api *slack.Client + sm *socketmode.Client + cancel context.CancelFunc +} + +type cachedSlackChannelName struct { + name string + chatType string + cachedAt time.Time +} + +type cachedSlackUserName struct { + displayName string + cachedAt time.Time +} + +type SlackAdapter struct { + logger *slog.Logger + mu sync.RWMutex + connections map[string]*slackConnection // keyed by config ID + seenMessages map[string]time.Time // keyed by configID:messageTS + channelNames map[string]cachedSlackChannelName // keyed by configID:channelID + userNames map[string]cachedSlackUserName // keyed by configID:userID + assets assetOpener + apiFactory func(Config, ...slack.Option) *slack.Client + authTest func(*slack.Client) (*slack.AuthTestResponse, error) + openConversation func(context.Context, *slack.Client, *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) + socketOpen func(Config) (*slack.Client, *socketmode.Client) + socketRun func(context.Context, *socketmode.Client) error + historyFetch func(context.Context, *slack.Client, *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) +} + +var ( + _ channel.Sender = (*SlackAdapter)(nil) + _ channel.StreamSender = (*SlackAdapter)(nil) + _ channel.Reactor = (*SlackAdapter)(nil) + _ channel.Receiver = (*SlackAdapter)(nil) + _ channel.AttachmentResolver = (*SlackAdapter)(nil) + _ channel.SelfDiscoverer = (*SlackAdapter)(nil) + _ channel.ConfigNormalizer = (*SlackAdapter)(nil) + _ channel.TargetResolver = (*SlackAdapter)(nil) + _ channel.BindingMatcher = (*SlackAdapter)(nil) + _ channel.ProcessingStatusNotifier = (*SlackAdapter)(nil) +) + +func NewSlackAdapter(log *slog.Logger) *SlackAdapter { + if log == nil { + log = slog.Default() + } + return &SlackAdapter{ + logger: log.With(slog.String("adapter", "slack")), + connections: make(map[string]*slackConnection), + seenMessages: make(map[string]time.Time), + channelNames: make(map[string]cachedSlackChannelName), + userNames: make(map[string]cachedSlackUserName), + apiFactory: func(cfg Config, options ...slack.Option) *slack.Client { + opts := []slack.Option{ + slack.OptionRetry(3), + } + opts = append(opts, options...) + return slack.New(cfg.BotToken, opts...) + }, + authTest: func(api *slack.Client) (*slack.AuthTestResponse, error) { + return api.AuthTest() + }, + openConversation: func(ctx context.Context, api *slack.Client, params *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + return api.OpenConversationContext(ctx, params) + }, + socketOpen: newSocketModeClient, + socketRun: func(ctx context.Context, sm *socketmode.Client) error { + return sm.RunContext(ctx) + }, + historyFetch: func(ctx context.Context, api *slack.Client, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) { + return api.GetConversationHistoryContext(ctx, params) + }, + } +} + +// SetAssetOpener configures the asset opener for reading stored attachments by content hash. +func (a *SlackAdapter) SetAssetOpener(opener assetOpener) { + a.mu.Lock() + defer a.mu.Unlock() + a.assets = opener +} + +func (*SlackAdapter) Type() channel.ChannelType { + return Type +} + +func (*SlackAdapter) Descriptor() channel.Descriptor { + return channel.Descriptor{ + Type: Type, + DisplayName: "Slack", + Capabilities: channel.ChannelCapabilities{ + Text: true, + Markdown: true, + Reply: true, + Attachments: true, + Media: true, + Streaming: true, + BlockStreaming: true, + Reactions: true, + Threads: true, + }, + ConfigSchema: channel.ConfigSchema{ + Version: 1, + Fields: map[string]channel.FieldSchema{ + "botToken": { + Type: channel.FieldSecret, + Required: true, + Title: "Bot Token", + Description: "Slack Bot User OAuth Token (xoxb-...)", + }, + "appToken": { + Type: channel.FieldSecret, + Required: true, + Title: "App-Level Token", + Description: "Slack App-Level Token for Socket Mode (xapp-...)", + }, + }, + }, + UserConfigSchema: channel.ConfigSchema{ + Version: 1, + Fields: map[string]channel.FieldSchema{ + "user_id": {Type: channel.FieldString}, + "channel_id": {Type: channel.FieldString}, + "username": {Type: channel.FieldString}, + }, + }, + TargetSpec: channel.TargetSpec{ + Format: "channel_id | user_id", + Hints: []channel.TargetHint{ + {Label: "Channel ID", Example: "C0123456789"}, + {Label: "User ID", Example: "U0123456789"}, + }, + }, + } +} + +func (a *SlackAdapter) newAPIClient(cfg Config, options ...slack.Option) *slack.Client { + if a != nil && a.apiFactory != nil { + return a.apiFactory(cfg, options...) + } + opts := []slack.Option{ + slack.OptionRetry(3), + } + opts = append(opts, options...) + return slack.New(cfg.BotToken, opts...) +} + +func newSocketModeClient(cfg Config) (*slack.Client, *socketmode.Client) { + api := slack.New( + cfg.BotToken, + slack.OptionRetry(3), + slack.OptionAppLevelToken(cfg.AppToken), + ) + return api, socketmode.New(api) +} + +func (a *SlackAdapter) getOrCreateConnection(channelCfg channel.ChannelConfig, cfg Config) (*slackConnection, error) { + a.mu.RLock() + conn, ok := a.connections[channelCfg.ID] + a.mu.RUnlock() + if ok { + return conn, nil + } + + a.mu.Lock() + defer a.mu.Unlock() + if c, ok := a.connections[channelCfg.ID]; ok { + return c, nil + } + + socketOpen := a.socketOpen + if socketOpen == nil { + socketOpen = newSocketModeClient + } + api, sm := socketOpen(cfg) + + conn = &slackConnection{ + api: api, + sm: sm, + } + a.connections[channelCfg.ID] = conn + return conn, nil +} + +func (a *SlackAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig, handler channel.InboundHandler) (channel.Connection, error) { + if a.logger != nil { + a.logger.Info("start", slog.String("config_id", cfg.ID)) + } + + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return nil, err + } + + conn, err := a.getOrCreateConnection(cfg, slackCfg) + if err != nil { + return nil, err + } + + // Discover self identity for filtering bot's own messages + authTest := a.authTest + if authTest == nil { + authTest = func(api *slack.Client) (*slack.AuthTestResponse, error) { + return api.AuthTest() + } + } + authResp, err := authTest(conn.api) + if err != nil { + a.clearConnection(cfg.ID) + return nil, fmt.Errorf("slack auth test: %w", err) + } + selfUserID := authResp.UserID + + smCtx, cancel := context.WithCancel(ctx) + conn.cancel = cancel + connectedCh := make(chan struct{}) + startErrCh := make(chan error, 1) + var startupOnce sync.Once + signalConnected := func() { + startupOnce.Do(func() { + close(connectedCh) + }) + } + signalStartupError := func(err error) { + if err == nil { + err = errors.New("slack socket mode startup failed") + } + select { + case startErrCh <- err: + default: + } + } + + go func() { + for { + select { + case <-smCtx.Done(): + return + case evt, ok := <-conn.sm.Events: + if !ok { + return + } + switch evt.Type { + case socketmode.EventTypeConnected: + signalConnected() + case socketmode.EventTypeInvalidAuth: + signalStartupError(errors.New("slack socket mode invalid auth")) + case socketmode.EventTypeConnectionError: + if connErr, ok := evt.Data.(*slack.ConnectionErrorEvent); ok && connErr != nil && connErr.ErrorObj != nil { + signalStartupError(fmt.Errorf("slack socket mode connect: %w", connErr.ErrorObj)) + } else { + signalStartupError(errors.New("slack socket mode connect failed")) + } + } + a.handleSocketModeEvent(smCtx, conn, evt, cfg, handler, selfUserID) + } + } + }() + + go func() { + socketRun := a.socketRun + if socketRun == nil { + socketRun = func(ctx context.Context, sm *socketmode.Client) error { + return sm.RunContext(ctx) + } + } + if err := socketRun(smCtx, conn.sm); err != nil { + if !errors.Is(err, context.Canceled) { + signalStartupError(fmt.Errorf("slack socket mode run: %w", err)) + } + if a.logger != nil && !errors.Is(err, context.Canceled) { + a.logger.Error("socket mode run error", slog.String("config_id", cfg.ID), slog.Any("error", err)) + } + } + }() + + select { + case <-connectedCh: + case err := <-startErrCh: + cancel() + a.clearConnection(cfg.ID) + return nil, err + case <-ctx.Done(): + cancel() + a.clearConnection(cfg.ID) + return nil, ctx.Err() + } + + stop := func(_ context.Context) error { + if a.logger != nil { + a.logger.Info("stop", slog.String("config_id", cfg.ID)) + } + cancel() + a.clearConnection(cfg.ID) + return nil + } + + return channel.NewConnection(cfg, stop), nil +} + +func (a *SlackAdapter) handleSocketModeEvent( + ctx context.Context, + conn *slackConnection, + evt socketmode.Event, + cfg channel.ChannelConfig, + handler channel.InboundHandler, + selfUserID string, +) { + switch evt.Type { + case socketmode.EventTypeEventsAPI: + eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) + if !ok { + return + } + if evt.Request != nil { + conn.sm.Ack(*evt.Request) + } + + if eventsAPIEvent.Type != slackevents.CallbackEvent { + return + } + + switch ev := eventsAPIEvent.InnerEvent.Data.(type) { + case *slackevents.MessageEvent: + a.handleMessageEvent(ctx, conn, ev, cfg, handler, selfUserID) + case *slackevents.AppMentionEvent: + a.handleAppMentionEvent(ctx, conn, ev, cfg, handler) + } + + case socketmode.EventTypeConnecting: + if a.logger != nil { + a.logger.Info("connecting to Slack Socket Mode", slog.String("config_id", cfg.ID)) + } + + case socketmode.EventTypeConnected: + if a.logger != nil { + a.logger.Info("connected to Slack Socket Mode", slog.String("config_id", cfg.ID)) + } + + case socketmode.EventTypeConnectionError: + if a.logger != nil { + a.logger.Error("Slack Socket Mode connection error", slog.String("config_id", cfg.ID)) + } + + case socketmode.EventTypeInteractive: + if evt.Request != nil { + conn.sm.Ack(*evt.Request) + } + + case socketmode.EventTypeSlashCommand: + if evt.Request != nil { + conn.sm.Ack(*evt.Request) + } + } +} + +func (a *SlackAdapter) handleMessageEvent( + ctx context.Context, + conn *slackConnection, + ev *slackevents.MessageEvent, + cfg channel.ChannelConfig, + handler channel.InboundHandler, + selfUserID string, +) { + if ev.BotID != "" || ev.User == "" || ev.User == selfUserID { + return + } + + // Skip message subtypes that aren't regular messages + if ev.SubType != "" && ev.SubType != "file_share" { + return + } + + text := strings.TrimSpace(ev.Text) + attachments := a.collectAttachments(ev.Message) + if text == "" && len(attachments) == 0 { + return + } + + if a.isDuplicateInbound(cfg.ID, ev.TimeStamp) { + return + } + + chatType := channel.ConversationTypeGroup + switch ev.ChannelType { + case "im": + chatType = channel.ConversationTypePrivate + case "mpim": + chatType = channel.ConversationTypeGroup + case "group": + chatType = channel.ConversationTypeGroup + } + + // Resolve user display name + displayName := a.resolveUserDisplayName(conn.api, cfg.ID, ev.User) + + isMentioned := strings.Contains(ev.Text, "<@"+selfUserID+">") + + threadID := ev.ThreadTimeStamp + if ev.Message != nil && strings.TrimSpace(ev.Message.ThreadTimestamp) != "" { + threadID = strings.TrimSpace(ev.Message.ThreadTimestamp) + } + conversationName, _ := a.lookupConversationInfo(ctx, conn.api, cfg.ID, ev.Channel) + + msg := channel.InboundMessage{ + Channel: Type, + Message: channel.Message{ + ID: ev.TimeStamp, + Format: channel.MessageFormatPlain, + Text: text, + Attachments: attachments, + }, + BotID: cfg.BotID, + ReplyTarget: ev.Channel, + Sender: channel.Identity{ + SubjectID: ev.User, + DisplayName: displayName, + Attributes: slackIdentityAttributes(ev.User, "", ev.ChannelType, ev.Channel), + }, + Conversation: channel.Conversation{ + ID: ev.Channel, + Type: chatType, + Name: conversationName, + ThreadID: threadID, + }, + ReceivedAt: time.Now().UTC(), + Source: "slack", + Metadata: map[string]any{ + "channel_type": ev.ChannelType, + "channel_name": conversationName, + "is_mentioned": isMentioned, + "thread_ts": threadID, + "subtype": ev.SubType, + }, + } + + if a.logger != nil { + a.logger.Info("inbound received", + slog.String("config_id", cfg.ID), + slog.String("chat_type", chatType), + slog.String("user_id", ev.User), + slog.String("text", common.SummarizeText(text)), + ) + } + + go func() { + if err := handler(ctx, cfg, msg); err != nil && a.logger != nil { + a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err)) + } + }() +} + +func (a *SlackAdapter) handleAppMentionEvent( + ctx context.Context, + conn *slackConnection, + ev *slackevents.AppMentionEvent, + cfg channel.ChannelConfig, + handler channel.InboundHandler, +) { + if ev.BotID != "" || ev.User == "" { + return + } + + text := strings.TrimSpace(ev.Text) + if text == "" { + return + } + + attachments := a.fetchMessageAttachments(ctx, conn.api, ev.Channel, ev.TimeStamp) + + if a.isDuplicateInbound(cfg.ID, ev.TimeStamp) { + return + } + + displayName := a.resolveUserDisplayName(conn.api, cfg.ID, ev.User) + + threadID := ev.ThreadTimeStamp + conversationName, conversationType := a.lookupConversationInfo(ctx, conn.api, cfg.ID, ev.Channel) + if conversationType == "" { + conversationType = channel.ConversationTypeGroup + } + + msg := channel.InboundMessage{ + Channel: Type, + Message: channel.Message{ + ID: ev.TimeStamp, + Format: channel.MessageFormatPlain, + Text: text, + Attachments: attachments, + }, + BotID: cfg.BotID, + ReplyTarget: ev.Channel, + Sender: channel.Identity{ + SubjectID: ev.User, + DisplayName: displayName, + Attributes: map[string]string{ + "user_id": ev.User, + }, + }, + Conversation: channel.Conversation{ + ID: ev.Channel, + Type: conversationType, + Name: conversationName, + ThreadID: threadID, + }, + ReceivedAt: time.Now().UTC(), + Source: "slack", + Metadata: map[string]any{ + "channel_name": conversationName, + "is_mentioned": true, + "thread_ts": threadID, + }, + } + + if a.logger != nil { + a.logger.Info("app mention received", + slog.String("config_id", cfg.ID), + slog.String("user_id", ev.User), + slog.String("text", common.SummarizeText(text)), + ) + } + + go func() { + if err := handler(ctx, cfg, msg); err != nil && a.logger != nil { + a.logger.Error("handle inbound failed", slog.String("config_id", cfg.ID), slog.Any("error", err)) + } + }() +} + +func (a *SlackAdapter) Send(ctx context.Context, cfg channel.ChannelConfig, msg channel.PreparedOutboundMessage) error { + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return err + } + api := a.newAPIClient(slackCfg) + target, err := a.resolveOutboundTarget(ctx, api, msg.Target) + if err != nil { + return err + } + + return a.sendSlackMessage(ctx, api, target, msg) +} + +func (a *SlackAdapter) sendSlackMessage(ctx context.Context, api *slack.Client, channelID string, msg channel.PreparedOutboundMessage) error { + text := truncateSlackText(msg.Message.Message.PlainText()) + threadTS := "" + if msg.Message.Message.Reply != nil && msg.Message.Message.Reply.MessageID != "" { + threadTS = msg.Message.Message.Reply.MessageID + } + + opts := []slack.MsgOption{ + slack.MsgOptionText(text, false), + } + + if threadTS != "" { + opts = append(opts, slack.MsgOptionTS(threadTS)) + } + + if len(msg.Message.Attachments) > 0 { + for _, att := range msg.Message.Attachments { + if err := a.uploadPreparedAttachment(ctx, api, channelID, threadTS, att); err != nil { + if a.logger != nil { + a.logger.Error("upload attachment failed", slog.Any("error", err)) + } + return err + } + } + } + + if text == "" && len(msg.Message.Attachments) > 0 { + return nil + } + + if text == "" { + return errors.New("cannot send empty message") + } + + _, _, err := api.PostMessageContext(ctx, channelID, opts...) + return err +} + +func (*SlackAdapter) uploadPreparedAttachment(ctx context.Context, api *slack.Client, channelID string, threadTS string, att channel.PreparedAttachment) error { + if att.Kind != channel.PreparedAttachmentUpload { + return fmt.Errorf("slack attachment requires upload source, got %s", att.Kind) + } + if att.Open == nil { + return errors.New("slack attachment upload is not openable") + } + + reader, err := att.Open(ctx) + if err != nil { + return err + } + defer func() { _ = reader.Close() }() + + data, err := media.ReadAllWithLimit(reader, media.MaxAssetBytes) + if err != nil { + return err + } + + name := strings.TrimSpace(att.Name) + if name == "" { + name = "attachment" + if ext := mimeExtension(strings.TrimSpace(att.Mime)); ext != "" { + name += ext + } + } + + _, err = api.UploadFileContext(ctx, slack.UploadFileParameters{ + Channel: channelID, + ThreadTimestamp: threadTS, + Filename: name, + Reader: bytes.NewReader(data), + FileSize: len(data), + }) + return err +} + +func (a *SlackAdapter) ResolveAttachment(ctx context.Context, cfg channel.ChannelConfig, attachment channel.Attachment) (channel.AttachmentPayload, error) { + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return channel.AttachmentPayload{}, err + } + + return a.resolveAttachmentWithClient(ctx, a.newAPIClient(slackCfg), attachment) +} + +func (*SlackAdapter) resolveAttachmentWithClient(ctx context.Context, api *slack.Client, attachment channel.Attachment) (channel.AttachmentPayload, error) { + downloadURL := strings.TrimSpace(attachment.URL) + if attachment.Size > media.MaxAssetBytes { + return channel.AttachmentPayload{}, fmt.Errorf("%w: max %d bytes", media.ErrAssetTooLarge, media.MaxAssetBytes) + } + if downloadURL == "" { + fileID := strings.TrimSpace(attachment.PlatformKey) + if fileID == "" { + return channel.AttachmentPayload{}, errors.New("slack attachment requires url or platform_key") + } + file, _, _, err := api.GetFileInfoContext(ctx, fileID, 0, 0) + if err != nil { + return channel.AttachmentPayload{}, fmt.Errorf("slack get file info: %w", err) + } + if file == nil { + return channel.AttachmentPayload{}, errors.New("slack file info response is empty") + } + downloadURL = strings.TrimSpace(file.URLPrivateDownload) + if downloadURL == "" { + downloadURL = strings.TrimSpace(file.URLPrivate) + } + if strings.TrimSpace(attachment.Name) == "" { + attachment.Name = strings.TrimSpace(file.Name) + } + if strings.TrimSpace(attachment.Mime) == "" { + attachment.Mime = strings.TrimSpace(file.Mimetype) + } + if attachment.Size <= 0 { + attachment.Size = int64(file.Size) + } + if attachment.Size > media.MaxAssetBytes { + return channel.AttachmentPayload{}, fmt.Errorf("%w: max %d bytes", media.ErrAssetTooLarge, media.MaxAssetBytes) + } + } + + if downloadURL == "" { + return channel.AttachmentPayload{}, errors.New("slack attachment download URL is empty") + } + + reader, err := streamSlackAttachment(ctx, api, downloadURL, media.MaxAssetBytes) + if err != nil { + return channel.AttachmentPayload{}, err + } + + return channel.AttachmentPayload{ + Reader: reader, + Mime: strings.TrimSpace(attachment.Mime), + Name: strings.TrimSpace(attachment.Name), + Size: attachment.Size, + }, nil +} + +func truncateSlackText(text string) string { + if utf8.RuneCountInString(text) <= slackMaxLength { + return text + } + runes := []rune(text) + return string(runes[:slackMaxLength-3]) + "..." +} + +func mimeExtension(mime string) string { + switch mime { + case "image/jpeg", "image/jpg": + return ".jpg" + case "image/png": + return ".png" + case "image/gif": + return ".gif" + case "image/webp": + return ".webp" + case "video/mp4": + return ".mp4" + case "video/webm": + return ".webm" + case "audio/mpeg", "audio/mp3": + return ".mp3" + case "audio/ogg": + return ".ogg" + case "audio/wav": + return ".wav" + case "application/pdf": + return ".pdf" + case "text/plain": + return ".txt" + default: + return "" + } +} + +func (a *SlackAdapter) OpenStream(ctx context.Context, cfg channel.ChannelConfig, target string, opts channel.StreamOptions) (channel.PreparedOutboundStream, error) { + target = strings.TrimSpace(target) + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return nil, err + } + api := a.newAPIClient(slackCfg) + target, err = a.resolveOutboundTarget(ctx, api, target) + if err != nil { + return nil, err + } + + reply := opts.Reply + if reply == nil && strings.TrimSpace(opts.SourceMessageID) != "" { + reply = &channel.ReplyRef{ + Target: target, + MessageID: strings.TrimSpace(opts.SourceMessageID), + } + } + + return &slackOutboundStream{ + adapter: a, + cfg: cfg, + target: target, + reply: reply, + api: api, + }, nil +} + +func (*SlackAdapter) ProcessingStarted(_ context.Context, _ channel.ChannelConfig, _ channel.InboundMessage, _ channel.ProcessingStatusInfo) (channel.ProcessingStatusHandle, error) { + // Slack does not have a public typing indicator API for bots + return channel.ProcessingStatusHandle{}, nil +} + +func (*SlackAdapter) ProcessingCompleted(_ context.Context, _ channel.ChannelConfig, _ channel.InboundMessage, _ channel.ProcessingStatusInfo, _ channel.ProcessingStatusHandle) error { + return nil +} + +func (*SlackAdapter) ProcessingFailed(_ context.Context, _ channel.ChannelConfig, _ channel.InboundMessage, _ channel.ProcessingStatusInfo, _ channel.ProcessingStatusHandle, _ error) error { + return nil +} + +func (a *SlackAdapter) React(ctx context.Context, cfg channel.ChannelConfig, target string, messageID string, emoji string) error { + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return err + } + api := a.newAPIClient(slackCfg) + target, err = a.resolveOutboundTarget(ctx, api, target) + if err != nil { + return err + } + + emoji = resolveSlackEmoji(emoji) + + return api.AddReaction(emoji, slack.ItemRef{ + Channel: target, + Timestamp: messageID, + }) +} + +func (a *SlackAdapter) Unreact(ctx context.Context, cfg channel.ChannelConfig, target string, messageID string, emoji string) error { + slackCfg, err := parseConfig(cfg.Credentials) + if err != nil { + return err + } + api := a.newAPIClient(slackCfg) + target, err = a.resolveOutboundTarget(ctx, api, target) + if err != nil { + return err + } + + emoji = resolveSlackEmoji(emoji) + + return api.RemoveReaction(emoji, slack.ItemRef{ + Channel: target, + Timestamp: messageID, + }) +} + +func (a *SlackAdapter) resolveOutboundTarget(ctx context.Context, api *slack.Client, target string) (string, error) { + target = strings.TrimSpace(target) + if target == "" { + return "", errors.New("slack target is required") + } + if !strings.HasPrefix(target, "U") { + return target, nil + } + openConversation := a.openConversation + if openConversation == nil { + openConversation = func(ctx context.Context, api *slack.Client, params *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + return api.OpenConversationContext(ctx, params) + } + } + conversation, _, _, err := openConversation(ctx, api, &slack.OpenConversationParameters{ + Users: []string{target}, + ReturnIM: true, + }) + if err != nil { + return "", fmt.Errorf("slack open dm conversation: %w", err) + } + if conversation == nil || strings.TrimSpace(conversation.ID) == "" { + return "", errors.New("slack open dm conversation returned empty channel") + } + return strings.TrimSpace(conversation.ID), nil +} + +func (a *SlackAdapter) DiscoverSelf(_ context.Context, credentials map[string]any) (map[string]any, string, error) { + cfg, err := parseConfig(credentials) + if err != nil { + return nil, "", err + } + + api := a.newAPIClient(cfg) + resp, err := api.AuthTest() + if err != nil { + return nil, "", fmt.Errorf("slack auth test: %w", err) + } + + identity := map[string]any{ + "user_id": resp.UserID, + "bot_id": resp.BotID, + "team_id": resp.TeamID, + "username": resp.User, + "team": resp.Team, + } + + return identity, resp.UserID, nil +} + +func (*SlackAdapter) NormalizeConfig(raw map[string]any) (map[string]any, error) { + return normalizeConfig(raw) +} + +func (*SlackAdapter) NormalizeUserConfig(raw map[string]any) (map[string]any, error) { + return normalizeUserConfig(raw) +} + +func (*SlackAdapter) NormalizeTarget(raw string) string { + return normalizeTarget(raw) +} + +func (*SlackAdapter) ResolveTarget(userConfig map[string]any) (string, error) { + return resolveTarget(userConfig) +} + +func (*SlackAdapter) MatchBinding(config map[string]any, criteria channel.BindingCriteria) bool { + return matchBinding(config, criteria) +} + +func (*SlackAdapter) BuildUserConfig(identity channel.Identity) map[string]any { + return buildUserConfig(identity) +} + +func (a *SlackAdapter) isDuplicateInbound(token, messageTS string) bool { + if strings.TrimSpace(token) == "" || strings.TrimSpace(messageTS) == "" { + return false + } + + now := time.Now().UTC() + expireBefore := now.Add(-inboundDedupTTL) + + a.mu.Lock() + defer a.mu.Unlock() + + for key, seenAt := range a.seenMessages { + if seenAt.Before(expireBefore) { + delete(a.seenMessages, key) + } + } + + seenKey := token + ":" + messageTS + if _, ok := a.seenMessages[seenKey]; ok { + return true + } + a.seenMessages[seenKey] = now + return false +} + +func (a *SlackAdapter) clearConnection(appToken string) { + a.mu.Lock() + defer a.mu.Unlock() + if conn, ok := a.connections[appToken]; ok { + if conn.cancel != nil { + conn.cancel() + } + delete(a.connections, appToken) + } +} + +func (a *SlackAdapter) resolveUserDisplayName(api *slack.Client, configID, userID string) string { + configID = strings.TrimSpace(configID) + userID = strings.TrimSpace(userID) + if api == nil || configID == "" || userID == "" { + return userID + } + cacheKey := configID + ":" + userID + + expireBefore := time.Now().UTC().Add(-channelNameTTL) + + a.mu.RLock() + cached, ok := a.userNames[cacheKey] + a.mu.RUnlock() + if ok && cached.cachedAt.After(expireBefore) { + return cached.displayName + } + + userInfo, err := api.GetUserInfo(userID) + if err != nil { + return userID + } + displayName := strings.TrimSpace(userInfo.Profile.DisplayName) + if displayName == "" { + displayName = strings.TrimSpace(userInfo.RealName) + } + if displayName == "" { + displayName = strings.TrimSpace(userInfo.Name) + } + if displayName == "" { + displayName = userID + } + + a.mu.Lock() + a.userNames[cacheKey] = cachedSlackUserName{displayName: displayName, cachedAt: time.Now().UTC()} + a.mu.Unlock() + return displayName +} + +func (*SlackAdapter) collectAttachments(msg *slack.Msg) []channel.Attachment { + if msg == nil || len(msg.Files) == 0 { + return nil + } + + attachments := make([]channel.Attachment, 0, len(msg.Files)) + for _, file := range msg.Files { + attachment := channel.Attachment{ + Type: channel.AttachmentFile, + PlatformKey: strings.TrimSpace(file.ID), + SourcePlatform: Type.String(), + Name: strings.TrimSpace(file.Name), + Size: int64(file.Size), + Mime: strings.TrimSpace(file.Mimetype), + } + + switch { + case strings.HasPrefix(file.Mimetype, "image/"): + attachment.Type = channel.AttachmentImage + case strings.HasPrefix(file.Mimetype, "video/"): + attachment.Type = channel.AttachmentVideo + case strings.HasPrefix(file.Mimetype, "audio/"): + attachment.Type = channel.AttachmentAudio + } + + attachments = append(attachments, attachment) + } + + return attachments +} + +func (a *SlackAdapter) fetchMessageAttachments(ctx context.Context, api *slack.Client, channelID string, ts string) []channel.Attachment { + if api == nil || strings.TrimSpace(channelID) == "" || strings.TrimSpace(ts) == "" { + return nil + } + historyFetch := a.historyFetch + if historyFetch == nil { + historyFetch = func(ctx context.Context, api *slack.Client, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) { + return api.GetConversationHistoryContext(ctx, params) + } + } + resp, err := historyFetch(ctx, api, &slack.GetConversationHistoryParameters{ + ChannelID: channelID, + Oldest: ts, + Latest: ts, + Inclusive: true, + Limit: 1, + }) + if err != nil || resp == nil || len(resp.Messages) == 0 { + return nil + } + msg := resp.Messages[0].Msg + return a.collectAttachments(&msg) +} + +func streamSlackAttachment(ctx context.Context, api *slack.Client, downloadURL string, maxBytes int64) (io.ReadCloser, error) { + if api == nil { + return nil, errors.New("slack client is required") + } + if maxBytes <= 0 { + return nil, fmt.Errorf("%w: max %d bytes", media.ErrAssetTooLarge, maxBytes) + } + streamCtx, cancel := context.WithCancel(ctx) + pr, pw := io.Pipe() + + go func() { + writer := &limitedSlackPipeWriter{ + pipe: pw, + maxBytes: maxBytes, + } + err := api.GetFileContext(streamCtx, downloadURL, writer) + if err == nil && writer.err != nil { + err = writer.err + } + if err != nil { + if errors.Is(err, media.ErrAssetTooLarge) { + err = fmt.Errorf("%w: max %d bytes", media.ErrAssetTooLarge, maxBytes) + } else { + err = fmt.Errorf("slack download file: %w", err) + } + _ = pw.CloseWithError(err) + return + } + _ = pw.Close() + }() + + return &slackAttachmentStream{ + ReadCloser: pr, + cancel: cancel, + }, nil +} + +type slackAttachmentStream struct { + io.ReadCloser + cancel context.CancelFunc +} + +func (s *slackAttachmentStream) Close() error { + if s == nil { + return nil + } + if s.cancel != nil { + s.cancel() + } + if s.ReadCloser == nil { + return nil + } + return s.ReadCloser.Close() +} + +type limitedSlackPipeWriter struct { + pipe *io.PipeWriter + maxBytes int64 + written int64 + err error +} + +func (w *limitedSlackPipeWriter) Write(p []byte) (int, error) { + if w == nil || w.pipe == nil { + return 0, errors.New("pipe writer is required") + } + if w.maxBytes <= 0 { + w.err = media.ErrAssetTooLarge + return 0, w.err + } + remaining := w.maxBytes - w.written + if remaining <= 0 { + w.err = media.ErrAssetTooLarge + return 0, w.err + } + if int64(len(p)) <= remaining { + n, err := w.pipe.Write(p) + w.written += int64(n) + return n, err + } + allowed := p[:remaining] + n, err := w.pipe.Write(allowed) + w.written += int64(n) + if err != nil { + return n, err + } + w.err = media.ErrAssetTooLarge + return n, w.err +} + +func slackIdentityAttributes(userID, username, channelType, channelID string) map[string]string { + attrs := map[string]string{} + if value := strings.TrimSpace(userID); value != "" { + attrs["user_id"] = value + } + if value := strings.TrimSpace(username); value != "" { + attrs["username"] = value + } + if strings.TrimSpace(channelType) == "im" { + if value := strings.TrimSpace(channelID); value != "" { + attrs["channel_id"] = value + } + } + return attrs +} + +func (a *SlackAdapter) lookupConversationName(ctx context.Context, api *slack.Client, configID, channelID string) string { + name, _ := a.lookupConversationInfo(ctx, api, configID, channelID) + return name +} + +func (a *SlackAdapter) lookupConversationInfo(ctx context.Context, api *slack.Client, configID, channelID string) (string, string) { + configID = strings.TrimSpace(configID) + channelID = strings.TrimSpace(channelID) + if api == nil || configID == "" || channelID == "" { + return "", "" + } + + cacheKey := configID + ":" + channelID + expireBefore := time.Now().UTC().Add(-channelNameTTL) + + a.mu.RLock() + cached, ok := a.channelNames[cacheKey] + a.mu.RUnlock() + if ok && cached.cachedAt.After(expireBefore) { + return cached.name, cached.chatType + } + + name, chatType, err := a.fetchConversationInfo(ctx, api, channelID) + if err != nil { + if a.logger != nil { + a.logger.Debug("resolve slack conversation name failed", + slog.String("channel_id", channelID), + slog.Any("error", err), + ) + } + return "", "" + } + if name == "" && chatType == "" { + return "", "" + } + + a.mu.Lock() + a.channelNames[cacheKey] = cachedSlackChannelName{name: name, chatType: chatType, cachedAt: time.Now().UTC()} + a.mu.Unlock() + return name, chatType +} + +func (*SlackAdapter) fetchConversationInfo(ctx context.Context, api *slack.Client, channelID string) (string, string, error) { + info, err := api.GetConversationInfoContext(ctx, &slack.GetConversationInfoInput{ + ChannelID: channelID, + }) + if err != nil { + return "", "", err + } + if info == nil { + return "", "", nil + } + + name := strings.TrimSpace(info.Name) + if name == "" { + name = strings.TrimSpace(info.NameNormalized) + } + chatType := channel.ConversationTypeGroup + switch { + case info.IsIM: + chatType = channel.ConversationTypePrivate + case info.IsMpIM: + chatType = channel.ConversationTypeGroup + case info.IsPrivate: + chatType = channel.ConversationTypeGroup + } + return name, chatType, nil +} diff --git a/internal/channel/adapters/slack/slack_test.go b/internal/channel/adapters/slack/slack_test.go new file mode 100644 index 00000000..fdc326f0 --- /dev/null +++ b/internal/channel/adapters/slack/slack_test.go @@ -0,0 +1,1308 @@ +package slack + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" + "github.com/slack-go/slack/socketmode" + + "github.com/memohai/memoh/internal/channel" + "github.com/memohai/memoh/internal/media" +) + +var ( + testBotToken = strings.Join([]string{"xoxb", "test", "token"}, "-") + testShortBotToken = strings.Join([]string{"xoxb", "test"}, "-") + testAppToken = strings.Join([]string{"xapp", "test"}, "-") + testBadAppToken = strings.Join([]string{"xapp", "bad"}, "-") + testDownloadAuthValue = "Bearer " + testBotToken +) + +func TestSlackRegistryExposesSupportedInterfaces(t *testing.T) { + t.Parallel() + + reg := channel.NewRegistry() + reg.MustRegister(NewSlackAdapter(nil)) + + if sender, ok := reg.GetSender(Type); !ok || sender == nil { + t.Fatal("expected Slack adapter to implement channel.Sender") + } + if streamSender, ok := reg.GetStreamSender(Type); !ok || streamSender == nil { + t.Fatal("expected Slack adapter to implement channel.StreamSender") + } + if editor, ok := reg.GetMessageEditor(Type); ok || editor != nil { + t.Fatal("did not expect Slack adapter to implement channel.MessageEditor") + } +} + +func TestSlackDescriptorDoesNotAdvertiseEdit(t *testing.T) { + t.Parallel() + + if NewSlackAdapter(nil).Descriptor().Capabilities.Edit { + t.Fatal("Slack descriptor should not advertise edit support") + } +} + +func TestSlackResolveOutboundTargetUsesDMForUserID(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + adapter.openConversation = func(_ context.Context, _ *slack.Client, params *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + if len(params.Users) != 1 || params.Users[0] != "U123" { + t.Fatalf("unexpected users: %#v", params.Users) + } + if !params.ReturnIM { + t.Fatal("expected ReturnIM to be true") + } + return &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "D123"}}}, false, false, nil + } + + target, err := adapter.resolveOutboundTarget(context.Background(), slack.New(testShortBotToken), "U123") + if err != nil { + t.Fatalf("resolveOutboundTarget: %v", err) + } + if target != "D123" { + t.Fatalf("expected DM channel target, got %q", target) + } +} + +func TestSlackResolveOutboundTargetRejectsEmptyDMChannel(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + adapter.openConversation = func(_ context.Context, _ *slack.Client, _ *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + return &slack.Channel{}, false, false, nil + } + + _, err := adapter.resolveOutboundTarget(context.Background(), slack.New(testShortBotToken), "U123") + if err == nil || !strings.Contains(err.Error(), "empty channel") { + t.Fatalf("expected empty channel error, got %v", err) + } +} + +func TestSlackCollectAttachmentsOmitsPrivateURLForInboundFiles(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + attachments := adapter.collectAttachments(&slack.Msg{ + Files: []slack.File{{ + ID: "F123", + Name: "cat.png", + Mimetype: "image/png", + Size: 42, + URLPrivateDownload: "https://files.slack.test/F123", + }}, + }) + + if len(attachments) != 1 { + t.Fatalf("expected 1 attachment, got %d", len(attachments)) + } + if attachments[0].URL != "" { + t.Fatalf("expected private URL to be omitted, got %q", attachments[0].URL) + } + if attachments[0].PlatformKey != "F123" { + t.Fatalf("unexpected platform key: %q", attachments[0].PlatformKey) + } + if attachments[0].Type != channel.AttachmentImage { + t.Fatalf("unexpected attachment type: %q", attachments[0].Type) + } +} + +func TestResolveSlackEmoji(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in string + want string + }{ + {name: "unicode", in: "👍", want: "+1"}, + {name: "shortcode with colons", in: ":thumbsup:", want: "thumbsup"}, + {name: "shortcode plain", in: "thumbsup", want: "thumbsup"}, + {name: "skin tone unicode", in: "👍🏽", want: "+1::skin-tone-4"}, + {name: "variation selector unicode", in: "✌️", want: "v"}, + {name: "skin tone wave", in: "👋🏻", want: "wave::skin-tone-2"}, + {name: "unknown passthrough", in: "not-an-emoji", want: "not-an-emoji"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := resolveSlackEmoji(tt.in); got != tt.want { + t.Fatalf("resolveSlackEmoji(%q) = %q, want %q", tt.in, got, tt.want) + } + }) + } +} + +func TestSlackConnectClearsCachedClientWhenAuthFails(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + clientTokens := make(map[*slack.Client]string) + factoryCalls := 0 + adapter.socketOpen = func(cfg Config) (*slack.Client, *socketmode.Client) { + factoryCalls++ + api := slack.New(cfg.BotToken) + clientTokens[api] = cfg.BotToken + return api, socketmode.New(api) + } + adapter.authTest = func(api *slack.Client) (*slack.AuthTestResponse, error) { + if clientTokens[api] == "bad-token" { + return nil, errors.New("invalid bot token") + } + return &slack.AuthTestResponse{UserID: "U123"}, nil + } + adapter.socketRun = func(ctx context.Context, sm *socketmode.Client) error { + select { + case sm.Events <- socketmode.Event{Type: socketmode.EventTypeConnected}: + case <-ctx.Done(): + return ctx.Err() + } + <-ctx.Done() + return ctx.Err() + } + + cfg := channel.ChannelConfig{ + ID: "cfg-auth-retry", + BotID: "bot-1", + ChannelType: Type, + Credentials: map[string]any{ + "botToken": "bad-token", + "appToken": testAppToken, + }, + } + if _, err := adapter.Connect(context.Background(), cfg, func(context.Context, channel.ChannelConfig, channel.InboundMessage) error { + return nil + }); err == nil { + t.Fatal("expected auth failure") + } + if len(adapter.connections) != 0 { + t.Fatal("failed startup should not leave cached Slack clients behind") + } + + cfg.Credentials["botToken"] = "good-token" + conn, err := adapter.Connect(context.Background(), cfg, func(context.Context, channel.ChannelConfig, channel.InboundMessage) error { + return nil + }) + if err != nil { + t.Fatalf("Connect retry should succeed: %v", err) + } + if factoryCalls != 2 { + t.Fatalf("expected 2 client constructions after retry, got %d", factoryCalls) + } + if err := conn.Stop(context.Background()); err != nil { + t.Fatalf("Stop: %v", err) + } +} + +func TestSlackConnectFailsWhenSocketModeStartupFails(t *testing.T) { + t.Parallel() + + startErr := errors.New("invalid app token") + adapter := NewSlackAdapter(nil) + adapter.socketOpen = func(cfg Config) (*slack.Client, *socketmode.Client) { + api := slack.New(cfg.BotToken) + return api, socketmode.New(api) + } + adapter.authTest = func(*slack.Client) (*slack.AuthTestResponse, error) { + return &slack.AuthTestResponse{UserID: "U123"}, nil + } + adapter.socketRun = func(ctx context.Context, sm *socketmode.Client) error { + select { + case sm.Events <- socketmode.Event{ + Type: socketmode.EventTypeConnectionError, + Data: &slack.ConnectionErrorEvent{ErrorObj: startErr}, + }: + case <-ctx.Done(): + return ctx.Err() + } + return startErr + } + + cfg := channel.ChannelConfig{ + ID: "cfg-startup-error", + BotID: "bot-1", + ChannelType: Type, + Credentials: map[string]any{ + "botToken": testShortBotToken, + "appToken": testBadAppToken, + }, + } + conn, err := adapter.Connect(context.Background(), cfg, func(context.Context, channel.ChannelConfig, channel.InboundMessage) error { + return nil + }) + if err == nil { + if conn != nil { + _ = conn.Stop(context.Background()) + } + t.Fatal("expected socket mode startup failure") + } + if !strings.Contains(err.Error(), "invalid app token") { + t.Fatalf("unexpected startup error: %v", err) + } + if len(adapter.connections) != 0 { + t.Fatal("startup failure should clear cached Slack connection") + } +} + +func TestSlackResolveAttachmentDownloadsPrivateURLWithBearerToken(t *testing.T) { + t.Parallel() + + var gotAuth string + adapter := NewSlackAdapter(nil) + client := slack.New( + testBotToken, + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if r.URL.String() != "https://files.slack.test/private/file.txt" { + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + gotAuth = r.Header.Get("Authorization") + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/plain"}}, + Body: io.NopCloser(strings.NewReader("slack-private-file")), + }, nil + })}), + slack.OptionRetry(3), + ) + + payload, err := adapter.resolveAttachmentWithClient(context.Background(), client, channel.Attachment{ + URL: "https://files.slack.test/private/file.txt", + Name: "file.txt", + Mime: "text/plain", + Size: 18, + Type: channel.AttachmentFile, + PlatformKey: "F123", + }) + if err != nil { + t.Fatalf("ResolveAttachment: %v", err) + } + defer func() { _ = payload.Reader.Close() }() + + data, err := io.ReadAll(payload.Reader) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + if string(data) != "slack-private-file" { + t.Fatalf("unexpected payload: %q", string(data)) + } + if gotAuth != testDownloadAuthValue { + t.Fatalf("unexpected auth header: %q", gotAuth) + } +} + +func TestSlackResolveAttachmentFallsBackToFilesInfo(t *testing.T) { + t.Parallel() + + var gotFileToken string + var gotDownloadAuth string + adapter := NewSlackAdapter(nil) + client := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/files.info": + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + gotFileToken = r.FormValue("token") + body, _ := json.Marshal(map[string]any{ + "ok": true, + "file": map[string]any{ + "id": "F123", + "name": "fallback.txt", + "mimetype": "text/plain", + "size": 13, + "url_private_download": "https://files.slack.test/download/F123", + }, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://files.slack.test/download/F123": + gotDownloadAuth = r.Header.Get("Authorization") + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/plain"}}, + Body: io.NopCloser(strings.NewReader("fallback-file")), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + payload, err := adapter.resolveAttachmentWithClient(context.Background(), client, channel.Attachment{ + PlatformKey: "F123", + Type: channel.AttachmentFile, + }) + if err != nil { + t.Fatalf("resolveAttachmentWithClient: %v", err) + } + defer func() { _ = payload.Reader.Close() }() + + data, err := io.ReadAll(payload.Reader) + if err != nil { + t.Fatalf("ReadAll: %v", err) + } + if string(data) != "fallback-file" { + t.Fatalf("unexpected payload: %q", string(data)) + } + if gotFileToken != testBotToken { + t.Fatalf("unexpected files.info token: %q", gotFileToken) + } + if gotDownloadAuth != testDownloadAuthValue { + t.Fatalf("unexpected download auth header: %q", gotDownloadAuth) + } + if payload.Name != "fallback.txt" { + t.Fatalf("unexpected name: %q", payload.Name) + } + if payload.Mime != "text/plain" { + t.Fatalf("unexpected mime: %q", payload.Mime) + } +} + +func TestSlackResolveAttachmentRejectsOversizedKnownSlackFile(t *testing.T) { + t.Parallel() + + var downloadCalls int + adapter := NewSlackAdapter(nil) + client := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/files.info": + body, _ := json.Marshal(map[string]any{ + "ok": true, + "file": map[string]any{ + "id": "F999", + "name": "huge.bin", + "mimetype": "application/octet-stream", + "size": media.MaxAssetBytes + 1, + "url_private_download": "https://files.slack.test/download/F999", + }, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://files.slack.test/download/F999": + downloadCalls++ + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/octet-stream"}}, + Body: io.NopCloser(strings.NewReader("should-not-download")), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + _, err := adapter.resolveAttachmentWithClient(context.Background(), client, channel.Attachment{ + PlatformKey: "F999", + Type: channel.AttachmentFile, + }) + if err == nil { + t.Fatal("expected oversized attachment error") + } + if !strings.Contains(err.Error(), "media asset too large") { + t.Fatalf("unexpected error: %v", err) + } + if downloadCalls != 0 { + t.Fatalf("expected oversized file to be rejected before download, got %d download calls", downloadCalls) + } +} + +func TestSlackHandleMessageEventStoresDMChannelID(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + conn := &slackConnection{} + cfg := channel.ChannelConfig{ID: "cfg-1", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleMessageEvent(context.Background(), conn, &slackevents.MessageEvent{ + User: "U123", + Text: "hello", + TimeStamp: "1710000000.000100", + Channel: "D123", + ChannelType: "im", + Message: &slack.Msg{Text: "hello"}, + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }, "UBOT") + + select { + case msg := <-msgCh: + if got := msg.Sender.Attribute("channel_id"); got != "D123" { + t.Fatalf("unexpected channel_id: %q", got) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound message") + } +} + +func TestSlackHandleMessageEventSkipsChannelIDOutsideDM(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + conn := &slackConnection{} + cfg := channel.ChannelConfig{ID: "cfg-2", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleMessageEvent(context.Background(), conn, &slackevents.MessageEvent{ + User: "U123", + Text: "hello", + TimeStamp: "1710000000.000101", + Channel: "C123", + ChannelType: "channel", + Message: &slack.Msg{Text: "hello"}, + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }, "UBOT") + + select { + case msg := <-msgCh: + if got := msg.Sender.Attribute("channel_id"); got != "" { + t.Fatalf("expected empty channel_id, got %q", got) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound message") + } +} + +func TestSlackHandleMessageEventResolvesConversationName(t *testing.T) { + t.Parallel() + + var conversationsInfoCalls int + adapter := NewSlackAdapter(nil) + conn := &slackConnection{ + api: slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/conversations.info": + conversationsInfoCalls++ + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": map[string]any{ + "id": "C123", + "name": "general", + }, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ), + } + cfg := channel.ChannelConfig{ID: "cfg-name", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleMessageEvent(context.Background(), conn, &slackevents.MessageEvent{ + User: "U123", + Text: "hello", + TimeStamp: "1710000000.000102", + Channel: "C123", + ChannelType: "channel", + Message: &slack.Msg{Text: "hello"}, + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }, "UBOT") + + select { + case msg := <-msgCh: + if msg.Conversation.Name != "general" { + t.Fatalf("unexpected conversation name: %q", msg.Conversation.Name) + } + gotMeta, _ := msg.Metadata["channel_name"].(string) + if gotMeta != "general" { + t.Fatalf("unexpected metadata channel_name: %q", gotMeta) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound message") + } + if conversationsInfoCalls != 1 { + t.Fatalf("unexpected conversations.info calls: %d", conversationsInfoCalls) + } +} + +func TestSlackLookupConversationNameCachesResolvedNames(t *testing.T) { + t.Parallel() + + var conversationsInfoCalls int + adapter := NewSlackAdapter(nil) + api := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/conversations.info": + conversationsInfoCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": map[string]any{ + "id": "C123", + "name": "general", + }, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + first := adapter.lookupConversationName(context.Background(), api, "cfg-cache", "C123") + second := adapter.lookupConversationName(context.Background(), api, "cfg-cache", "C123") + if first != "general" || second != "general" { + t.Fatalf("unexpected cached names: %q / %q", first, second) + } + if conversationsInfoCalls != 1 { + t.Fatalf("unexpected conversations.info calls: %d", conversationsInfoCalls) + } +} + +func TestSlackHandleMessageEventKeepsFlowWhenConversationNameLookupFails(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + conn := &slackConnection{ + api: slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/conversations.info": + body, _ := json.Marshal(map[string]any{ + "ok": false, + "error": "missing_scope", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ), + } + cfg := channel.ChannelConfig{ID: "cfg-name-fail", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleMessageEvent(context.Background(), conn, &slackevents.MessageEvent{ + User: "U123", + Text: "hello", + TimeStamp: "1710000000.000103", + Channel: "C123", + ChannelType: "channel", + Message: &slack.Msg{Text: "hello"}, + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }, "UBOT") + + select { + case msg := <-msgCh: + if msg.Conversation.Name != "" { + t.Fatalf("expected empty conversation name, got %q", msg.Conversation.Name) + } + gotMeta, _ := msg.Metadata["channel_name"].(string) + if gotMeta != "" { + t.Fatalf("expected empty metadata channel_name, got %q", gotMeta) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound message") + } +} + +func TestSlackHandleAppMentionEventPreservesAttachments(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + adapter.historyFetch = func(_ context.Context, _ *slack.Client, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) { + if params == nil || params.ChannelID != "C123" || params.Oldest != "1710000000.000200" || !params.Inclusive { + t.Fatalf("unexpected history params: %+v", params) + } + return &slack.GetConversationHistoryResponse{ + Messages: []slack.Message{{ + Msg: slack.Msg{ + Text: "hi <@UBOT>", + Files: []slack.File{{ + ID: "F123", + Name: "cat.png", + Mimetype: "image/png", + Size: 42, + URLPrivateDownload: "https://files.slack.test/F123", + }}, + }, + }}, + }, nil + } + + conn := &slackConnection{api: slack.New(testBotToken)} + cfg := channel.ChannelConfig{ID: "cfg-mention", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleAppMentionEvent(context.Background(), conn, &slackevents.AppMentionEvent{ + User: "U123", + Text: "hi <@UBOT>", + TimeStamp: "1710000000.000200", + Channel: "C123", + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }) + + select { + case msg := <-msgCh: + if len(msg.Message.Attachments) != 1 { + t.Fatalf("expected 1 attachment, got %d", len(msg.Message.Attachments)) + } + if got := msg.Message.Attachments[0].PlatformKey; got != "F123" { + t.Fatalf("unexpected attachment platform key: %q", got) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound mention message") + } +} + +func TestSlackHandleAppMentionEventPreservesPrivateChannelType(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + adapter.mu.Lock() + adapter.channelNames["cfg-private:C999"] = cachedSlackChannelName{ + name: "ops-private", + chatType: channel.ConversationTypeGroup, + cachedAt: time.Now().UTC(), + } + adapter.mu.Unlock() + + conn := &slackConnection{api: slack.New(testBotToken)} + cfg := channel.ChannelConfig{ID: "cfg-private", BotID: "bot-1"} + msgCh := make(chan channel.InboundMessage, 1) + + adapter.handleAppMentionEvent(context.Background(), conn, &slackevents.AppMentionEvent{ + User: "U123", + Text: "hi <@UBOT>", + TimeStamp: "1710000000.000201", + Channel: "C999", + }, cfg, func(_ context.Context, _ channel.ChannelConfig, msg channel.InboundMessage) error { + msgCh <- msg + return nil + }) + + select { + case msg := <-msgCh: + if msg.Conversation.Type != channel.ConversationTypeGroup { + t.Fatalf("unexpected conversation type: %q", msg.Conversation.Type) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for inbound mention message") + } +} + +func TestSlackSendReturnsAttachmentUploadFailures(t *testing.T) { + t.Parallel() + + var postMessageCalls int + adapter := NewSlackAdapter(nil) + api := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/files.getUploadURLExternal": + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + body, _ := json.Marshal(map[string]any{ + "ok": true, + "upload_url": "https://upload.slack.test/fail", + "file_id": "F123", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://upload.slack.test/fail": + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(strings.NewReader("upload failed")), + Header: make(http.Header), + }, nil + case "https://slack.test/api/files.completeUploadExternal": + t.Fatal("completeUploadExternal should not be called after failed upload") + return nil, nil + case "https://slack.test/api/chat.postMessage": + postMessageCalls++ + t.Fatal("chat.postMessage should not be called after failed attachment upload") + return nil, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + err := adapter.sendSlackMessage(context.Background(), api, "C123", channel.PreparedOutboundMessage{ + Message: channel.PreparedMessage{ + Message: channel.Message{ + Text: "hello", + }, + Attachments: []channel.PreparedAttachment{preparedSlackUploadAttachment("hello.txt", "text/plain", "hello")}, + }, + }) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "UploadToURL") { + t.Fatalf("unexpected error: %v", err) + } + if postMessageCalls != 0 { + t.Fatalf("unexpected chat.postMessage calls: %d", postMessageCalls) + } +} + +func TestSlackSendAttachmentOnlyReturnsUploadFailures(t *testing.T) { + t.Parallel() + + api := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/files.getUploadURLExternal": + body, _ := json.Marshal(map[string]any{ + "ok": true, + "upload_url": "https://upload.slack.test/fail-only-attachment", + "file_id": "F124", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://upload.slack.test/fail-only-attachment": + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(strings.NewReader("upload failed")), + Header: make(http.Header), + }, nil + case "https://slack.test/api/files.completeUploadExternal": + t.Fatal("completeUploadExternal should not be called after failed attachment upload") + return nil, nil + case "https://slack.test/api/chat.postMessage": + t.Fatal("chat.postMessage should not be called for attachment-only failure") + return nil, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + adapter := NewSlackAdapter(nil) + err := adapter.sendSlackMessage(context.Background(), api, "C123", channel.PreparedOutboundMessage{ + Message: channel.PreparedMessage{ + Message: channel.Message{ + Attachments: []channel.Attachment{{ + Type: channel.AttachmentFile, + Name: "hello.txt", + Mime: "text/plain", + Size: 5, + }}, + }, + Attachments: []channel.PreparedAttachment{preparedSlackUploadAttachment("hello.txt", "text/plain", "hello")}, + }, + }) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "UploadToURL") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestSlackStreamAttachmentOnlyClearsPlaceholder(t *testing.T) { + t.Parallel() + + var postCalls, deleteCalls, uploadCalls int + client := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/chat.postMessage": + postCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": "C123", + "ts": "1710000000.000300", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://slack.test/api/chat.delete": + deleteCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": "C123", + "ts": "1710000000.000300", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://slack.test/api/files.getUploadURLExternal": + uploadCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "upload_url": "https://upload.slack.test/F200", + "file_id": "F200", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://upload.slack.test/F200": + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("ok")), + Header: make(http.Header), + }, nil + case "https://slack.test/api/files.completeUploadExternal": + body, _ := json.Marshal(map[string]any{"ok": true, "files": []map[string]any{{"id": "F200"}}}) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + stream := &slackOutboundStream{ + adapter: NewSlackAdapter(nil), + cfg: channel.ChannelConfig{ID: "cfg-stream"}, + target: "C123", + api: client, + } + if err := stream.Push(context.Background(), channel.PreparedStreamEvent{ + Type: channel.StreamEventStatus, + Status: channel.StreamStatusStarted, + }); err != nil { + t.Fatalf("status push: %v", err) + } + if err := stream.Push(context.Background(), channel.PreparedStreamEvent{ + Type: channel.StreamEventAttachment, + Attachments: []channel.PreparedAttachment{ + preparedSlackUploadAttachment("hello.txt", "text/plain", "hello"), + }, + }); err != nil { + t.Fatalf("attachment push: %v", err) + } + + if postCalls != 1 { + t.Fatalf("expected 1 placeholder post, got %d", postCalls) + } + if deleteCalls != 1 { + t.Fatalf("expected placeholder delete before attachment upload, got %d", deleteCalls) + } + if uploadCalls != 1 { + t.Fatalf("expected 1 attachment upload, got %d", uploadCalls) + } + if stream.msgTS != "" { + t.Fatalf("expected placeholder state to be cleared, got msgTS=%q", stream.msgTS) + } +} + +func TestSlackStreamFinalFallbackDeletesOldPlaceholder(t *testing.T) { + t.Parallel() + + var postCalls, updateCalls, deleteCalls int + client := slack.New( + testBotToken, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + switch r.URL.String() { + case "https://slack.test/api/chat.postMessage": + postCalls++ + ts := "1710000000.000400" + if postCalls > 1 { + ts = "1710000000.000401" + } + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": "C123", + "ts": ts, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://slack.test/api/chat.update": + updateCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": false, + "error": "cant_update_message", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + case "https://slack.test/api/chat.delete": + deleteCalls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "channel": "C123", + "ts": "1710000000.000400", + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + default: + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + })}), + slack.OptionRetry(3), + ) + + stream := &slackOutboundStream{ + adapter: NewSlackAdapter(nil), + cfg: channel.ChannelConfig{ID: "cfg-stream-fallback"}, + target: "C123", + api: client, + } + if err := stream.Push(context.Background(), channel.PreparedStreamEvent{ + Type: channel.StreamEventStatus, + Status: channel.StreamStatusStarted, + }); err != nil { + t.Fatalf("status push: %v", err) + } + if err := stream.Push(context.Background(), channel.PreparedStreamEvent{ + Type: channel.StreamEventFinal, + Final: &channel.PreparedStreamFinalizePayload{ + Message: channel.PreparedMessage{ + Message: channel.Message{Text: "final answer"}, + }, + }, + }); err != nil { + t.Fatalf("final push: %v", err) + } + + if postCalls != 2 { + t.Fatalf("expected 2 postMessage calls, got %d", postCalls) + } + if updateCalls != 1 { + t.Fatalf("expected 1 update attempt, got %d", updateCalls) + } + if deleteCalls != 1 { + t.Fatalf("expected stale placeholder to be deleted once, got %d", deleteCalls) + } + if stream.msgTS != "1710000000.000401" { + t.Fatalf("expected stream to track replacement message, got %q", stream.msgTS) + } +} + +func preparedSlackUploadAttachment(name string, mime string, content string) channel.PreparedAttachment { + return channel.PreparedAttachment{ + Logical: channel.Attachment{ + Type: channel.AttachmentFile, + Name: name, + Mime: mime, + Size: int64(len(content)), + }, + Kind: channel.PreparedAttachmentUpload, + Name: name, + Mime: mime, + Size: int64(len(content)), + Open: func(context.Context) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(content)), nil + }, + } +} + +func TestSlackSendResolvesUserTargetToDMChannel(t *testing.T) { + t.Parallel() + + var gotChannel string + adapter := NewSlackAdapter(nil) + adapter.apiFactory = func(cfg Config, options ...slack.Option) *slack.Client { + return slack.New( + cfg.BotToken, + append(options, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if r.URL.String() != "https://slack.test/api/chat.postMessage" { + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + gotChannel = r.FormValue("channel") + body, _ := json.Marshal(map[string]any{"ok": true, "channel": gotChannel, "ts": "1710000000.000100"}) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + })}), + )..., + ) + } + adapter.openConversation = func(_ context.Context, _ *slack.Client, params *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + if len(params.Users) != 1 || params.Users[0] != "U123" { + t.Fatalf("unexpected users: %#v", params.Users) + } + return &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "D456"}}}, false, false, nil + } + + err := adapter.Send(context.Background(), channel.ChannelConfig{ + Credentials: map[string]any{ + "botToken": testShortBotToken, + "appToken": testAppToken, + }, + }, channel.PreparedOutboundMessage{ + Target: "U123", + Message: channel.PreparedMessage{ + Message: channel.Message{Text: "hello"}, + }, + }) + if err != nil { + t.Fatalf("Send: %v", err) + } + if gotChannel != "D456" { + t.Fatalf("expected postMessage channel D456, got %q", gotChannel) + } +} + +func TestSlackOpenStreamResolvesUserTargetToDMChannel(t *testing.T) { + t.Parallel() + + adapter := NewSlackAdapter(nil) + adapter.openConversation = func(_ context.Context, _ *slack.Client, params *slack.OpenConversationParameters) (*slack.Channel, bool, bool, error) { + if len(params.Users) != 1 || params.Users[0] != "U123" { + t.Fatalf("unexpected users: %#v", params.Users) + } + return &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "D999"}}}, false, false, nil + } + + stream, err := adapter.OpenStream(context.Background(), channel.ChannelConfig{ + Credentials: map[string]any{ + "botToken": testShortBotToken, + "appToken": testAppToken, + }, + }, "U123", channel.StreamOptions{}) + if err != nil { + t.Fatalf("OpenStream: %v", err) + } + + slackStream, ok := stream.(*slackOutboundStream) + if !ok { + t.Fatalf("unexpected stream type %T", stream) + } + if slackStream.target != "D999" { + t.Fatalf("expected resolved DM target, got %q", slackStream.target) + } +} + +func TestSlackReactConvertsSkinToneEmojiToSlackName(t *testing.T) { + t.Parallel() + + var gotName string + adapter := NewSlackAdapter(nil) + adapter.apiFactory = func(cfg Config, options ...slack.Option) *slack.Client { + return slack.New( + cfg.BotToken, + append(options, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if r.URL.String() != "https://slack.test/api/reactions.add" { + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + gotName = r.FormValue("name") + body, _ := json.Marshal(map[string]any{"ok": true}) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + })}), + )..., + ) + } + + err := adapter.React(context.Background(), channel.ChannelConfig{ + Credentials: map[string]any{ + "botToken": testShortBotToken, + "appToken": testAppToken, + }, + }, "C123", "1710000000.000100", "👍🏽") + if err != nil { + t.Fatalf("React: %v", err) + } + if gotName != "+1::skin-tone-4" { + t.Fatalf("expected skin tone slack reaction name, got %q", gotName) + } +} + +func TestSlackUnreactConvertsSkinToneEmojiToSlackName(t *testing.T) { + t.Parallel() + + var gotName string + adapter := NewSlackAdapter(nil) + adapter.apiFactory = func(cfg Config, options ...slack.Option) *slack.Client { + return slack.New( + cfg.BotToken, + append(options, + slack.OptionAPIURL("https://slack.test/api/"), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if r.URL.String() != "https://slack.test/api/reactions.remove" { + return &http.Response{StatusCode: http.StatusNotFound, Body: io.NopCloser(strings.NewReader("not found")), Header: make(http.Header)}, nil + } + if err := r.ParseForm(); err != nil { + t.Fatalf("ParseForm: %v", err) + } + gotName = r.FormValue("name") + body, _ := json.Marshal(map[string]any{"ok": true}) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + })}), + )..., + ) + } + + err := adapter.Unreact(context.Background(), channel.ChannelConfig{ + Credentials: map[string]any{ + "botToken": testShortBotToken, + "appToken": testAppToken, + }, + }, "C123", "1710000000.000100", "👍🏽") + if err != nil { + t.Fatalf("Unreact: %v", err) + } + if gotName != "+1::skin-tone-4" { + t.Fatalf("expected skin tone slack reaction name, got %q", gotName) + } +} + +func TestSlackResolveUserDisplayNameScopesCacheByConfig(t *testing.T) { + t.Parallel() + + newClient := func(apiURL, displayName string, calls *int) *slack.Client { + return slack.New( + testBotToken, + slack.OptionAPIURL(apiURL), + slack.OptionHTTPClient(&http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if !strings.HasSuffix(r.URL.String(), "/users.info") { + return &http.Response{ + StatusCode: http.StatusNotFound, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("not found")), + }, nil + } + *calls++ + body, _ := json.Marshal(map[string]any{ + "ok": true, + "user": map[string]any{ + "id": "U123", + "name": strings.ToLower(displayName), + "profile": map[string]any{ + "display_name": displayName, + }, + }, + }) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(string(body))), + }, nil + })}), + ) + } + + var callsA, callsB int + adapter := NewSlackAdapter(nil) + apiA := newClient("https://slack-a.test/api/", "Alice A", &callsA) + apiB := newClient("https://slack-b.test/api/", "Alice B", &callsB) + + if got := adapter.resolveUserDisplayName(apiA, "cfg-a", "U123"); got != "Alice A" { + t.Fatalf("cfg-a first lookup = %q", got) + } + if got := adapter.resolveUserDisplayName(apiB, "cfg-b", "U123"); got != "Alice B" { + t.Fatalf("cfg-b first lookup = %q", got) + } + if got := adapter.resolveUserDisplayName(apiA, "cfg-a", "U123"); got != "Alice A" { + t.Fatalf("cfg-a cached lookup = %q", got) + } + + if callsA != 1 { + t.Fatalf("expected cfg-a to fetch once, got %d", callsA) + } + if callsB != 1 { + t.Fatalf("expected cfg-b to fetch once, got %d", callsB) + } +} + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} diff --git a/internal/channel/adapters/slack/stream.go b/internal/channel/adapters/slack/stream.go new file mode 100644 index 00000000..994311b8 --- /dev/null +++ b/internal/channel/adapters/slack/stream.go @@ -0,0 +1,395 @@ +package slack + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strings" + "sync" + "sync/atomic" + "time" + + slackapi "github.com/slack-go/slack" + + "github.com/memohai/memoh/internal/channel" +) + +const ( + slackStreamUpdateThrottle = 1500 * time.Millisecond + slackStreamRetryFallback = 2 * time.Second + slackStreamFinalMaxRetries = 3 +) + +type slackOutboundStream struct { + adapter *SlackAdapter + cfg channel.ChannelConfig + target string + reply *channel.ReplyRef + api *slackapi.Client + closed atomic.Bool + mu sync.Mutex + msgTS string // Slack message timestamp (used as message ID) + buffer strings.Builder + lastSent string + lastUpdate time.Time + nextUpdate time.Time +} + +var _ channel.PreparedOutboundStream = (*slackOutboundStream)(nil) + +func (s *slackOutboundStream) Push(ctx context.Context, event channel.PreparedStreamEvent) error { + if s == nil || s.adapter == nil { + return errors.New("slack stream not configured") + } + if s.closed.Load() { + return errors.New("slack stream is closed") + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + switch event.Type { + case channel.StreamEventStatus: + if event.Status == channel.StreamStatusStarted { + return s.ensureMessage(ctx, "Thinking...") + } + return nil + + case channel.StreamEventDelta: + if event.Delta == "" || event.Phase == channel.StreamPhaseReasoning { + return nil + } + s.mu.Lock() + s.buffer.WriteString(event.Delta) + s.mu.Unlock() + + return s.updateMessage(ctx) + + case channel.StreamEventFinal: + if event.Final == nil { + return errors.New("slack stream final payload is required") + } + s.mu.Lock() + bufText := strings.TrimSpace(s.buffer.String()) + s.mu.Unlock() + finalText := bufText + if authoritative := strings.TrimSpace(event.Final.Message.Message.PlainText()); authoritative != "" { + finalText = authoritative + } + if finalText != "" { + if err := s.finalizeMessage(ctx, finalText); err != nil { + return err + } + } else if err := s.clearPlaceholder(ctx); err != nil { + return err + } + for _, att := range event.Final.Message.Attachments { + if err := s.sendAttachment(ctx, att); err != nil { + return err + } + } + return nil + + case channel.StreamEventError: + errText := channel.RedactIMErrorText(strings.TrimSpace(event.Error)) + if errText == "" { + return nil + } + return s.finalizeMessage(ctx, "Error: "+errText) + + case channel.StreamEventAttachment: + if len(event.Attachments) == 0 { + return nil + } + s.mu.Lock() + finalText := strings.TrimSpace(s.buffer.String()) + s.mu.Unlock() + if finalText != "" { + if err := s.finalizeMessage(ctx, finalText); err != nil { + return err + } + } else if err := s.clearPlaceholder(ctx); err != nil { + return err + } + for _, att := range event.Attachments { + if err := s.sendAttachment(ctx, att); err != nil { + return err + } + } + return nil + + case channel.StreamEventAgentStart, channel.StreamEventAgentEnd, + channel.StreamEventPhaseStart, channel.StreamEventPhaseEnd, + channel.StreamEventProcessingStarted, channel.StreamEventProcessingCompleted, + channel.StreamEventProcessingFailed, + channel.StreamEventToolCallStart, channel.StreamEventToolCallEnd, + channel.StreamEventReaction, channel.StreamEventSpeech: + return nil + + default: + return fmt.Errorf("unsupported stream event type: %s", event.Type) + } +} + +func (s *slackOutboundStream) Close(ctx context.Context) error { + if s == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + s.closed.Store(true) + return nil +} + +func (s *slackOutboundStream) ensureMessage(ctx context.Context, text string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.msgTS != "" { + return nil + } + + text = truncateSlackText(text) + + ts, err := s.postMessageWithRetry(ctx, text) + if err != nil { + return err + } + + s.msgTS = ts + s.lastSent = normalizeSlackStreamText(text) + s.lastUpdate = time.Now() + s.nextUpdate = s.lastUpdate.Add(slackStreamUpdateThrottle) + return nil +} + +func (s *slackOutboundStream) updateMessage(ctx context.Context) error { + s.mu.Lock() + msgTS := s.msgTS + content := truncateSlackText(strings.TrimSpace(s.buffer.String())) + lastSent := s.lastSent + nextUpdate := s.nextUpdate + s.mu.Unlock() + + if msgTS == "" || content == "" { + return nil + } + if normalizeSlackStreamText(content) == normalizeSlackStreamText(lastSent) { + return nil + } + if time.Now().Before(nextUpdate) { + return nil + } + + err := s.updateMessageText(ctx, msgTS, content) + if err == nil { + s.mu.Lock() + s.lastSent = normalizeSlackStreamText(content) + s.lastUpdate = time.Now() + s.nextUpdate = s.lastUpdate.Add(slackStreamUpdateThrottle) + s.mu.Unlock() + return nil + } + if delay, ok := slackRetryDelay(err); ok { + s.mu.Lock() + s.nextUpdate = time.Now().Add(delay) + s.mu.Unlock() + if s.adapter != nil && s.adapter.logger != nil { + s.adapter.logger.Warn("slack stream update throttled", + slog.String("config_id", s.cfg.ID), + slog.String("target", s.target), + slog.Duration("retry_after", delay), + slog.Any("error", err), + ) + } + return nil + } + if s.adapter != nil && s.adapter.logger != nil { + s.adapter.logger.Warn("slack stream update failed", + slog.String("config_id", s.cfg.ID), + slog.String("target", s.target), + slog.Any("error", err), + ) + } + return nil +} + +func (s *slackOutboundStream) finalizeMessage(ctx context.Context, text string) error { + s.mu.Lock() + text = truncateSlackText(text) + msgTS := s.msgTS + lastSent := s.lastSent + s.mu.Unlock() + + if normalizeSlackStreamText(text) == normalizeSlackStreamText(lastSent) && msgTS != "" { + return nil + } + + if msgTS == "" { + ts, err := s.postMessageWithRetry(ctx, text) + if err != nil { + return err + } + s.mu.Lock() + s.msgTS = ts + s.lastSent = normalizeSlackStreamText(text) + s.lastUpdate = time.Now() + s.nextUpdate = s.lastUpdate.Add(slackStreamUpdateThrottle) + s.mu.Unlock() + return nil + } + + err := s.updateMessageTextWithRetry(ctx, msgTS, text) + if err == nil { + s.mu.Lock() + s.lastSent = normalizeSlackStreamText(text) + s.lastUpdate = time.Now() + s.nextUpdate = s.lastUpdate.Add(slackStreamUpdateThrottle) + s.mu.Unlock() + return nil + } + + if s.adapter != nil && s.adapter.logger != nil { + s.adapter.logger.Warn("slack stream final update failed, falling back to new message", + slog.String("config_id", s.cfg.ID), + slog.String("target", s.target), + slog.Any("error", err), + ) + } + + if err := s.clearPlaceholder(ctx); err != nil { + return err + } + + ts, postErr := s.postMessageWithRetry(ctx, text) + if postErr != nil { + return postErr + } + s.mu.Lock() + s.msgTS = ts + s.lastSent = normalizeSlackStreamText(text) + s.lastUpdate = time.Now() + s.nextUpdate = s.lastUpdate.Add(slackStreamUpdateThrottle) + s.mu.Unlock() + return nil +} + +func (s *slackOutboundStream) clearPlaceholder(ctx context.Context) error { + s.mu.Lock() + msgTS := s.msgTS + s.mu.Unlock() + + if msgTS == "" { + return nil + } + if _, _, err := s.api.DeleteMessageContext(ctx, s.target, msgTS); err != nil { + return err + } + + s.mu.Lock() + s.msgTS = "" + s.lastSent = "" + s.lastUpdate = time.Time{} + s.nextUpdate = time.Time{} + s.mu.Unlock() + return nil +} + +func (s *slackOutboundStream) sendAttachment(ctx context.Context, att channel.PreparedAttachment) error { + threadTS := "" + if s.reply != nil && s.reply.MessageID != "" { + threadTS = s.reply.MessageID + } + return s.adapter.uploadPreparedAttachment(ctx, s.api, s.target, threadTS, att) +} + +func (s *slackOutboundStream) postMessageWithRetry(ctx context.Context, text string) (string, error) { + opts := []slackapi.MsgOption{ + slackapi.MsgOptionText(text, false), + } + if s.reply != nil && s.reply.MessageID != "" { + opts = append(opts, slackapi.MsgOptionTS(s.reply.MessageID)) + } + + var lastErr error + for attempt := 0; attempt < slackStreamFinalMaxRetries; attempt++ { + _, ts, err := s.api.PostMessageContext(ctx, s.target, opts...) + if err == nil { + return ts, nil + } + lastErr = err + delay, ok := slackRetryDelay(err) + if !ok { + return "", err + } + if err := sleepWithContext(ctx, delay); err != nil { + return "", err + } + } + return "", lastErr +} + +func (s *slackOutboundStream) updateMessageText(ctx context.Context, msgTS string, text string) error { + _, _, _, err := s.api.UpdateMessageContext( + ctx, + s.target, + msgTS, + slackapi.MsgOptionText(text, false), + ) + return err +} + +func (s *slackOutboundStream) updateMessageTextWithRetry(ctx context.Context, msgTS string, text string) error { + var lastErr error + for attempt := 0; attempt < slackStreamFinalMaxRetries; attempt++ { + err := s.updateMessageText(ctx, msgTS, text) + if err == nil { + return nil + } + lastErr = err + delay, ok := slackRetryDelay(err) + if !ok { + return err + } + if err := sleepWithContext(ctx, delay); err != nil { + return err + } + } + return lastErr +} + +func slackRetryDelay(err error) (time.Duration, bool) { + var rateLimitedErr *slackapi.RateLimitedError + if errors.As(err, &rateLimitedErr) { + if rateLimitedErr.RetryAfter > 0 { + return rateLimitedErr.RetryAfter, true + } + return slackStreamRetryFallback, true + } + return 0, false +} + +func sleepWithContext(ctx context.Context, delay time.Duration) error { + if delay <= 0 { + return nil + } + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +func normalizeSlackStreamText(value string) string { + return strings.TrimSpace(value) +} diff --git a/internal/channel/types.go b/internal/channel/types.go index 468aa4bb..e200c2b4 100644 --- a/internal/channel/types.go +++ b/internal/channel/types.go @@ -446,4 +446,5 @@ const ( ChannelTypeWeixin ChannelType = "weixin" ChannelTypeWeChatOA ChannelType = "wechatoa" ChannelTypeLocal ChannelType = "local" + ChannelTypeSlack ChannelType = "slack" )