feat(tts): introduce TTS system (#195)

This commit is contained in:
Fodesu
2026-03-13 02:49:52 +08:00
committed by GitHub
parent 7904de87bd
commit b46e494d3a
71 changed files with 8959 additions and 159 deletions
+168 -32
View File
@@ -2,6 +2,7 @@ package inbound
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@@ -27,6 +28,8 @@ import (
messagepkg "github.com/memohai/memoh/internal/message"
)
var base64Std = base64.StdEncoding
const (
silentReplyToken = "NO_REPLY"
minDuplicateTextLength = 10
@@ -54,21 +57,33 @@ type mediaIngestor interface {
IngestContainerFile(ctx context.Context, botID, containerPath string) (media.Asset, error)
}
// ttsSynthesizer synthesizes text to speech audio.
type ttsSynthesizer interface {
Synthesize(ctx context.Context, modelID string, text string, overrideCfg map[string]any) ([]byte, string, error)
}
// ttsModelResolver looks up the TTS model ID configured for a bot.
type ttsModelResolver interface {
ResolveTtsModelID(ctx context.Context, botID string) (string, error)
}
// ChannelInboundProcessor routes channel inbound messages to the chat gateway.
type ChannelInboundProcessor struct {
runner flow.Runner
routeResolver RouteResolver
message messagepkg.Writer
mediaService mediaIngestor
reactor channelReactor
inboxService *inbox.Service
commandHandler *command.Handler
registry *channel.Registry
logger *slog.Logger
jwtSecret string
tokenTTL time.Duration
identity *IdentityResolver
observer channel.StreamObserver
runner flow.Runner
routeResolver RouteResolver
message messagepkg.Writer
mediaService mediaIngestor
reactor channelReactor
inboxService *inbox.Service
commandHandler *command.Handler
registry *channel.Registry
logger *slog.Logger
jwtSecret string
tokenTTL time.Duration
identity *IdentityResolver
observer channel.StreamObserver
ttsService ttsSynthesizer
ttsModelResolver ttsModelResolver
}
// NewChannelInboundProcessor creates a processor with channel identity-based resolution.
@@ -148,6 +163,16 @@ func (p *ChannelInboundProcessor) SetInboxService(service *inbox.Service) {
p.inboxService = service
}
// SetTtsService configures the TTS synthesizer and settings reader for handling
// <speech> tag events (speech_delta) that require server-side audio synthesis.
func (p *ChannelInboundProcessor) SetTtsService(synth ttsSynthesizer, modelResolver ttsModelResolver) {
if p == nil {
return
}
p.ttsService = synth
p.ttsModelResolver = modelResolver
}
// SetCommandHandler configures the slash command handler for intercepting
// /command messages before they reach the LLM.
func (p *ChannelInboundProcessor) SetCommandHandler(handler *command.Handler) {
@@ -463,31 +488,17 @@ func (p *ChannelInboundProcessor) HandleInbound(ctx context.Context, cfg channel
ingested := p.ingestOutboundAttachments(ctx, strings.TrimSpace(identity.BotID), event.Attachments)
events[i].Attachments = ingested
assetMu.Lock()
for _, att := range ingested {
contentHash := strings.TrimSpace(att.ContentHash)
if contentHash == "" {
continue
}
ref := conversation.OutboundAssetRef{
ContentHash: contentHash,
Role: "attachment",
Ordinal: len(outboundAssetRefs),
Mime: strings.TrimSpace(att.Mime),
SizeBytes: att.Size,
}
if att.Metadata != nil {
if sk, ok := att.Metadata["storage_key"].(string); ok {
ref.StorageKey = sk
}
}
outboundAssetRefs = append(outboundAssetRefs, ref)
}
outboundAssetRefs = append(outboundAssetRefs, buildAssetRefs(ingested, len(outboundAssetRefs))...)
assetMu.Unlock()
}
if event.Type == channel.StreamEventReaction && len(event.Reactions) > 0 {
p.dispatchReactions(ctx, identity.BotID, msg.Channel, target, sourceMessageID, event.Reactions)
continue
}
if event.Type == channel.StreamEventSpeech && len(event.Speeches) > 0 {
p.synthesizeAndPushVoice(ctx, strings.TrimSpace(identity.BotID), event.Speeches, stream, &outboundAssetRefs, &assetMu)
continue
}
if pushErr := stream.Push(ctx, events[i]); pushErr != nil {
streamErr = pushErr
break
@@ -810,6 +821,7 @@ type gatewayStreamEnvelope struct {
Result json.RawMessage `json:"result"`
Attachments json.RawMessage `json:"attachments"`
Reactions json.RawMessage `json:"reactions"`
Speeches json.RawMessage `json:"speeches"`
}
type gatewayStreamDoneData struct {
@@ -911,6 +923,14 @@ func mapStreamChunkToChannelEvents(chunk conversation.StreamChunk) ([]channel.St
return []channel.StreamEvent{
{Type: channel.StreamEventReaction, Reactions: reactions},
}, finalMessages, nil
case "speech_delta":
speeches := parseSpeechDelta(envelope.Speeches)
if len(speeches) == 0 {
return nil, finalMessages, nil
}
return []channel.StreamEvent{
{Type: channel.StreamEventSpeech, Speeches: speeches},
}, finalMessages, nil
case "agent_start":
return []channel.StreamEvent{
{
@@ -1862,6 +1882,122 @@ func parseAttachmentDelta(raw json.RawMessage) []channel.Attachment {
return attachments
}
// synthesizeAndPushVoice handles speech_delta events by synthesizing audio
// and pushing the resulting voice attachment into the outbound stream.
func (p *ChannelInboundProcessor) synthesizeAndPushVoice(
ctx context.Context,
botID string,
speeches []channel.SpeechRequest,
stream channel.OutboundStream,
outboundAssetRefs *[]conversation.OutboundAssetRef,
assetMu *sync.Mutex,
) {
if p.ttsService == nil || p.ttsModelResolver == nil {
if p.logger != nil {
p.logger.Warn("speech_delta received but TTS service not configured")
}
return
}
modelID, err := p.ttsModelResolver.ResolveTtsModelID(ctx, botID)
if err != nil || strings.TrimSpace(modelID) == "" {
if p.logger != nil {
p.logger.Warn("speech_delta: bot has no TTS model configured", slog.String("bot_id", botID))
}
return
}
for _, speech := range speeches {
text := strings.TrimSpace(speech.Text)
if text == "" {
continue
}
audioData, contentType, synthErr := p.ttsService.Synthesize(ctx, modelID, text, nil)
if synthErr != nil {
if p.logger != nil {
p.logger.Warn("speech synthesis failed", slog.String("bot_id", botID), slog.Any("error", synthErr))
}
continue
}
dataURL := encodeDataURL(contentType, audioData)
voiceEvent := channel.StreamEvent{
Type: channel.StreamEventAttachment,
Attachments: []channel.Attachment{
{
Type: channel.AttachmentVoice,
URL: dataURL,
Mime: contentType,
Size: int64(len(audioData)),
},
},
}
ingested := p.ingestOutboundAttachments(ctx, botID, voiceEvent.Attachments)
voiceEvent.Attachments = ingested
assetMu.Lock()
*outboundAssetRefs = append(*outboundAssetRefs, buildAssetRefs(ingested, len(*outboundAssetRefs))...)
assetMu.Unlock()
if pushErr := stream.Push(ctx, voiceEvent); pushErr != nil {
if p.logger != nil {
p.logger.Warn("push voice attachment failed", slog.String("bot_id", botID), slog.Any("error", pushErr))
}
return
}
}
}
// parseSpeechDelta converts raw JSON speech data to SpeechRequest values.
func parseSpeechDelta(raw json.RawMessage) []channel.SpeechRequest {
if len(raw) == 0 {
return nil
}
var items []struct {
Text string `json:"text"`
}
if err := json.Unmarshal(raw, &items); err != nil {
return nil
}
speeches := make([]channel.SpeechRequest, 0, len(items))
for _, item := range items {
text := strings.TrimSpace(item.Text)
if text == "" {
continue
}
speeches = append(speeches, channel.SpeechRequest{Text: text})
}
return speeches
}
func buildAssetRefs(attachments []channel.Attachment, startOrdinal int) []conversation.OutboundAssetRef {
var refs []conversation.OutboundAssetRef
for _, att := range attachments {
contentHash := strings.TrimSpace(att.ContentHash)
if contentHash == "" {
continue
}
ref := conversation.OutboundAssetRef{
ContentHash: contentHash,
Role: "attachment",
Ordinal: startOrdinal + len(refs),
Mime: strings.TrimSpace(att.Mime),
SizeBytes: att.Size,
}
if att.Metadata != nil {
if sk, ok := att.Metadata["storage_key"].(string); ok {
ref.StorageKey = sk
}
}
refs = append(refs, ref)
}
return refs
}
func encodeDataURL(mime string, data []byte) string {
encoded := base64Encode(data)
return "data:" + mime + ";base64," + encoded
}
func base64Encode(data []byte) string {
return base64Std.EncodeToString(data)
}
// parseReactionDelta converts raw JSON reaction data to channel ReactRequests.
func parseReactionDelta(raw json.RawMessage) []channel.ReactRequest {
if len(raw) == 0 {