feat(telegram): add processing reaction and fix reply-to-bot trigger

Add ProcessingStatusNotifier implementation: show 👀 reaction while
processing, remove on completion/failure. Fix isReplyToBot to match
only the current bot (bot.Self.ID) instead of any bot, preventing
multiple bots from responding to the same reply. Add rune-safe text
truncation for Telegram message length limit.
This commit is contained in:
BBQ
2026-02-13 20:45:14 +08:00
parent 14a1f9cad3
commit f3ea18a2da
2 changed files with 200 additions and 6 deletions
+88 -6
View File
@@ -204,7 +204,7 @@ func (a *TelegramAdapter) Connect(ctx context.Context, cfg channel.ChannelConfig
replyRef := buildTelegramReplyRef(update.Message, chatID)
isReplyToBot := update.Message.ReplyToMessage != nil &&
update.Message.ReplyToMessage.From != nil &&
update.Message.ReplyToMessage.From.IsBot
update.Message.ReplyToMessage.From.ID == bot.Self.ID
isMentioned := isTelegramBotMentioned(update.Message, bot.Self.UserName)
msg := channel.InboundMessage{
Channel: Type,
@@ -420,7 +420,7 @@ func sendTelegramText(bot *tgbotapi.BotAPI, target string, text string, replyTo
// sendTelegramTextReturnMessage sends a text message and returns the chat ID and message ID for later editing.
func sendTelegramTextReturnMessage(bot *tgbotapi.BotAPI, target string, text string, replyTo int, parseMode string) (chatID int64, messageID int, err error) {
text = sanitizeTelegramText(text)
text = truncateTelegramText(sanitizeTelegramText(text))
var sent tgbotapi.Message
if strings.HasPrefix(target, "@") {
message := tgbotapi.NewMessageToChannel(target, text)
@@ -459,10 +459,7 @@ var sendEditForTest func(bot *tgbotapi.BotAPI, edit tgbotapi.EditMessageTextConf
// editTelegramMessageText sends an edit request. It handles "message is not modified"
// silently but returns 429 and other errors to the caller for higher-level retry decisions.
func editTelegramMessageText(bot *tgbotapi.BotAPI, chatID int64, messageID int, text string, parseMode string) error {
text = sanitizeTelegramText(text)
if len(text) > telegramMaxMessageLength {
text = text[:telegramMaxMessageLength-3] + "..."
}
text = truncateTelegramText(sanitizeTelegramText(text))
edit := tgbotapi.NewEditMessageText(chatID, messageID, text)
edit.ParseMode = parseMode
send := sendEditForTest
@@ -826,3 +823,88 @@ func sanitizeTelegramText(text string) string {
}
return strings.ToValidUTF8(text, "")
}
// truncateTelegramText truncates text to telegramMaxMessageLength on a valid
// UTF-8 rune boundary, appending "..." when truncation occurs.
func truncateTelegramText(text string) string {
if len(text) <= telegramMaxMessageLength {
return text
}
const suffix = "..."
limit := telegramMaxMessageLength - len(suffix)
// Walk backwards to a rune boundary.
for limit > 0 && !utf8.RuneStart(text[limit]) {
limit--
}
return text[:limit] + suffix
}
const processingReactionEmoji = "👀"
// ProcessingStarted adds a reaction to the user's message to indicate processing.
func (a *TelegramAdapter) ProcessingStarted(ctx context.Context, cfg channel.ChannelConfig, msg channel.InboundMessage, info channel.ProcessingStatusInfo) (channel.ProcessingStatusHandle, error) {
chatID := strings.TrimSpace(info.ReplyTarget)
messageID := strings.TrimSpace(info.SourceMessageID)
if chatID == "" || messageID == "" {
return channel.ProcessingStatusHandle{}, nil
}
telegramCfg, err := parseConfig(cfg.Credentials)
if err != nil {
return channel.ProcessingStatusHandle{}, err
}
bot, err := a.getOrCreateBot(telegramCfg.BotToken, cfg.ID)
if err != nil {
return channel.ProcessingStatusHandle{}, err
}
if err := setTelegramReaction(bot, chatID, messageID, processingReactionEmoji); err != nil {
if a.logger != nil {
a.logger.Warn("add processing reaction failed", slog.String("config_id", cfg.ID), slog.Any("error", err))
}
return channel.ProcessingStatusHandle{}, nil
}
return channel.ProcessingStatusHandle{Token: processingReactionEmoji}, nil
}
// ProcessingCompleted removes the processing reaction after reply is sent.
func (a *TelegramAdapter) ProcessingCompleted(ctx context.Context, cfg channel.ChannelConfig, msg channel.InboundMessage, info channel.ProcessingStatusInfo, handle channel.ProcessingStatusHandle) error {
if handle.Token == "" {
return nil
}
chatID := strings.TrimSpace(info.ReplyTarget)
messageID := strings.TrimSpace(info.SourceMessageID)
if chatID == "" || messageID == "" {
return nil
}
telegramCfg, err := parseConfig(cfg.Credentials)
if err != nil {
return err
}
bot, err := a.getOrCreateBot(telegramCfg.BotToken, cfg.ID)
if err != nil {
return err
}
return clearTelegramReaction(bot, chatID, messageID)
}
// ProcessingFailed removes the processing reaction on failure.
func (a *TelegramAdapter) ProcessingFailed(ctx context.Context, cfg channel.ChannelConfig, msg channel.InboundMessage, info channel.ProcessingStatusInfo, handle channel.ProcessingStatusHandle, cause error) error {
return a.ProcessingCompleted(ctx, cfg, msg, info, handle)
}
func setTelegramReaction(bot *tgbotapi.BotAPI, chatID, messageID, emoji string) error {
params := tgbotapi.Params{}
params.AddNonEmpty("chat_id", chatID)
params.AddNonEmpty("message_id", messageID)
params.AddNonEmpty("reaction", fmt.Sprintf(`[{"type":"emoji","emoji":"%s"}]`, emoji))
_, err := bot.MakeRequest("setMessageReaction", params)
return err
}
func clearTelegramReaction(bot *tgbotapi.BotAPI, chatID, messageID string) error {
params := tgbotapi.Params{}
params.AddNonEmpty("chat_id", chatID)
params.AddNonEmpty("message_id", messageID)
params.AddNonEmpty("reaction", "[]")
_, err := bot.MakeRequest("setMessageReaction", params)
return err
}
@@ -6,6 +6,7 @@ import (
"strings"
"testing"
"time"
"unicode/utf8"
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
"github.com/memohai/memoh/internal/channel"
@@ -380,6 +381,69 @@ func TestGetTelegramRetryAfter(t *testing.T) {
}
}
func TestTruncateTelegramText(t *testing.T) {
t.Parallel()
short := "hello"
if got := truncateTelegramText(short); got != short {
t.Fatalf("short text should not be truncated: %q", got)
}
// Exactly at limit.
exact := strings.Repeat("a", telegramMaxMessageLength)
if got := truncateTelegramText(exact); got != exact {
t.Fatalf("exact-limit text should not be truncated, len=%d", len(got))
}
// Over limit with ASCII.
over := strings.Repeat("a", telegramMaxMessageLength+100)
got := truncateTelegramText(over)
if len(got) > telegramMaxMessageLength {
t.Fatalf("truncated text should be <= %d bytes: got %d", telegramMaxMessageLength, len(got))
}
if !strings.HasSuffix(got, "...") {
t.Fatalf("truncated text should end with '...': %q", got[len(got)-10:])
}
// Over limit with multi-byte characters (Chinese: 3 bytes each).
multi := strings.Repeat("\u4f60", telegramMaxMessageLength)
got = truncateTelegramText(multi)
if len(got) > telegramMaxMessageLength {
t.Fatalf("truncated multi-byte text should be <= %d bytes: got %d", telegramMaxMessageLength, len(got))
}
if !strings.HasSuffix(got, "...") {
t.Fatal("truncated multi-byte text should end with '...'")
}
// Verify no broken runes.
trimmed := strings.TrimSuffix(got, "...")
for i := 0; i < len(trimmed); {
r, size := utf8.DecodeRuneInString(trimmed[i:])
if r == utf8.RuneError && size == 1 {
t.Fatalf("truncated text contains invalid UTF-8 at byte %d", i)
}
i += size
}
}
func TestSanitizeTelegramText(t *testing.T) {
t.Parallel()
valid := "hello world"
if got := sanitizeTelegramText(valid); got != valid {
t.Fatalf("valid text should not change: %q", got)
}
// Invalid UTF-8 byte sequence.
invalid := "hello\xff\xfeworld"
got := sanitizeTelegramText(invalid)
if !utf8.ValidString(got) {
t.Fatalf("sanitized text should be valid UTF-8: %q", got)
}
if got != "helloworld" {
t.Fatalf("expected invalid bytes stripped: %q", got)
}
}
func TestEditTelegramMessageText_429ReturnsError(t *testing.T) {
t.Parallel()
@@ -407,3 +471,51 @@ func TestEditTelegramMessageText_429ReturnsError(t *testing.T) {
t.Fatalf("send should be called once (no internal retry): got %d", sendCalls)
}
}
func TestTelegramAdapter_ImplementsProcessingStatusNotifier(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
var _ channel.ProcessingStatusNotifier = adapter
}
func TestProcessingStarted_EmptyParams(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
ctx := context.Background()
cfg := channel.ChannelConfig{}
msg := channel.InboundMessage{}
handle, err := adapter.ProcessingStarted(ctx, cfg, msg, channel.ProcessingStatusInfo{})
if err != nil {
t.Fatalf("empty params should not error: %v", err)
}
if handle.Token != "" {
t.Fatalf("empty params should return empty handle: %q", handle.Token)
}
}
func TestProcessingCompleted_EmptyHandle(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
ctx := context.Background()
err := adapter.ProcessingCompleted(ctx, channel.ChannelConfig{}, channel.InboundMessage{}, channel.ProcessingStatusInfo{}, channel.ProcessingStatusHandle{})
if err != nil {
t.Fatalf("empty handle should be no-op: %v", err)
}
}
func TestProcessingFailed_DelegatesToCompleted(t *testing.T) {
t.Parallel()
adapter := NewTelegramAdapter(nil)
ctx := context.Background()
err := adapter.ProcessingFailed(ctx, channel.ChannelConfig{}, channel.InboundMessage{}, channel.ProcessingStatusInfo{}, channel.ProcessingStatusHandle{}, fmt.Errorf("test"))
if err != nil {
t.Fatalf("empty handle should be no-op: %v", err)
}
}